Building a Scalable Rx Architecture

1. Introduction

In small Flutter applications, using simple Streams or basic BLoC patterns is often enough.

However, once applications grow into:

  • Large e-commerce apps
  • Social platforms
  • Messaging systems
  • Enterprise dashboards

basic state management becomes difficult to maintain.

Problems usually appear such as:

• duplicated streams
• inconsistent state updates
• race conditions
• difficult testing
• complex UI dependencies

This is where RxDart architecture becomes powerful.

RxDart allows us to build reactive data pipelines that scale well in large applications.

In this article we will build a production-level architecture using:

  • RxDart
  • Clean Architecture
  • Repository pattern
  • BLoC pattern
  • Stream composition

2. Architecture Overview

A large Flutter application using RxDart often follows this structure:

UI (Widgets)


BLoC / ViewModel


UseCases


Repository


Data Sources (API / Database / Cache)

Reactive Data Flow

User Action


BLoC Input (Sink)


Stream Processing (RxDart operators)


State Stream


UI rebuild

3. Real Production Example

Let’s build a real scenario:

Feature

Product Search Page

Requirements:

• search products
• debounce input
• cancel old requests
• show loading state
• cache results
• show errors

This is extremely common in real apps.


4. Data Model

class Product {
final String id;
final String name;
final double price; Product({
required this.id,
required this.name,
required this.price,
});
}

5. Repository Layer

Repository hides API implementation.

abstract class ProductRepository {
Future<List<Product>> searchProducts(String query);
}

Implementation

class ProductRepositoryImpl implements ProductRepository {  @override
Future<List<Product>> searchProducts(String query) async {
await Future.delayed(Duration(milliseconds: 600)); return List.generate(
5,
(index) => Product(
id: '$index',
name: '$query Product $index',
price: 10 + index.toDouble(),
),
);
}
}

6. State Model

Instead of emitting raw data, production apps use state objects.

class SearchState {  final bool isLoading;
final List<Product> products;
final String? error; SearchState({
required this.isLoading,
required this.products,
this.error,
}); factory SearchState.initial() {
return SearchState(
isLoading: false,
products: [],
error: null,
);
}
}

7. RxDart BLoC Implementation

Now we create a reactive BLoC.


Step 1 — Inputs

final _searchQuery = PublishSubject<String>();

User typing into search field pushes values here.


Step 2 — Output

final BehaviorSubject<SearchState> _state =
BehaviorSubject.seeded(SearchState.initial());

Step 3 — Constructor Logic

This is where Rx magic happens.

SearchBloc(this.repository) {  _searchQuery
.debounceTime(Duration(milliseconds: 400))
.distinct()
.switchMap((query) => _searchProducts(query).asStream())
.listen(_state.add);
}

Why These Operators?

OperatorPurpose
debounceTimewait for user to stop typing
distinctignore duplicate queries
switchMapcancel previous requests

This pattern is used in production search systems.


8. Search Logic

Future<SearchState> _searchProducts(String query) async {  if (query.isEmpty) {
return SearchState.initial();
} try { _state.add(
SearchState(
isLoading: true,
products: [],
),
); final results = await repository.searchProducts(query); return SearchState(
isLoading: false,
products: results,
); } catch (e) { return SearchState(
isLoading: false,
products: [],
error: e.toString(),
); }
}

9. Public API

Sink<String> get search => _searchQuery.sink;Stream<SearchState> get state => _state.stream;

10. Dispose

void dispose() {
_searchQuery.close();
_state.close();
}

Always close streams in BLoC.


11. Flutter UI Integration

class SearchPage extends StatelessWidget {

final bloc = SearchBloc(ProductRepositoryImpl());

@override
Widget build(BuildContext context) {

return Scaffold(
appBar: AppBar(title: Text("Product Search")),

body: Column(
children: [

TextField(
onChanged: bloc.search.add,
decoration: InputDecoration(
hintText: "Search products...",
),
),

Expanded(
child: StreamBuilder<SearchState>(
stream: bloc.state,
builder: (context, snapshot) {

final state = snapshot.data;

if (state == null) {
return Container();
}

if (state.isLoading) {
return Center(child: CircularProgressIndicator());
}

if (state.error != null) {
return Center(child: Text(state.error!));
}

return ListView.builder(
itemCount: state.products.length,
itemBuilder: (_, index) {
final product = state.products[index];

return ListTile(
title: Text(product.name),
subtitle: Text("\$${product.price}"),
);
},
);
},
),
),
],
),
);
}
}

12. Why This Architecture Works Well

This pattern solves many problems.


1 — Cancel Old Requests

switchMap

When user types quickly:

iphone
iphone 1
iphone 14

Old requests are automatically cancelled.


2 — Prevent Spamming API

debounceTime

Only fires when typing stops.


3 — Avoid Duplicate Requests

distinct()

Same query won’t trigger new API call.


4 — Reactive UI

UI automatically updates via:

StreamBuilder

13. Production Improvements

Large apps usually add:


Loading + Error + Data State

Instead of simple model:

sealed class ViewState<T> {}

Example:

class Loading<T> extends ViewState<T> {}
class Data<T> extends ViewState<T> { final T value; }
class Error<T> extends ViewState<T> { final String message; }

Result

Stream<ViewState<List<Product>>>

This improves UI logic clarity.


14. Advanced Stream Composition

Sometimes you must combine multiple streams.

Example:

Search results depend on:

• query
• filter
• sort order


Example

Rx.combineLatest3(
searchQuery,
filterStream,
sortStream,
(query, filter, sort) {
return SearchParams(query, filter, sort);
},
)
.switchMap(repository.search)

This pattern powers complex dashboards.


15. Example: Cart Total Calculation

Rx.combineLatest2(
cartItems,
discountStream,
(items, discount) {
final total = items.fold(0.0, (sum, item) => sum + item.price); return total - discount;
},
)

16. Example: Login Validation

Rx.combineLatest2(
emailStream,
passwordStream,
(email, password) {
return email.isValid && password.length > 6;
},
)

17. Handling Side Effects

Side effects should be handled carefully.

Example:

stream.doOnData((value) {
analytics.logEvent(value);
});

18. Testing Rx BLoC

RxDart BLoCs are very testable.

Example:

test("search emits loading then results", () async {  final bloc = SearchBloc(MockRepository());  bloc.search.add("iphone");  expectLater(
bloc.state,
emitsInOrder([
isA<SearchState>().having((s) => s.isLoading, "loading", true),
isA<SearchState>().having((s) => s.products.isNotEmpty, "products", true),
]),
);
});

19. Common Production Mistakes

1 — Too many Subjects

Bad:

PublishSubject
PublishSubject
PublishSubject
PublishSubject

Better:

Use stream composition instead.


2 — Memory Leaks

Forgetting:

dispose()

3 — Business Logic in UI

Wrong:

StreamBuilder(
builder: (context, snapshot) {
if (...) { ... }
}
)

Better: move logic into BLoC.


20. Recommended Project Structure

lib/
├── core/
│ └── rx_extensions

├── data/
│ ├── models
│ ├── repositories

├── domain/
│ ├── usecases

├── presentation/
│ ├── blocs
│ ├── pages
│ └── widgets

21. Performance Tips

Use:

shareReplay()

To avoid recomputation.

Example:

final cachedStream = apiCall.shareReplay(maxSize: 1);

22. When NOT to Use RxDart

Avoid RxDart when:

• simple screens
• small apps
• minimal state

In such cases:

  • Provider
  • Riverpod
  • ValueNotifier

are simpler.


23. Where RxDart Excels

RxDart shines in:

• chat systems
• search engines
• live dashboards
• streaming apps
• real-time collaboration apps


24. Real Companies Using Reactive Systems

Many modern apps use reactive programming:

  • Netflix
  • Uber
  • Airbnb
  • Twitter

Though not always Dart, the concept is the same.


25. Conclusion

RxDart is not just a library.

It is a powerful architectural tool that allows Flutter apps to scale.

Key lessons:

• Streams represent time-based data
• Rx operators transform streams
• BLoC organizes reactive logic
• Architecture separates concerns

When used correctly, RxDart enables:

  • scalable apps
  • predictable state
  • high testability
  • reactive UI

Để lại một bình luận

Email của bạn sẽ không được hiển thị công khai. Các trường bắt buộc được đánh dấu *

Lên đầu trang