Now we go deeper into advanced Rx operators and patterns used in complex systems.
These operators help solve problems like:
- preventing duplicate user actions
- sequential API workflows
- batching events
- building reactive state machines
- handling unstable networks
All examples use RxDart.
1. exhaustMap — Prevent Duplicate Requests
Problem
Users sometimes trigger the same action multiple times.
Example:
User taps "Submit Order"
tap tap tap tap
Without protection:
4 API requests sent
This can cause:
- duplicate payments
- duplicate orders
- data corruption
Solution: exhaustMap
exhaustMap ignores new events while the current one is executing.
Example:
submitClicks
    // The mapper must return a Stream; .asStream() assumes submitOrder() returns a Future.
    .exhaustMap((_) => repository.submitOrder().asStream())
    .listen(showResult);
Timeline:
tap1 → API call starts
tap2 → ignored
tap3 → ignored
tap4 → ignored
Only one request executes.
Real Flutter Example
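class CheckoutBloc {
  CheckoutBloc(this.repository);
  final OrderRepository repository; // assumed type with Future<OrderResult> placeOrder()
  final _submit = PublishSubject<void>();

  // Taps that arrive while an order is in flight are dropped.
  late final Stream<OrderResult> result =
      _submit.exhaustMap((_) => repository.placeOrder().asStream());

  Sink<void> get submit => _submit.sink;
  void dispose() => _submit.close(); // release the subject with the bloc
}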
This prevents double purchase bugs.
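To see the dropping behavior in isolation, here is a minimal sketch. `placeOrder` and `runTaps` are hypothetical stand-ins (a fake 100 ms request and a simulated burst of taps), not part of any real repository.

```dart
import 'dart:async';
import 'package:rxdart/rxdart.dart';

int apiCalls = 0;

/// Hypothetical stand-in for repository.placeOrder(): takes 100 ms.
Future<String> placeOrder() async {
  apiCalls++;
  await Future<void>.delayed(const Duration(milliseconds: 100));
  return 'order #$apiCalls';
}

Future<List<String>> runTaps() async {
  apiCalls = 0;
  final submit = PublishSubject<void>();
  final results = <String>[];
  final sub =
      submit.exhaustMap((_) => placeOrder().asStream()).listen(results.add);

  // Four rapid taps: only the first one starts a request.
  for (var i = 0; i < 4; i++) {
    submit.add(null);
  }
  await Future<void>.delayed(const Duration(milliseconds: 200));

  // A tap after the first request has finished is accepted again.
  submit.add(null);
  await Future<void>.delayed(const Duration(milliseconds: 200));

  await sub.cancel();
  await submit.close();
  return results;
}

Future<void> main() async {
  print(await runTaps());
  print('API calls: $apiCalls');
}
```

Five taps are sent in total, but only two requests should run: the first tap of the burst and the later tap that arrives once the first request is done.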
2. concatMap — Sequential Async Tasks
Problem
Sometimes async tasks must execute in order.
Example:
upload file1
upload file2
upload file3
Requests must not overlap.
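Solution: concatMap
concatMap runs tasks one after another: the next task starts only when the previous one has finished.
RxDart does not ship an operator named concatMap; Dart's built-in Stream.asyncExpand gives the same behavior.
uploadRequests.asyncExpand(repository.uploadFile);
Timeline:
upload1 → start → finish
upload2 → start → finish
upload3 → start → finish
Example
filesStream
    .asyncExpand((file) => api.upload(file)) // api.upload must return a Stream
    .listen(print);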
Used in:
- file uploads
- transaction systems
- message queues
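The sequential ordering can be checked with a small sketch that uses Dart's built-in `asyncExpand`, the concatMap equivalent for Dart streams. The `upload` function and the `log` list are hypothetical and exist only to record when each task starts and finishes.

```dart
import 'dart:async';

final log = <String>[];

/// Hypothetical upload that records when it starts and finishes.
Stream<String> upload(String file) async* {
  log.add('start $file');
  await Future<void>.delayed(const Duration(milliseconds: 20));
  log.add('finish $file');
  yield '$file uploaded';
}

/// Runs the uploads strictly one after another.
Future<List<String>> uploadAll(List<String> files) =>
    Stream.fromIterable(files).asyncExpand(upload).toList();

Future<void> main() async {
  print(await uploadAll(['file1', 'file2', 'file3']));
  print(log);
}
```

In the log, every `finish` entry appears before the next `start` entry, which is the non-overlapping behavior described above.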
3. buffer — Batch Events
Problem
Sending events one at a time means one network request per event.
Example:
analytics events
click events
tracking events
Solution: buffer
Collect events and send them as a batch. bufferTime emits the events gathered in each time window as a single List.
events.bufferTime(Duration(seconds: 5));
Example:
click click click click
Output:
[click, click, click, click]
Flutter Example
userEvents
    .bufferTime(Duration(seconds: 10))
    .listen(sendAnalyticsBatch);
This sends at most one request every 10 seconds instead of one request per event.
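A minimal sketch of the batching behavior, with a short 50 ms window so it finishes quickly. The `collectBatches` helper is hypothetical, and the `where` filter is a defensive assumption so that an empty batch is never sent.

```dart
import 'dart:async';
import 'package:rxdart/rxdart.dart';

Future<List<List<String>>> collectBatches() async {
  final userEvents = PublishSubject<String>();
  final batches = <List<String>>[];
  final sub = userEvents
      .bufferTime(const Duration(milliseconds: 50))
      .where((batch) => batch.isNotEmpty) // defensive: skip empty batches
      .listen(batches.add);

  // Four clicks arrive in quick succession.
  for (var i = 0; i < 4; i++) {
    userEvents.add('click');
  }
  await Future<void>.delayed(const Duration(milliseconds: 150));

  await sub.cancel();
  await userEvents.close();
  return batches;
}

Future<void> main() async {
  final batches = await collectBatches();
  print('batches: ${batches.length}, events: ${batches.expand((b) => b).length}');
}
```

Each element of `batches` corresponds to one call to a batch sender such as `sendAnalyticsBatch`, so the number of requests is the number of batches, not the number of events.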
4. window — Streaming Batches
window works like buffer, but emits each batch as a Stream instead of a List.
Example:
stream.windowCount(5);
Input:
1 2 3 4 5 6
Output:
Stream[1,2,3,4,5]
Stream[6,...]
Useful for streaming analytics pipelines.
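As a sketch of how the windows can be consumed, the snippet below reduces each window to its sum. The `windowSums` helper is a hypothetical name; because the source here is finite, the last window closes when the source ends.

```dart
import 'package:rxdart/rxdart.dart';

/// Splits the input into windows of five and sums each window.
Future<List<int>> windowSums() => Stream.fromIterable([1, 2, 3, 4, 5, 6])
    .windowCount(5)
    .asyncMap((window) => window.fold<int>(0, (sum, n) => sum + n))
    .toList();

Future<void> main() async {
  print(await windowSums());
}
```

The first window holds 1 to 5 and the second holds only 6, so the sums are 15 and 6.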
5. shareReplay — Stream Caching
One of the most important operators in production.
Problem
Multiple widgets subscribe to the same stream.
Example:
Home screen
Cart screen
Favorites screen
If each screen builds its own stream, each subscription triggers another API call.
Solution
Use shareReplay.
final productsStream =
    api.getProducts().shareReplay(maxSize: 1);
Now, as long as every screen listens to this same productsStream instance:
API called once
all subscribers share result
Why maxSize=1?
It caches only the latest result, so the replay buffer cannot grow.
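The difference can be measured with a call counter. In this sketch `getProducts` is a hypothetical API call, and `Rx.defer` is used (an assumption about how the stream is built) so that the request starts only when the first subscriber arrives.

```dart
import 'dart:async';
import 'package:rxdart/rxdart.dart';

int apiCalls = 0;

/// Hypothetical API call that counts how often it is invoked.
Future<List<String>> getProducts() async {
  apiCalls++;
  return ['tea', 'coffee'];
}

Future<int> callsWithoutSharing() async {
  apiCalls = 0;
  // Each "screen" builds its own stream, so each one hits the API.
  await Future.wait([
    getProducts().asStream().first,
    getProducts().asStream().first,
  ]);
  return apiCalls;
}

Future<int> callsWithSharing() async {
  apiCalls = 0;
  final products =
      Rx.defer(() => getProducts().asStream()).shareReplay(maxSize: 1);
  // Both "screens" listen to the same shared instance.
  await Future.wait([products.first, products.first]);
  return apiCalls;
}

Future<void> main() async {
  print('without sharing: ${await callsWithoutSharing()} calls');
  print('with sharing: ${await callsWithSharing()} calls');
}
```

The shared variant works only because both listeners use the same `products` object; building the pipeline again would start a new request.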
6. retryWhen — Smart Network Retry
Networks fail frequently.
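Basic retry:
Rx.retry(() => repository.fetchProducts(), 3);
RxDart exposes retry as a static factory rather than a stream method: it takes a function that creates the stream, so every attempt starts a fresh request.
But sometimes you want delayed retries.
Example:
Rx.retryWhen(
  () => repository.fetchProducts(),
  // Called once per error; the next attempt starts when the returned stream emits.
  (error, stackTrace) => Rx.timer<void>(null, Duration(seconds: 2)),
);
Timeline:
request fails
wait 2 seconds
retry
Production Example
Rx.retryWhen(
  () => repository.fetchProducts(),
  (error, stackTrace) => Rx.timer<void>(null, Duration(seconds: 3)),
);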
Useful for:
- unstable mobile networks
- websocket reconnection
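A delayed retry can be exercised against a deliberately flaky source. This is a sketch that assumes the `Rx.retryWhen` factory of current RxDart releases; `fetchProducts` here is a hypothetical request that fails twice before succeeding, and the 20 ms delay is shortened for the demo.

```dart
import 'dart:async';
import 'package:rxdart/rxdart.dart';

int attempts = 0;

/// Hypothetical flaky request: fails twice, then succeeds.
Stream<String> fetchProducts() {
  attempts++;
  return attempts < 3
      ? Stream<String>.error(StateError('network down'))
      : Stream.value('products');
}

Future<String> fetchWithRetry() {
  attempts = 0;
  return Rx.retryWhen<String>(
    fetchProducts,
    // Called for each error; the next attempt starts when this stream emits.
    (error, stackTrace) =>
        Rx.timer<void>(null, const Duration(milliseconds: 20)),
  ).first;
}

Future<void> main() async {
  print(await fetchWithRetry());
  print('attempts: $attempts');
}
```

This sketch retries without a limit. Production code would usually also cap the number of attempts or increase the delay between them.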
7. doOnData — Debug Streams
Debugging reactive code can be difficult.
doOnData helps inspect streams.
Example:
stream
    .doOnData((value) => print("data: $value"))
    .listen(print);
Other debug operators:
doOnListen
doOnCancel
doOnError
doOnDone
These are very helpful in production debugging.
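The sketch below combines several of these hooks and records what they observe. The `log` list and `tracedPipeline` function are hypothetical; in an app the callbacks would typically write to a logger instead.

```dart
import 'package:rxdart/rxdart.dart';

final log = <String>[];

/// The doOn* hooks observe the stream without changing its values.
Future<List<int>> tracedPipeline() => Stream.fromIterable([1, 2, 3])
    .doOnListen(() => log.add('listen'))
    .doOnData((value) => log.add('data: $value'))
    .map((value) => value * 10)
    .doOnDone(() => log.add('done'))
    .toList();

Future<void> main() async {
  print(await tracedPipeline());
  print(log);
}
```

The hooks see the values before `map` runs, so the log contains the original numbers while the result contains the multiplied ones.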
8. startWith — Initial State
Many UI streams require initial values.
Example:
counterStream.startWith(0);
UI immediately receives:
0
Instead of waiting for the first event.
Often used with loading states.
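A minimal sketch of the loading-state use: the stream emits a placeholder immediately and the real value once it arrives. `fetchProfile` and the state strings are hypothetical.

```dart
import 'package:rxdart/rxdart.dart';

/// Hypothetical slow request that resolves to a single state.
Future<String> fetchProfile() =>
    Future.delayed(const Duration(milliseconds: 20), () => 'loaded');

/// Emits 'loading' first, then whatever the request produces.
Future<List<String>> loadStates() =>
    fetchProfile().asStream().startWith('loading').toList();

Future<void> main() async {
  print(await loadStates());
}
```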
9. onErrorResume — Recover from Errors
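Default stream behavior:
error → reaches the listener's onError callback (or is reported as unhandled)
Many sources, such as a stream created from a Future, also end right after failing.
Sometimes we want a fallback value instead.
Example:
stream.onErrorResume((error, stack) {
  return Stream.value([]); // give the list an explicit element type if the stream is typed
});
Now:
error → empty list
Listeners receive the fallback value instead of the error.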
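A small sketch of the fallback in action, assuming the two-argument `onErrorResume` callback of recent RxDart versions. `fetchProducts` is a hypothetical request that always fails.

```dart
import 'package:rxdart/rxdart.dart';

/// Hypothetical product request that always fails.
Stream<List<String>> fetchProducts() =>
    Stream<List<String>>.error(StateError('offline'));

/// Replaces the error with an empty, correctly typed list.
Future<List<String>> productsOrEmpty() => fetchProducts()
    .onErrorResume((error, stackTrace) => Stream.value(<String>[]))
    .first;

Future<void> main() async {
  print(await productsOrEmpty());
}
```

Without the operator, awaiting `first` on this stream would throw the StateError. With it, the caller receives an empty list.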
10. scan — Reactive State Machines
scan accumulates state over time.
Example:
Stream.fromIterable([1, 2, 3])
    .scan<int>((acc, value, _) => acc + value, 0); // explicit type helps inference on older Dart SDKs
Output:
1
3
6
Flutter Example: Cart
cartActions
    .scan<CartState>((state, action, _) {
      return state.reduce(action);
    }, CartState.initial());
This builds a reactive state reducer.
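To make the reducer concrete, here is a sketch with hypothetical, minimal definitions of `CartState`, `CartAction`, `AddItem`, and `RemoveItem`. A real app would likely model these differently.

```dart
import 'package:rxdart/rxdart.dart';

// Hypothetical actions for illustration only.
abstract class CartAction {}

class AddItem implements CartAction {
  AddItem(this.name);
  final String name;
}

class RemoveItem implements CartAction {
  RemoveItem(this.name);
  final String name;
}

/// Immutable state: every action produces a new CartState.
class CartState {
  const CartState(this.items);
  const CartState.initial() : items = const [];
  final List<String> items;

  CartState reduce(CartAction action) {
    if (action is AddItem) return CartState([...items, action.name]);
    if (action is RemoveItem) {
      return CartState(items.where((i) => i != action.name).toList());
    }
    return this;
  }
}

Future<List<String>> finalCart() async {
  final cartActions = Stream<CartAction>.fromIterable(
    [AddItem('tea'), AddItem('mug'), RemoveItem('tea')],
  );
  final state = await cartActions
      .scan<CartState>(
        (state, action, _) => state.reduce(action),
        const CartState.initial(),
      )
      .last;
  return state.items;
}

Future<void> main() async {
  print(await finalCart());
}
```

After adding tea, adding a mug, and removing tea, only the mug remains in the final state.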
11. Advanced Pattern — Infinite Scroll
Requirements:
scroll near bottom
load next page
append items
Implementation:
scrollStream
    .throttleTime(Duration(milliseconds: 200))
    .where((pos) => pos > threshold)
    .exhaustMap((_) => repository.loadNextPage())
    .scan<List<Item>>(
      (all, page, _) => [...all, ...page],
      [],
    );
Operators used:
throttleTime
where
exhaustMap
scan
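throttleTime limits how often scroll positions are checked, exhaustMap keeps a second page request from starting while one is still loading, and scan appends each page to the items loaded so far.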
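The pipeline can be tried out with simulated scroll positions. Everything outside the operator chain is a hypothetical stand-in: `loadNextPage` fakes a 100 ms request, positions are fractions of the list scrolled, and the durations are shortened for the demo.

```dart
import 'dart:async';
import 'package:rxdart/rxdart.dart';

int pagesRequested = 0;

/// Hypothetical page loader: each page has two items and takes 100 ms.
Stream<List<int>> loadNextPage() async* {
  final page = pagesRequested++;
  await Future<void>.delayed(const Duration(milliseconds: 100));
  yield [page * 2, page * 2 + 1];
}

Future<List<int>> simulateScrolling() async {
  pagesRequested = 0;
  const threshold = 0.8; // assumed unit: fraction of the list scrolled
  final scroll = PublishSubject<double>();
  var items = <int>[];
  final sub = scroll
      .throttleTime(const Duration(milliseconds: 10))
      .where((pos) => pos > threshold)
      .exhaustMap((_) => loadNextPage())
      .scan<List<int>>((all, page, _) => [...all, ...page], [])
      .listen((all) => items = all);

  Future<void> scrollTo(double pos, int waitMs) {
    scroll.add(pos);
    return Future<void>.delayed(Duration(milliseconds: waitMs));
  }

  await scrollTo(0.5, 30); // below the threshold: nothing loads
  await scrollTo(0.9, 30); // starts loading the first page
  await scrollTo(0.95, 150); // ignored: the first page is still loading
  await scrollTo(0.97, 150); // first page is done, so the next page loads

  await sub.cancel();
  await scroll.close();
  return items;
}

Future<void> main() async {
  print(await simulateScrolling());
  print('pages requested: $pagesRequested');
}
```

Three positions cross the threshold, but only two pages are requested because one of them arrives while a load is in flight.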
12. Advanced Pattern — Live Search
queryStream
    .debounceTime(Duration(milliseconds: 300))
    .distinct()
    .switchMap(repository.search)
    .shareReplay(maxSize: 1);
Pipeline:
user typing
↓
debounce
↓
skip a query identical to the previous one
↓
cancel previous request
↓
cache results
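The first three stages can be observed with a fake backend that records which queries actually reach it. `search` and `simulateTyping` are hypothetical, the debounce is shortened to 30 ms, and shareReplay is left out because the sketch has a single listener.

```dart
import 'dart:async';
import 'package:rxdart/rxdart.dart';

final searched = <String>[];

/// Hypothetical search backend that records each query it receives.
Stream<List<String>> search(String query) async* {
  searched.add(query);
  yield ['result for $query'];
}

Future<List<String>> simulateTyping() async {
  searched.clear();
  final queries = PublishSubject<String>();
  final sub = queries
      .debounceTime(const Duration(milliseconds: 30))
      .distinct()
      .switchMap(search)
      .listen((_) {});

  Future<void> pause() =>
      Future<void>.delayed(const Duration(milliseconds: 100));

  // Three quick keystrokes: only the last one survives the debounce.
  queries..add('r')..add('rx')..add('rxd');
  await pause();
  queries.add('rxd'); // same as the previous query: dropped by distinct()
  await pause();
  queries.add('dart');
  await pause();

  await sub.cancel();
  await queries.close();
  return List.of(searched);
}

Future<void> main() async {
  print(await simulateTyping());
}
```

Five inputs produce two backend calls, one for 'rxd' and one for 'dart'.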
13. Advanced Pattern — WebSocket + API Merge
Sometimes you combine real-time updates with API data.
Example:
Rx.merge([
  api.fetchMessages(),
  websocket.messageStream,
]);
Result (events are delivered in the order they arrive):
initial messages from API
live updates from websocket
Used in:
- chat apps
- trading platforms
- live dashboards
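A sketch of the merge with stand-in sources: `fetchMessages` is a hypothetical history request and a PublishSubject plays the role of the websocket stream.

```dart
import 'dart:async';
import 'package:rxdart/rxdart.dart';

/// Hypothetical history request returning two stored messages.
Stream<String> fetchMessages() => Stream.fromIterable(['old 1', 'old 2']);

Future<List<String>> collectMessages() async {
  // Stands in for websocket.messageStream.
  final socket = PublishSubject<String>();
  final seen = <String>[];
  final sub = Rx.merge([fetchMessages(), socket.stream]).listen(seen.add);

  await Future<void>.delayed(const Duration(milliseconds: 10));
  socket.add('live 1'); // arrives after the history has been delivered
  await Future<void>.delayed(const Duration(milliseconds: 10));

  await sub.cancel();
  await socket.close();
  return seen;
}

Future<void> main() async {
  print(await collectMessages());
}
```

Because merge forwards events as they arrive, a live message that shows up before the history response would be delivered first. If ordering matters, sorting or de-duplicating by a message id or timestamp is one option.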
14. Memory Management Tips
Reactive systems can leak memory if misused.
Rules:
- Always close subjects
subject.close();
- Avoid unlimited replay buffers
Bad:
ReplaySubject();
Better:
ReplaySubject(maxSize: 10);
- Use shareReplay carefully, and bound its cache with maxSize.
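One way to apply the first rule is to give every class that owns a subject a dispose method. The `CounterBloc` below is a hypothetical example of that shape, not a prescribed pattern.

```dart
import 'dart:async';
import 'package:rxdart/rxdart.dart';

/// Hypothetical bloc that owns one subject and one subscription.
class CounterBloc {
  CounterBloc() {
    _subscription = _increments.listen((n) => total += n);
  }

  final _increments = PublishSubject<int>();
  late final StreamSubscription<int> _subscription;
  int total = 0;

  Sink<int> get increment => _increments.sink;
  bool get isClosed => _increments.isClosed;

  /// Cancels the subscription first, then closes the subject.
  Future<void> dispose() async {
    await _subscription.cancel();
    await _increments.close();
  }
}

Future<void> main() async {
  final bloc = CounterBloc();
  bloc.increment.add(2);
  await Future<void>.delayed(Duration.zero); // let the event be delivered
  await bloc.dispose();
  print('total: ${bloc.total}, closed: ${bloc.isClosed}');
}
```

In Flutter, `dispose` would typically be called from the owning State object's dispose method.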
15. Operator Selection Guide
| Problem | Operator |
|---|---|
| Cancel previous request | switchMap |
| Prevent duplicate click | exhaustMap |
| Sequential tasks | concatMap (asyncExpand in Dart) |
| Parallel tasks | flatMap |
| Cache latest value | shareReplay |
| Retry network | Rx.retryWhen |
| Batch events | buffer / bufferTime |
Key Takeaways
Advanced Rx operators unlock powerful capabilities in RxDart.
We explored:
- exhaustMap
- concatMap (asyncExpand in Dart)
- buffer
- window
- shareReplay
- retryWhen
- doOnData
- startWith
- onErrorResume
- scan
These operators allow Flutter developers to build highly scalable reactive systems.



