Many developers understand Rx operators but still struggle to apply them in large applications.
Common problems include:
- streams scattered across the codebase
- excessive subjects
- difficult debugging
- memory leaks
- tightly coupled UI and logic
A good architecture should solve these problems while providing:
predictable data flow
clear separation of responsibilities
testability
scalability
In this article we will build a complete RxDart architecture used in real Flutter apps.
We will cover:
- Architecture layers
- Reactive BLoC pattern
- API pipelines
- caching streams
- pagination
- error recovery
- lifecycle management
All examples use RxDart.
1. Reactive Architecture Overview
A well-designed reactive system has clear layers.
UI
↓
BLoC
↓
Repository
↓
Data Source
↓
API / Database
Each layer communicates through streams.
Example pipeline:
User action
↓
BLoC input stream
↓
operators
↓
repository request
↓
result stream
↓
UI update
The important rule:
UI never talks directly to repositories or APIs.
Everything flows through the BLoC stream layer.
2. Project Structure
A typical Flutter project using RxDart might look like:
lib/
├── core/
│ ├── network
│ ├── error
│ └── rx_extensions
│
├── data/
│ ├── models
│ ├── api
│ └── repositories
│
├── domain/
│ ├── entities
│ └── usecases
│
├── presentation/
│ ├── blocs
│ ├── widgets
│ └── pages
Important layer:
presentation/blocs
This is where RxDart logic lives.
3. The Reactive BLoC Pattern
The BLoC pattern can be implemented with simple streams, but RxDart provides far more powerful pipelines.
Basic idea:
Inputs (Sink)
↓
Operators
↓
Output Streams
Example:
class CounterBloc {
final _increment = PublishSubject<void>();
Sink<void> get increment => _increment.sink;
late final Stream<int> counter =
_increment
.scan((count, _, __) => count + 1, 0)
.startWith(0);
void dispose() {
_increment.close();
}
}
Pipeline:
button click
↓
increment subject
↓
scan operator
↓
counter stream
↓
UI
UI usage:
StreamBuilder<int>(
stream: bloc.counter,
builder: (context, snapshot) {
return Text("${snapshot.data}");
},
);
4. Designing Input Streams
Inputs represent user actions.
Examples:
button taps
text input
scroll events
refresh actions
These are usually implemented using PublishSubject.
Example:
final _refresh = PublishSubject<void>();
final _searchQuery = PublishSubject<String>();
final _loadNextPage = PublishSubject<void>();
Important rule:
Subjects should only be used for inputs, not internal logic.
5. Building an API Request Pipeline
Consider a search feature.
Requirements:
user types
↓
debounce
↓
cancel previous request
↓
fetch API
↓
show results
Implementation:
class SearchBloc {
final _query = PublishSubject<String>();
Sink<String> get query => _query.sink;
late final Stream<List<Product>> results =
_query
.debounceTime(Duration(milliseconds: 300))
.distinct()
.switchMap(repository.search)
.shareReplay(maxSize: 1);
void dispose() {
_query.close();
}
}
Pipeline:
text input
↓
debounce
↓
distinct
↓
switchMap
↓
repository search
↓
result stream
Why switchMap?
Because it cancels previous requests.
Without it:
race conditions
outdated results
UI flickering
6. Building a Caching Stream
Large apps must avoid unnecessary network requests.
RxDart provides an elegant solution using shareReplay.
Example repository:
class ProductRepository {
Stream<List<Product>>? _cache;
Stream<List<Product>> getProducts() {
_cache ??= api.getProducts()
.shareReplay(maxSize: 1);
return _cache!;
}
}
Benefits:
multiple listeners share same data
API called only once
latest value cached
Example:
Home screen
Favorites screen
Cart screen
All subscribe to the same product stream.
7. Pagination with RxDart
Pagination is one of the best examples of reactive programming.
Flow:
scroll event
↓
check threshold
↓
request next page
↓
merge results
Implementation:
class PaginationBloc {
final _loadNextPage = PublishSubject<void>();
int _currentPage = 1;
late final Stream<List<Product>> products =
_loadNextPage
.startWith(null)
.exhaustMap((_) => repository.fetchPage(_currentPage++))
.scan<List<Product>>(
(all, page, _) => [...all, ...page],
[],
)
.shareReplay(maxSize: 1);
Sink<void> get loadNextPage => _loadNextPage.sink;
void dispose() {
_loadNextPage.close();
}
}
Key operators used:
startWith
exhaustMap
scan
shareReplay
This architecture:
prevents duplicate requests
keeps item order
accumulates pages
8. Error Handling Pipelines
Network requests often fail.
Reactive systems should recover gracefully.
Example:
repository.fetchProducts()
.retry(3)
.onErrorReturn([]);
Explanation:
retry 3 times
if still failing return empty list
More advanced recovery:
repository.fetchProducts()
.retryWhen((errors, _) =>
errors.delay(Duration(seconds: 2)));
This creates automatic retry logic.
9. Loading State Streams
Most screens require three UI states:
loading
data
error
Reactive architecture can represent this using state streams.
Example state model:
class UiState<T> {
final bool loading;
final T? data;
final Object? error;
UiState.loading() : loading = true, data = null, error = null;
UiState.data(this.data) : loading = false, error = null;
UiState.error(this.error) : loading = false, data = null;
}
Pipeline:
Stream<UiState<List<Product>>> products =
repository.getProducts()
.map(UiState.data)
.startWith(UiState.loading())
.onErrorReturnWith(UiState.error);
This stream can directly power the UI.
10. Stream Lifecycle Management
One of the most common mistakes with RxDart is memory leaks.
Always close subjects.
Example:
class BaseBloc {
final _disposables = <StreamController>[];
void register(StreamController controller) {
_disposables.add(controller);
}
void dispose() {
for (final c in _disposables) {
c.close();
}
}
}
Alternatively:
use rxdart CompositeSubscription pattern
Rule:
Every Subject must be closed.
11. Debugging Reactive Pipelines
Reactive code can be hard to debug.
Helpful operators:
doOnData
doOnError
doOnListen
Example:
stream
.doOnData((data) => print("data: $data"))
.doOnError((e) => print("error: $e"));
These provide observability for complex pipelines.
12. Real Example: Full Search Architecture
Let’s combine everything.
class SearchBloc {
final _query = PublishSubject<String>();
Sink<String> get query => _query.sink;
late final Stream<UiState<List<Product>>> results =
_query
.debounceTime(Duration(milliseconds: 300))
.distinct()
.switchMap(repository.search)
.map(UiState.data)
.startWith(UiState.loading())
.onErrorReturnWith(UiState.error)
.shareReplay(maxSize: 1);
void dispose() {
_query.close();
}
}
Pipeline visualization:
user typing
↓
debounce
↓
distinct
↓
switchMap(API)
↓
map to UI state
↓
cache result
↓
UI
This is a production-ready reactive pipeline.
13. Best Practices for Production RxDart
1. Limit Subject usage
Subjects should only represent inputs.
2. Prefer pure stream pipelines
Instead of:
Subject → Subject → Subject
Use:
Subject → operators → Stream
3. Cache expensive streams
Use:
shareReplay
for API results.
4. Always handle errors
Never allow streams to crash silently.
5. Close streams
Always implement dispose().
Key Takeaways
Production applications using RxDart should follow these principles:
clear architecture layers
minimal Subject usage
operator-driven pipelines
proper lifecycle management
error handling and caching
RxDart allows Flutter apps to build powerful reactive systems capable of handling:
real-time data
complex UI state
high-frequency events
network pipelines



