1. Introduction
In small Flutter applications, using simple Streams or basic BLoC patterns is often enough.
However, once applications grow into:
- Large e-commerce apps
- Social platforms
- Messaging systems
- Enterprise dashboards
basic state management becomes difficult to maintain.
Typical problems include:
• duplicated streams
• inconsistent state updates
• race conditions
• difficult testing
• complex UI dependencies
This is where an RxDart-based architecture helps.
RxDart lets us build reactive data pipelines from composable stream operators, which keeps this complexity manageable as the application grows.
In this article we will build a production-level architecture using:
- RxDart
- Clean Architecture
- Repository pattern
- BLoC pattern
- Stream composition
2. Architecture Overview
A large Flutter application using RxDart often follows this structure:
UI (Widgets)
│
▼
BLoC / ViewModel
│
▼
UseCases
│
▼
Repository
│
▼
Data Sources (API / Database / Cache)
Reactive Data Flow
User Action
│
▼
BLoC Input (Sink)
│
▼
Stream Processing (RxDart operators)
│
▼
State Stream
│
▼
UI rebuild
3. Real Production Example
Let’s build a real scenario:
Feature
Product Search Page
Requirements:
• search products
• debounce input
• cancel old requests
• show loading state
• cache results
• show errors
This is extremely common in real apps.
4. Data Model
class Product {
  final String id;
  final String name;
  final double price;

  Product({
    required this.id,
    required this.name,
    required this.price,
  });
}
5. Repository Layer
The repository hides the API implementation behind an interface, so the BLoC never depends on HTTP or database details.
abstract class ProductRepository {
  Future<List<Product>> searchProducts(String query);
}
Implementation
class ProductRepositoryImpl implements ProductRepository {
  @override
  Future<List<Product>> searchProducts(String query) async {
    // Simulated network latency; a real implementation would call the API here.
    await Future.delayed(const Duration(milliseconds: 600));

    return List.generate(
      5,
      (index) => Product(
        id: '$index',
        name: '$query Product $index',
        price: 10 + index.toDouble(),
      ),
    );
  }
}
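The requirements list includes caching, which the repository above does not cover. One way to add it without touching the BLoC is a decorator around any ProductRepository. The sketch below is only an illustration: the name CachingProductRepository and the in-memory Map are assumptions, the Product and ProductRepository types are repeated so the snippet compiles on its own, and there is no eviction or expiry.

```dart
// Copies of the article's types so the sketch is self-contained.
class Product {
  final String id;
  final String name;
  final double price;

  Product({required this.id, required this.name, required this.price});
}

abstract class ProductRepository {
  Future<List<Product>> searchProducts(String query);
}

/// Hypothetical decorator: wraps another repository and remembers the
/// results of each query in memory.
class CachingProductRepository implements ProductRepository {
  CachingProductRepository(this._inner);

  final ProductRepository _inner;
  final Map<String, List<Product>> _cache = {};

  @override
  Future<List<Product>> searchProducts(String query) async {
    final cached = _cache[query];
    if (cached != null) {
      return cached; // Cache hit: the wrapped repository is not called.
    }

    final results = await _inner.searchProducts(query);
    _cache[query] = results; // Only successful responses are stored.
    return results;
  }
}
```

With this in place the BLoC could be created as SearchBloc(CachingProductRepository(ProductRepositoryImpl())), and repeated queries would be answered from memory.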
6. State Model
Instead of emitting raw data, production apps use state objects.
class SearchState {
  final bool isLoading;
  final List<Product> products;
  final String? error;

  SearchState({
    required this.isLoading,
    required this.products,
    this.error,
  });

  factory SearchState.initial() {
    return SearchState(
      isLoading: false,
      products: [],
      error: null,
    );
  }
}
7. RxDart BLoC Implementation
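Now we create a reactive BLoC. The snippets in this section are members of a SearchBloc class and assume import 'package:rxdart/rxdart.dart';.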
Step 1 — Inputs
final _searchQuery = PublishSubject<String>();
Every change the user makes in the search field pushes a value into this subject.
Step 2 — Output
final BehaviorSubject<SearchState> _state =
BehaviorSubject.seeded(SearchState.initial());
Step 3 — Constructor Logic
The constructor wires the input subject to the output state through a chain of operators.
final ProductRepository repository;

SearchBloc(this.repository) {
  _searchQuery
      .debounceTime(const Duration(milliseconds: 400))
      .distinct()
      .switchMap((query) => _searchProducts(query).asStream())
      .listen(_state.add);
}
Why These Operators?
| Operator | Purpose |
|---|---|
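| debounceTime | wait until the user has paused typing (400 ms here) |
| distinct | skip a query that is identical to the previous one |
| switchMap | drop the previous request's result when a new query arrives |
This debounce, distinct, switchMap chain is a common pattern for search-as-you-type features.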
8. Search Logic
Future<SearchState> _searchProducts(String query) async {
  if (query.isEmpty) {
    return SearchState.initial();
  }

  try {
    // Show the loading state immediately, before the request completes.
    _state.add(
      SearchState(
        isLoading: true,
        products: [],
      ),
    );

    final results = await repository.searchProducts(query);

    return SearchState(
      isLoading: false,
      products: results,
    );
  } catch (e) {
    return SearchState(
      isLoading: false,
      products: [],
      error: e.toString(),
    );
  }
}
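The method above pushes the loading state into _state as a side effect, outside the operator chain. One possible alternative is an async* generator that yields both the loading state and the final state, so every emission flows through switchMap. This is a sketch, not the article's implementation: searchStates is a hypothetical top-level function, and Product and SearchState are trimmed copies so the snippet compiles on its own.

```dart
// Trimmed copies of the article's types (assumption: only the fields
// needed for this sketch are kept).
class Product {
  final String name;
  Product(this.name);
}

class SearchState {
  final bool isLoading;
  final List<Product> products;
  final String? error;

  SearchState({required this.isLoading, required this.products, this.error});
}

abstract class ProductRepository {
  Future<List<Product>> searchProducts(String query);
}

/// Hypothetical stream-based variant of _searchProducts: emits a loading
/// state first, then either the results or an error state.
Stream<SearchState> searchStates(
  ProductRepository repository,
  String query,
) async* {
  if (query.isEmpty) {
    yield SearchState(isLoading: false, products: []);
    return;
  }

  yield SearchState(isLoading: true, products: []);

  try {
    final results = await repository.searchProducts(query);
    yield SearchState(isLoading: false, products: results);
  } catch (e) {
    yield SearchState(isLoading: false, products: [], error: e.toString());
  }
}
```

With this shape the constructor would call .switchMap((query) => searchStates(repository, query)). When switchMap cancels the inner subscription, a stale generator stops at its next yield, so an outdated result cannot overwrite a newer one.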
9. Public API
Sink<String> get search => _searchQuery.sink;
Stream<SearchState> get state => _state.stream;
10. Dispose
void dispose() {
  _searchQuery.close();
  _state.close();
}
Always close the subjects when the BLoC is no longer needed; otherwise they and their listeners stay in memory.
11. Flutter UI Integration
import 'package:flutter/material.dart';

class SearchPage extends StatefulWidget {
  const SearchPage({super.key});

  @override
  State<SearchPage> createState() => _SearchPageState();
}

class _SearchPageState extends State<SearchPage> {
  // The State owns the BLoC so it survives rebuilds and can be disposed.
  final bloc = SearchBloc(ProductRepositoryImpl());

  @override
  void dispose() {
    bloc.dispose();
    super.dispose();
  }

  @override
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(title: const Text('Product Search')),
      body: Column(
        children: [
          TextField(
            onChanged: bloc.search.add,
            decoration: const InputDecoration(
              hintText: 'Search products...',
            ),
          ),
          Expanded(
            child: StreamBuilder<SearchState>(
              stream: bloc.state,
              builder: (context, snapshot) {
                final state = snapshot.data;
                if (state == null) {
                  return const SizedBox.shrink();
                }
                if (state.isLoading) {
                  return const Center(child: CircularProgressIndicator());
                }
                if (state.error != null) {
                  return Center(child: Text(state.error!));
                }
                return ListView.builder(
                  itemCount: state.products.length,
                  itemBuilder: (_, index) {
                    final product = state.products[index];
                    return ListTile(
                      title: Text(product.name),
                      subtitle: Text('\$${product.price}'),
                    );
                  },
                );
              },
            ),
          ),
        ],
      ),
    );
  }
}
12. Why This Architecture Works Well
This pattern solves many problems.
1 — Cancel Old Requests
switchMap
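When the user types quickly:
iphone
iphone 1
iphone 14
switchMap unsubscribes from the previous request's stream, so only the result for the latest query reaches the UI. The underlying network call keeps running unless the data source itself supports cancellation.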
2 — Prevent Spamming API
debounceTime
A query is emitted only after the user has paused typing for the debounce duration.
3 — Avoid Duplicate Requests
distinct()
A query identical to the previous one does not trigger a new API call.
4 — Reactive UI
UI automatically updates via:
StreamBuilder
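The combined effect of debounceTime, distinct, and switchMap can be observed in isolation with a small script. The sketch below assumes the rxdart package is available; fakeSearch and runDemo are hypothetical names, and the durations are shortened so the demo finishes quickly.

```dart
import 'dart:async';

import 'package:rxdart/rxdart.dart';

/// Hypothetical stand-in for an API call; answers after 200 ms.
Future<String> fakeSearch(String query) async {
  await Future<void>.delayed(const Duration(milliseconds: 200));
  return 'results for "$query"';
}

/// Feeds three queries through the same operator chain the BLoC uses and
/// returns everything the listener received.
Future<List<String>> runDemo() async {
  final queries = PublishSubject<String>();
  final received = <String>[];

  final subscription = queries
      .debounceTime(const Duration(milliseconds: 50))
      .distinct()
      .switchMap((query) => Stream.fromFuture(fakeSearch(query)))
      .listen(received.add);

  // Two values inside one debounce window: only "iphone 1" gets through.
  queries.add('iphone');
  queries.add('iphone 1');

  // Long enough for the debounce to fire and the first request to start,
  // but shorter than the request itself.
  await Future<void>.delayed(const Duration(milliseconds: 80));

  // A new query arrives while the "iphone 1" request is still in flight,
  // so switchMap drops that request's result.
  queries.add('iphone 14');

  await Future<void>.delayed(const Duration(milliseconds: 500));
  await subscription.cancel();
  await queries.close();
  return received;
}
```

Under these timings the returned list should contain a single entry, the one for "iphone 14": the first query is swallowed by the debounce and the second is superseded by switchMap.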
13. Production Improvements
Large apps usually add:
Loading + Error + Data State
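Instead of a single model with an isLoading flag and a nullable error, the state becomes a sealed class hierarchy (Dart 3 or later):
sealed class ViewState<T> {}
Example:
class Loading<T> extends ViewState<T> {}
class Data<T> extends ViewState<T> {
  final T value;
  Data(this.value);
}
// Note: this name shadows dart:core's Error inside the library.
class Error<T> extends ViewState<T> {
  final String message;
  Error(this.message);
}
Result
Stream<ViewState<List<Product>>>
Each state now carries only the fields that make sense for it, and the UI can switch over the three cases exhaustively.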
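To illustrate how UI logic might consume such a state, here is a minimal sketch assuming Dart 3 (sealed classes and switch expressions). The const constructors and the describe helper are hypothetical additions, and the hierarchy is repeated so the snippet compiles on its own.

```dart
sealed class ViewState<T> {
  const ViewState();
}

class Loading<T> extends ViewState<T> {
  const Loading();
}

class Data<T> extends ViewState<T> {
  const Data(this.value);
  final T value;
}

// Shadows dart:core's Error within this library; kept to match the article.
class Error<T> extends ViewState<T> {
  const Error(this.message);
  final String message;
}

/// Hypothetical helper: maps each state to a short label. Because the
/// hierarchy is sealed, the compiler reports any case that is not handled.
String describe(ViewState<List<String>> state) {
  return switch (state) {
    Loading() => 'loading',
    Data(value: final items) => '${items.length} item(s)',
    Error(message: final message) => 'error: $message',
  };
}
```

In a widget, the same switch would return a spinner, a list, or an error message instead of a string.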
14. Advanced Stream Composition
Sometimes you must combine multiple streams.
Example:
Search results depend on:
• query
• filter
• sort order
Example
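// Assumes repository.search(SearchParams) returns a Stream of results.
Rx.combineLatest3(
  searchQuery,
  filterStream,
  sortStream,
  (query, filter, sort) {
    return SearchParams(query, filter, sort);
  },
).switchMap(repository.search);
Note that combineLatest emits only after every source stream has produced at least one value, so filter and sort streams usually need an initial value.
The same pattern suits dashboards where several independent inputs drive a single query.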
15. Example: Cart Total Calculation
Rx.combineLatest2(
  cartItems,
  discountStream,
  (items, discount) {
    final total = items.fold(0.0, (sum, item) => sum + item.price);
    return total - discount;
  },
);
16. Example: Login Validation
// isValid is assumed to be an extension getter on String defined elsewhere.
Rx.combineLatest2(
  emailStream,
  passwordStream,
  (email, password) {
    return email.isValid && password.length > 6;
  },
);
17. Handling Side Effects
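Keep side effects such as logging or analytics out of map and switchMap callbacks. doOnData runs them without changing the events. It returns a new stream, and the callback only runs while that stream has a listener.
Example:
final tracked = stream.doOnData((value) {
  analytics.logEvent(value);
});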
18. Testing Rx BLoC
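Because a BLoC exposes only a sink and a stream, it can be tested without any widgets.
Example:
// MockRepository is assumed to return a non-empty list of products.
test('search emits loading then results', () async {
  final bloc = SearchBloc(MockRepository());

  // Set up the expectation before adding input.
  // skip(1) drops the seeded initial state.
  final expectation = expectLater(
    bloc.state.skip(1),
    emitsInOrder([
      isA<SearchState>().having((s) => s.isLoading, 'loading', true),
      isA<SearchState>().having((s) => s.products.isNotEmpty, 'products', true),
    ]),
  );

  bloc.search.add('iphone');
  await expectation;
  bloc.dispose();
});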
19. Common Production Mistakes
1 — Too many Subjects
Bad:
PublishSubject
PublishSubject
PublishSubject
PublishSubject
Better:
Keep one subject per real input and derive everything else with operators such as map, combineLatest, and switchMap.
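As a rough illustration of that advice, the sketch below keeps a single subject as the source of truth and derives the other outputs from it. CartBloc and its members are hypothetical names that do not appear elsewhere in this article, and the rxdart package is assumed to be available.

```dart
import 'package:rxdart/rxdart.dart';

/// Hypothetical BLoC: one subject holds the prices in the cart, and every
/// other output is computed from it instead of living in its own subject.
class CartBloc {
  final _prices = BehaviorSubject<List<double>>.seeded(const []);

  /// The only input.
  Sink<List<double>> get setPrices => _prices.sink;

  /// Derived outputs. distinct() suppresses repeated identical values.
  Stream<int> get itemCount =>
      _prices.map((prices) => prices.length).distinct();

  Stream<double> get total => _prices
      .map((prices) => prices.fold<double>(0, (sum, price) => sum + price))
      .distinct();

  Stream<bool> get isEmpty => itemCount.map((count) => count == 0).distinct();

  void dispose() => _prices.close();
}
```

There is only one subject to close, and the derived streams cannot drift out of sync with it because they are recomputed from the same events.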
2 — Memory Leaks
Forgetting to call:
dispose()
Subjects and subscriptions that are never closed stay alive after the page is gone.
3 — Business Logic in UI
Wrong:
StreamBuilder(
builder: (context, snapshot) {
if (...) { ... }
}
)
Better: move logic into BLoC.
20. Recommended Project Structure
lib/
├── core/
│   └── rx_extensions
│
├── data/
│   ├── models
│   └── repositories
│
├── domain/
│   └── usecases
│
└── presentation/
    ├── blocs
    ├── pages
    └── widgets
21. Performance Tips
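Use:
shareReplay()
so that several listeners share one upstream subscription and late listeners receive the most recent value, instead of each listener re-running the source.
Example:
// apiCall is assumed to be a Stream, for example Stream.fromFuture(...).
final cachedStream = apiCall.shareReplay(maxSize: 1);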
22. When NOT to Use RxDart
RxDart is usually unnecessary for:
• simple screens
• small apps
• minimal state
In such cases:
- Provider
- Riverpod
- ValueNotifier
are simpler.
23. Where RxDart Excels
RxDart shines in:
• chat systems
• search engines
• live dashboards
• streaming apps
• real-time collaboration apps
24. Real Companies Using Reactive Systems
Many modern apps use reactive programming:
- Netflix
- Uber
- Airbnb
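These companies rely on reactive libraries in other languages (RxJava, for example, originated at Netflix) rather than RxDart, but the concepts are the same.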
25. Conclusion
RxDart is not just a library.
It is a powerful architectural tool that allows Flutter apps to scale.
Key lessons:
• Streams represent time-based data
• Rx operators transform streams
• BLoC organizes reactive logic
• Architecture separates concerns
When used correctly, RxDart enables:
- scalable apps
- predictable state
- high testability
- reactive UI



