Building a Production-Grade RxDart Architecture in Flutter

Many developers understand Rx operators but still struggle to apply them in large applications.

Common problems include:

  • streams scattered across the codebase
  • excessive subjects
  • difficult debugging
  • memory leaks
  • tightly coupled UI and logic

A good architecture should solve these problems while providing:

  • predictable data flow
  • clear separation of responsibilities
  • testability
  • scalability

In this article we will build a complete RxDart architecture used in real Flutter apps.

We will cover:

  1. Architecture layers
  2. Reactive BLoC pattern
  3. API pipelines
  4. Caching streams
  5. Pagination
  6. Error recovery
  7. Lifecycle management

All examples use RxDart.


1. Reactive Architecture Overview

A well-designed reactive system has clear layers.

UI → BLoC → Repository → Data Source → API / Database

Each layer communicates through streams.

Example pipeline:

User action → BLoC input stream → operators → repository request → result stream → UI update

The important rule:

UI never talks directly to repositories or APIs.

Everything flows through the BLoC stream layer.
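
The layering above can be sketched in a few lines of plain Dart. This is a minimal illustration, not a prescribed structure: ProductApi, ProductRepository, ProductBloc and their methods are hypothetical names, and the data source simply returns a fixed list.

```dart
import 'dart:async';

/// Data source: the only layer that knows how data is fetched.
/// (Hypothetical stand-in that returns a fixed list.)
class ProductApi {
  Future<List<String>> fetchNames() async => ['apple', 'pear'];
}

/// Repository: exposes data-source results as streams.
class ProductRepository {
  ProductRepository(this._api);

  final ProductApi _api;

  Stream<List<String>> watchNames() => Stream.fromFuture(_api.fetchNames());
}

/// BLoC: the only layer the UI is allowed to see.
class ProductBloc {
  ProductBloc(ProductRepository repository)
      : names = repository
            .watchNames()
            .map((list) => list.map((n) => n.toUpperCase()).toList());

  /// Output stream consumed by the UI.
  final Stream<List<String>> names;
}

Future<void> main() async {
  final bloc = ProductBloc(ProductRepository(ProductApi()));
  print(await bloc.names.first); // [APPLE, PEAR]
}
```

The UI only ever receives bloc.names; swapping the API for a database changes nothing above the repository.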


2. Project Structure

A typical Flutter project using RxDart might look like:

lib/
├── core/
│   ├── network
│   ├── error
│   └── rx_extensions
├── data/
│   ├── models
│   ├── api
│   └── repositories
├── domain/
│   ├── entities
│   └── usecases
└── presentation/
    ├── blocs
    ├── widgets
    └── pages

The key directory is presentation/blocs. This is where the RxDart logic lives.


3. The Reactive BLoC Pattern

The BLoC pattern can be implemented with plain Dart streams, but RxDart's operators (scan, switchMap, debounceTime and others) make the pipelines far more expressive.

Basic idea:

Inputs (Sink) → Operators → Output Streams

Example:

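import 'package:rxdart/rxdart.dart';

class CounterBloc {
  // Input: one event per button tap.
  final _increment = PublishSubject<void>();

  Sink<void> get increment => _increment.sink;

  // Output: the running count, starting at 0.
  late final Stream<int> counter = _increment
      .scan<int>((count, _, __) => count + 1, 0)
      .startWith(0);

  void dispose() {
    _increment.close();
  }
}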

Pipeline:

button click → increment subject → scan operator → counter stream → UI

UI usage:

StreamBuilder<int>(
  stream: bloc.counter,
  builder: (context, snapshot) {
    // snapshot.data is null until the first event has been delivered.
    return Text("${snapshot.data ?? 0}");
  },
);

4. Designing Input Streams

Inputs represent user actions.

Examples:

  • button taps
  • text input
  • scroll events
  • refresh actions

These are usually implemented using PublishSubject.

Example:

final _refresh = PublishSubject<void>();
final _searchQuery = PublishSubject<String>();
final _loadNextPage = PublishSubject<void>();

Important rule:

Subjects should only be used for inputs, not internal logic.


5. Building an API Request Pipeline

Consider a search feature.

Requirements:

user types → debounce → cancel previous request → fetch API → show results

Implementation:

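class SearchBloc {
  SearchBloc(this.repository);

  // Assumed to expose: Stream<List<Product>> search(String query)
  final ProductRepository repository;

  final _query = PublishSubject<String>();

  Sink<String> get query => _query.sink;

  late final Stream<List<Product>> results = _query
      // Wait for a pause in typing.
      .debounceTime(const Duration(milliseconds: 300))
      // Skip a query identical to the previous one.
      .distinct()
      // Drop the in-flight request when a new query arrives.
      .switchMap(repository.search)
      .shareReplay(maxSize: 1);

  void dispose() {
    _query.close();
  }
}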

Pipeline:

text input → debounce → distinct → switchMap → repository search → result stream

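Why switchMap?

Because it unsubscribes from the previous request's stream as soon as a new query arrives, so only the latest query can deliver results. Whether the underlying HTTP call is also aborted depends on how the repository's stream handles cancellation.

Without it:

  • race conditions
  • outdated results
  • UI flickering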
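
The difference is easy to see with simulated latency. The sketch below assumes a hypothetical fakeSearch in which shorter queries answer more slowly, so the oldest request finishes last; the names and timings are illustrative only.

```dart
import 'dart:async';

import 'package:rxdart/rxdart.dart';

/// Hypothetical fake search: shorter queries answer more slowly, so an
/// older request finishes after a newer one.
Stream<String> fakeSearch(String query) {
  final latency = Duration(milliseconds: 60 - 10 * query.length);
  return Rx.timer('results for "$query"', latency);
}

/// Three keystrokes arriving almost at once.
Stream<String> keystrokes() => Stream.fromIterable(['a', 'ab', 'abc']);

Future<void> main() async {
  // flatMap keeps every request alive, so the stale answer for "a" lands last.
  final merged = await keystrokes().flatMap(fakeSearch).toList();

  // switchMap unsubscribes from the previous request on each new query.
  final switched = await keystrokes().switchMap(fakeSearch).toList();

  print(merged.last); // results for "a"
  print(switched); // [results for "abc"]
}
```

With flatMap the UI would end up showing results for "a" even though the user typed "abc"; with switchMap only the latest query is delivered.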

6. Building a Caching Stream

Large apps must avoid unnecessary network requests.

RxDart provides an elegant solution using shareReplay.

Example repository:

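class ProductRepository {
  ProductRepository(this.api);

  // ProductApi is an assumed client type exposing
  // Stream<List<Product>> getProducts().
  final ProductApi api;

  Stream<List<Product>>? _cache;

  Stream<List<Product>> getProducts() {
    // Create the shared stream on first use, then return the same instance.
    return _cache ??= api.getProducts().shareReplay(maxSize: 1);
  }
}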

Benefits:

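  • multiple listeners share the same data
  • the API is called only once
  • the latest value is cached and replayed to new listeners

Example:

  • Home screen
  • Favorites screen
  • Cart screen

All subscribe to the same product stream.

Note that _cache is never reset in this form, so the repository needs a way to clear it (for example on pull-to-refresh) when fresh data is required.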
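
To check that sharing really avoids duplicate calls, the following sketch counts how often a fake API is hit when two listeners subscribe at the same time. FakeApi and CachedRepository are hypothetical names used only for this illustration.

```dart
import 'dart:async';

import 'package:rxdart/rxdart.dart';

/// Hypothetical stand-in for the API client; counts how often it is hit.
class FakeApi {
  int calls = 0;

  Stream<List<String>> getProducts() => Rx.defer(() {
        calls++; // runs once per subscription to the source
        return Rx.timer(['apple', 'pear'], const Duration(milliseconds: 10));
      });
}

class CachedRepository {
  CachedRepository(this._api);

  final FakeApi _api;
  Stream<List<String>>? _cache;

  Stream<List<String>> getProducts() =>
      _cache ??= _api.getProducts().shareReplay(maxSize: 1);
}

Future<void> main() async {
  final api = FakeApi();
  final repository = CachedRepository(api);

  // Two "screens" subscribe at the same time.
  final results = await Future.wait([
    repository.getProducts().first,
    repository.getProducts().first,
  ]);

  print(results.length); // 2
  print(api.calls); // 1
}
```

Both listeners receive the same list while the source is subscribed to only once.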


7. Pagination with RxDart

Pagination is one of the best examples of reactive programming.

Flow:

scroll event → check threshold → request next page → merge results

Implementation:

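class PaginationBloc {
  PaginationBloc(this.repository);

  // Assumed to expose: Stream<List<Product>> fetchPage(int page)
  final ProductRepository repository;

  final _loadNextPage = PublishSubject<void>();

  int _currentPage = 1;

  Sink<void> get loadNextPage => _loadNextPage.sink;

  late final Stream<List<Product>> products = _loadNextPage
      // Load the first page immediately.
      .startWith(null)
      // Ignore load requests while a page is still being fetched.
      .exhaustMap((_) => repository.fetchPage(_currentPage++))
      // Append each new page to the accumulated list.
      .scan<List<Product>>(
        (all, page, _) => [...all, ...page],
        [],
      )
      .shareReplay(maxSize: 1);

  void dispose() {
    _loadNextPage.close();
  }
}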

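Key operators used:

  • startWith: triggers the first page load without a scroll event
  • exhaustMap: ignores new load requests while a page is still being fetched
  • scan: appends each page to the accumulated list
  • shareReplay: replays the latest list to new listeners

This architecture:

  • prevents duplicate requests
  • keeps item order
  • accumulates pages

Note that _currentPage advances even when a request fails, so a failed page is skipped unless the counter is rolled back.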
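
The duplicate-request protection comes from exhaustMap. The sketch below simulates a burst of scroll events against a hypothetical fetchPage that takes 20 ms; the item values and timings are made up for the demonstration.

```dart
import 'dart:async';

import 'package:rxdart/rxdart.dart';

/// Simulates a burst of scroll events and returns
/// [requested page numbers, accumulated items].
Future<List<List<int>>> simulateScroll() async {
  final loadNextPage = PublishSubject<void>();
  final requestedPages = <int>[];
  final items = <int>[];
  var nextPage = 1;

  // Hypothetical page fetch that takes 20 ms and returns two items.
  Stream<List<int>> fetchPage(int page) {
    requestedPages.add(page);
    return Rx.timer(
      [page * 10, page * 10 + 1],
      const Duration(milliseconds: 20),
    );
  }

  final subscription = loadNextPage
      .exhaustMap((_) => fetchPage(nextPage++))
      .listen(items.addAll);

  // Three scroll events fire while the first request is still in flight.
  loadNextPage..add(null)..add(null)..add(null);
  await Future<void>.delayed(const Duration(milliseconds: 50));

  // The first request has finished, so this one is accepted.
  loadNextPage.add(null);
  await Future<void>.delayed(const Duration(milliseconds: 50));

  await subscription.cancel();
  loadNextPage.close();
  return [requestedPages, items];
}

Future<void> main() async {
  final result = await simulateScroll();
  print(result[0]); // [1, 2]
  print(result[1]); // [10, 11, 20, 21]
}
```

Four scroll events produce only two requests, because the two events that arrive during the first fetch are dropped before the page counter is touched.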

8. Error Handling Pipelines

Network requests often fail.

Reactive systems should recover gracefully.

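Example:

Rx.retry(() => repository.fetchProducts(), 3)
    .onErrorReturn(<Product>[]);

Explanation:

  • Rx.retry re-subscribes to the source up to 3 times when it fails
  • if it is still failing, onErrorReturn emits an empty list instead of the error

Note that retry is a static factory on Rx that takes a function returning the stream, not an operator chained onto an existing stream. It needs the factory so that it can create a fresh request for each attempt.

More advanced recovery:

Rx.retryWhen(
  () => repository.fetchProducts(),
  // Wait 2 seconds after each failure, then try again.
  (error, stackTrace) => Rx.timer<void>(null, const Duration(seconds: 2)),
);

This retries indefinitely with a 2-second pause between attempts, so in practice it is usually combined with an attempt limit.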
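
A common refinement is exponential backoff with an attempt limit. The following is a minimal sketch built on RxDart's static Rx.retryWhen and Rx.timer factories; retryWithBackoff, its parameters, and the flaky source are hypothetical names introduced for this example.

```dart
import 'dart:async';

import 'package:rxdart/rxdart.dart';

/// Re-subscribes to [source] after a failure, doubling the wait each time,
/// and gives up after [maxAttempts] attempts in total.
Stream<T> retryWithBackoff<T>(
  Stream<T> Function() source, {
  int maxAttempts = 3,
  Duration baseDelay = const Duration(milliseconds: 500),
}) {
  return Rx.defer(() {
    var failures = 0; // reset for every new listener
    return Rx.retryWhen<T>(source, (error, stackTrace) {
      failures++;
      if (failures >= maxAttempts) {
        // Emitting an error from this stream stops the retry loop.
        return Stream<void>.error(error, stackTrace);
      }
      // 1x, 2x, 4x, ... the base delay.
      return Rx.timer<void>(null, baseDelay * (1 << (failures - 1)));
    });
  });
}

Future<void> main() async {
  var calls = 0;

  // Hypothetical source that fails twice before succeeding.
  Stream<String> flaky() {
    calls++;
    return calls < 3
        ? Stream<String>.error(StateError('offline'))
        : Stream.value('ok');
  }

  final value = await retryWithBackoff(
    flaky,
    baseDelay: const Duration(milliseconds: 1),
  ).first;

  print('$value after $calls calls'); // ok after 3 calls
}
```

The attempt counter lives inside Rx.defer so that each new listener starts with a fresh budget.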


9. Loading State Streams

Most screens require three UI states:

  • loading
  • data
  • error

Reactive architecture can represent this using state streams.

Example state model:

class UiState<T> {
  final bool loading;
  final T? data;
  final Object? error;

  const UiState.loading() : loading = true, data = null, error = null;
  const UiState.data(this.data) : loading = false, error = null;
  const UiState.error(this.error) : loading = false, data = null;
}

Pipeline:

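Stream<UiState<List<Product>>> products = repository
    .getProducts()
    .map((items) => UiState.data(items))
    .startWith(UiState.loading())
    // onErrorReturnWith passes both the error and its stack trace.
    .onErrorReturnWith((error, _) => UiState.error(error));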

This stream can directly power the UI.


10. Stream Lifecycle Management

One of the most common mistakes with RxDart is leaking memory by leaving subjects open.

Always close subjects.

Example:

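import 'dart:async';

class BaseBloc {
  // RxDart subjects implement StreamController, so they can be registered here.
  final _disposables = <StreamController<dynamic>>[];

  void register(StreamController<dynamic> controller) {
    _disposables.add(controller);
  }

  void dispose() {
    for (final c in _disposables) {
      c.close();
    }
  }
}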

Alternatively, track internal StreamSubscriptions with RxDart's CompositeSubscription and cancel them all with a single dispose() call.

Rule:

Every Subject must be closed.
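
Subscriptions that a BLoC opens internally need the same care as subjects. A minimal sketch using RxDart's CompositeSubscription and its addTo extension follows; TickerBloc and its fields are hypothetical names for illustration.

```dart
import 'dart:async';

import 'package:rxdart/rxdart.dart';

class TickerBloc {
  TickerBloc(Stream<int> ticks) {
    // Every internal subscription is tracked in one place.
    ticks.listen(_latest.add).addTo(_subscriptions);
  }

  final _subscriptions = CompositeSubscription();
  final _latest = BehaviorSubject<int>();

  ValueStream<int> get latest => _latest.stream;

  void dispose() {
    _subscriptions.dispose(); // cancels all tracked subscriptions
    _latest.close();
  }
}

Future<void> main() async {
  final ticks = StreamController<int>();
  final bloc = TickerBloc(ticks.stream);

  ticks.add(1);
  await Future<void>.delayed(Duration.zero);
  print(bloc.latest.value); // 1

  bloc.dispose();
  print(ticks.hasListener); // false
}
```

After dispose() the source controller no longer has a listener, so the BLoC can be garbage collected independently of the upstream stream.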

11. Debugging Reactive Pipelines

Reactive code can be hard to debug.

Helpful operators:

  • doOnData
  • doOnError
  • doOnListen

Example:

stream
    .doOnData((data) => print("data: $data"))
    // doOnError receives both the error and its stack trace.
    .doOnError((error, stackTrace) => print("error: $error"));

These provide observability for complex pipelines.
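
These operators can be bundled into a reusable extension so that any pipeline can be traced with one call. The debugLog name, its tag parameter, and the injectable log callback are assumptions made for this sketch; the doOn* operators themselves are part of RxDart.

```dart
import 'dart:async';

import 'package:rxdart/rxdart.dart';

/// Hypothetical helper: logs the lifecycle events of a stream under [tag].
extension DebugLog<T> on Stream<T> {
  Stream<T> debugLog(String tag, {void Function(String) log = print}) {
    return doOnListen(() => log('[$tag] listen'))
        .doOnData((data) => log('[$tag] data: $data'))
        .doOnError((error, stackTrace) => log('[$tag] error: $error'))
        .doOnDone(() => log('[$tag] done'))
        .doOnCancel(() => log('[$tag] cancel'));
  }
}

Future<void> main() async {
  // Collect log lines in a list instead of printing them.
  final lines = <String>[];
  await Stream.fromIterable([1, 2])
      .debugLog('numbers', log: lines.add)
      .toList();

  lines.forEach(print);
}
```

Passing the log function as a parameter keeps the helper usable in tests, where output can be captured rather than printed.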


12. Real Example: Full Search Architecture

Let’s combine everything.

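class SearchBloc {
  SearchBloc(this.repository);

  // Assumed to expose: Stream<List<Product>> search(String query)
  final ProductRepository repository;

  final _query = PublishSubject<String>();

  Sink<String> get query => _query.sink;

  late final Stream<UiState<List<Product>>> results = _query
      .debounceTime(const Duration(milliseconds: 300))
      .distinct()
      .switchMap(_search)
      .shareReplay(maxSize: 1);

  // Loading and error states are produced per query, inside switchMap,
  // so one failed request does not end the results stream.
  Stream<UiState<List<Product>>> _search(String query) => repository
      .search(query)
      .map((items) => UiState.data(items))
      .startWith(UiState.loading())
      .onErrorReturnWith((error, _) => UiState.error(error));

  void dispose() {
    _query.close();
  }
}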

Pipeline visualization:

user typing → debounce → distinct → switchMap(API) → map to UI state → cache result → UI

This is a production-ready reactive pipeline.
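
A pipeline like this is straightforward to test with a fake repository. The sketch below is a simplified, standalone version in which UI states are reduced to strings and the state mapping is placed inside switchMap; searchStates, fakeSearch, and the timings are hypothetical and chosen only to keep the example short.

```dart
import 'dart:async';

import 'package:rxdart/rxdart.dart';

/// Simplified search pipeline: states are plain strings here.
Stream<String> searchStates(
  Stream<String> queries,
  Stream<List<String>> Function(String query) search, {
  Duration debounce = const Duration(milliseconds: 30),
}) {
  return queries
      .debounceTime(debounce)
      .distinct()
      .switchMap((query) => search(query)
          .map((items) => 'data:${items.length}')
          .startWith('loading')
          // Recover per query so the outer stream stays alive.
          .onErrorReturnWith((error, _) => 'error'));
}

Future<void> main() async {
  final queries = StreamController<String>();

  // Hypothetical repository call: fails for "bad", succeeds otherwise.
  Stream<List<String>> fakeSearch(String query) => query == 'bad'
      ? Stream.error(StateError('offline'))
      : Stream.value([query]);

  final states = searchStates(queries.stream, fakeSearch).toList();

  queries.add('bad');
  await Future<void>.delayed(const Duration(milliseconds: 60));
  queries.add('ok');
  await Future<void>.delayed(const Duration(milliseconds: 60));
  queries.close();

  print(await states); // [loading, error, loading, data:1]
}
```

The second query still produces a loading state and a result after the first one failed, which is the behavior a search screen needs.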


13. Best Practices for Production RxDart

1. Limit Subject usage

Subjects should only represent inputs.


2. Prefer pure stream pipelines

Instead of:

Subject → Subject → Subject

Use:

Subject → operators → Stream

3. Cache expensive streams

Use shareReplay for API results.


4. Always handle errors

Never allow streams to crash silently.


5. Close streams

Always implement dispose().


Key Takeaways

Production applications using RxDart should follow these principles:

  • clear architecture layers
  • minimal Subject usage
  • operator-driven pipelines
  • proper lifecycle management
  • error handling and caching

RxDart allows Flutter apps to build powerful reactive systems capable of handling:

  • real-time data
  • complex UI state
  • high-frequency events
  • network pipelines
