Streams are a core part of Dart’s asynchronous programming model (the dart:async library), and Flutter builds on them for reactive programming and data flow management. In this guide, we’ll explore streams from basic concepts to advanced implementations.
Table of Contents
- Introduction to Streams
- Basic Stream Concepts
- StreamController and StreamBuilder
- Stream Transformations
- Broadcast Streams
- Error Handling
- Advanced Stream Operations
- Best Practices and Common Patterns
Introduction to Streams
A stream is a sequence of asynchronous events: data values, errors, and finally a done event when the stream closes. Think of it as a pipe through which data flows: you put data in one end and listen for it to come out the other. Streams are well suited to real-time data, user events, or any scenario where data arrives over time.
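To make the "data arrives over time" idea concrete, here is a minimal sketch using only dart:async. The `ticks` helper is a hypothetical name introduced for illustration; it emits a few values with a delay between them, and `await for` consumes them as they arrive.

```dart
import 'dart:async';

// Hypothetical helper: emits `count` ticks, one after each `interval`.
Stream<int> ticks(int count, Duration interval) async* {
  for (var i = 1; i <= count; i++) {
    await Future<void>.delayed(interval);
    yield i;
  }
}

Future<void> main() async {
  // `await for` handles each value as it arrives and exits once the
  // stream is done.
  await for (final tick in ticks(3, const Duration(milliseconds: 100))) {
    print('Tick: $tick');
  }
  print('Done');
}
```

`await for` is an alternative to `listen` when the values should be processed one after another inside an async function.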
Basic Stream Concepts
Creating a Simple Stream
The most basic way to create a stream is using Stream.fromIterable:
Stream<int> simpleStream() {
  return Stream.fromIterable([1, 2, 3, 4, 5]);
}

// Using the stream
void listenToStream() {
  simpleStream().listen(
    (data) => print('Data: $data'),
    onError: (error) => print('Error: $error'),
    onDone: () => print('Stream completed'),
  );
}
StreamController
StreamController gives you more control over your streams:
import 'dart:async';

StreamController<String> streamController = StreamController<String>();

// Add data to the stream
void addData() {
  streamController.sink.add('Hello');
  streamController.sink.add('World');
}

// Listen to the stream
void listenToController() {
  streamController.stream.listen(
    (data) => print('Received: $data'),
    onDone: () => print('Stream closed'),
  );
}

// Don't forget to close
void dispose() {
  streamController.close();
}
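Two behaviours of the default (single-subscription) controller are easy to miss: events added before anyone listens are buffered, and a second call to `listen` throws. The sketch below demonstrates both; `collectBuffered` and `secondListenThrows` are hypothetical names used only for this illustration.

```dart
import 'dart:async';

// Events added before the first listener are buffered and delivered later.
Future<List<String>> collectBuffered() async {
  final controller = StreamController<String>();
  controller.add('Hello');
  controller.add('World');

  final received = <String>[];
  controller.stream.listen(received.add);

  // close() completes once the listener has received the done event.
  await controller.close();
  return received;
}

// A single-subscription stream rejects a second listener with a StateError.
bool secondListenThrows() {
  final controller = StreamController<int>();
  controller.stream.listen((_) {});
  try {
    controller.stream.listen((_) {});
    return false;
  } on StateError {
    return true;
  } finally {
    controller.close();
  }
}

Future<void> main() async {
  print(await collectBuffered());
  print(secondListenThrows());
}
```

When several listeners are needed, a broadcast controller is the usual choice.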
StreamBuilder Widget
StreamBuilder is Flutter’s way of rebuilding UI based on stream data:
import 'package:flutter/material.dart';

class StreamDisplay extends StatelessWidget {
  final Stream<String> dataStream;

  const StreamDisplay({super.key, required this.dataStream});

  @override
  Widget build(BuildContext context) {
    return StreamBuilder<String>(
      stream: dataStream,
      // No initialData here: the connection state stays "waiting" until the
      // first event, so the spinner below would hide any initial value.
      builder: (context, snapshot) {
        if (snapshot.hasError) {
          return Text('Error: ${snapshot.error}');
        }
        if (snapshot.connectionState == ConnectionState.waiting) {
          return const CircularProgressIndicator();
        }
        return Text('Data: ${snapshot.data}');
      },
    );
  }
}
Stream Transformations
Using map, where, and expand
Stream<int> numberStream() async* {
  for (int i = 1; i <= 5; i++) {
    yield i;
  }
}

void transformStreams() {
  // Map transformation
  numberStream()
      .map((number) => number * 2)
      .listen((data) => print('Doubled: $data'));

  // Filtering with where
  numberStream()
      .where((number) => number.isEven)
      .listen((data) => print('Even numbers: $data'));

  // Expanding streams
  numberStream()
      .expand((number) => [number, number * number])
      .listen((data) => print('Expanded: $data'));
}
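When the built-in operators are not enough, a reusable transformation can be packaged as a `StreamTransformer`. The following is a small sketch using `StreamTransformer.fromHandlers` from dart:async; the `labelReadings` transformer and `labelled` helper are hypothetical names chosen for the example.

```dart
import 'dart:async';

// Hypothetical transformer: drops negative values and labels the rest,
// combining a filter and a map in one reusable unit.
final labelReadings = StreamTransformer<int, String>.fromHandlers(
  handleData: (value, sink) {
    if (value >= 0) sink.add('reading: $value');
  },
);

Future<List<String>> labelled(List<int> raw) =>
    Stream.fromIterable(raw).transform(labelReadings).toList();

Future<void> main() async {
  print(await labelled([3, -1, 7])); // [reading: 3, reading: 7]
}
```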
Broadcast Streams
Broadcast streams allow multiple listeners:
StreamController<String> broadcastController =
    StreamController<String>.broadcast();

void setupBroadcast() {
  // First listener
  broadcastController.stream.listen(
    (data) => print('Listener 1: $data'),
  );

  // Second listener
  broadcastController.stream.listen(
    (data) => print('Listener 2: $data'),
  );

  // Adding data
  broadcastController.sink.add('Broadcasting!');
}
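One consequence of broadcasting is that events are not buffered for listeners that subscribe later: a listener only sees events added after it subscribed. A minimal sketch of that behaviour, with `lateListenerDemo` as a hypothetical helper name:

```dart
import 'dart:async';

Future<Map<String, List<int>>> lateListenerDemo() async {
  final controller = StreamController<int>.broadcast();
  final earlyEvents = <int>[];
  final lateEvents = <int>[];

  controller.stream.listen(earlyEvents.add);
  controller.add(1);

  // Give the event loop a turn so event 1 is delivered.
  await Future<void>.delayed(Duration.zero);

  // This listener subscribes after event 1 and never receives it.
  controller.stream.listen(lateEvents.add);
  controller.add(2);

  await controller.close();
  return {'early': earlyEvents, 'late': lateEvents};
}

Future<void> main() async {
  print(await lateListenerDemo()); // {early: [1, 2], late: [2]}
}
```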
Error Handling
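A stream error that no listener handles is reported to the current zone as an uncaught error, so handle errors either with handleError or in the onError callback of listen: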
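Stream<int> errorStream() async* {
  yield 1;
  throw Exception('Something went wrong');
  // Never reached: an exception thrown in an async* function is emitted
  // as an error event and then the stream closes.
  yield 2;
}

void handleErrors() {
  errorStream()
      // handleError intercepts the error before it reaches listen,
      // so the onError callback below is not called in this example.
      .handleError((error) => print('Caught error: $error'))
      .listen(
        (data) => print('Data: $data'),
        onError: (error) => print('Error in listen: $error'),
      );
}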
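An error does not always end a stream. With a `StreamController`, an error added through `addError` is just another event, and the subscription keeps running unless `cancelOnError` is set. The sketch below records what a listener sees in both cases; `logEvents` is a hypothetical helper name.

```dart
import 'dart:async';

Future<List<String>> logEvents({required bool cancelOnError}) async {
  final controller = StreamController<int>();
  final log = <String>[];

  controller.stream.listen(
    (value) => log.add('data $value'),
    onError: (Object error) => log.add('error $error'),
    onDone: () => log.add('done'),
    cancelOnError: cancelOnError,
  );

  controller
    ..add(1)
    ..addError('boom')
    ..add(2);

  // Completes when the listener received "done" or cancelled itself.
  await controller.close();
  return log;
}

Future<void> main() async {
  print(await logEvents(cancelOnError: false));
  // [data 1, error boom, data 2, done]
  print(await logEvents(cancelOnError: true));
  // [data 1, error boom]
}
```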
Advanced Stream Operations
Combining Streams
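// StreamGroup and StreamZip come from package:async, not dart:async
import 'package:async/async.dart';

void combineStreams() {
  // A single-subscription stream can only be listened to once,
  // so each operation below gets fresh streams.
  Stream<int> stream1() => Stream.fromIterable([1, 2, 3]);
  Stream<int> stream2() => Stream.fromIterable([4, 5, 6]);

  // Merging streams: events are forwarded as they arrive
  StreamGroup.merge([stream1(), stream2()])
      .listen((data) => print('Merged: $data'));

  // Zipping streams: pairs up the nth events, e.g. [1, 4]
  StreamZip([stream1(), stream2()])
      .listen((List<int> data) => print('Zipped: $data'));
}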
Debouncing and Throttling
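import 'dart:async';
// debounceTime is an extension from package:rxdart, not dart:async
import 'package:rxdart/rxdart.dart';

Stream<String> searchStream(StreamController<String> controller) {
  // Return the transformed stream. Calling listen() here would return a
  // StreamSubscription, which does not match the declared return type.
  return controller.stream
      .debounceTime(const Duration(milliseconds: 300))
      .distinct();
}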
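Debouncing is not built into dart:async (`debounceTime` is provided by the rxdart package), so it can help to see what the operator does. The following is a simplified, hand-rolled sketch built on `Timer` and `StreamController`; the `debounce` and `typing` names are hypothetical, pause/resume forwarding is omitted, and it is not meant to mirror rxdart's implementation.

```dart
import 'dart:async';

// Forwards a value only after `quietPeriod` has passed with no newer value.
Stream<T> debounce<T>(Stream<T> source, Duration quietPeriod) {
  Timer? timer;
  StreamSubscription<T>? subscription;
  T? latest;
  late final StreamController<T> controller;

  controller = StreamController<T>(
    onListen: () {
      subscription = source.listen(
        (value) {
          latest = value;
          timer?.cancel(); // A newer value restarts the quiet period.
          timer = Timer(quietPeriod, () => controller.add(value));
        },
        onError: controller.addError,
        onDone: () {
          // Flush a value that is still waiting, then close.
          if (timer?.isActive ?? false) {
            timer?.cancel();
            controller.add(latest as T);
          }
          controller.close();
        },
      );
    },
    onCancel: () {
      timer?.cancel();
      return subscription?.cancel();
    },
  );
  return controller.stream;
}

// Simulated keystrokes: three quick edits, a pause, then one more.
Stream<String> typing() async* {
  yield 'f';
  yield 'fl';
  yield 'flu';
  await Future<void>.delayed(const Duration(milliseconds: 200));
  yield 'flutter';
}

Future<void> main() async {
  final queries =
      await debounce(typing(), const Duration(milliseconds: 50)).toList();
  print(queries); // [flu, flutter]
}
```

Throttling is the complementary technique: instead of waiting for a quiet period, it lets at most one event through per time window.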
AsyncMap for API Calls
Stream<String> fetchData(Stream<String> inputStream) {
  return inputStream.asyncMap((query) async {
    // Simulate API call
    await Future.delayed(const Duration(seconds: 1));
    return 'Result for: $query';
  });
}
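A useful property of `asyncMap` is that it handles events one at a time and keeps them in order: the next event is not processed until the previous future completes. The sketch below illustrates this with a hypothetical `slowLookup` function where the first query is deliberately the slowest.

```dart
import 'dart:async';

// Hypothetical lookup: one-character queries take longer to answer.
Future<String> slowLookup(String query) async {
  final delay = Duration(milliseconds: query.length == 1 ? 60 : 10);
  await Future<void>.delayed(delay);
  return 'Result for: $query';
}

Future<List<String>> lookUpAll(List<String> queries) =>
    Stream.fromIterable(queries).asyncMap(slowLookup).toList();

Future<void> main() async {
  // The slow first lookup still comes out first.
  print(await lookUpAll(['a', 'abc']));
  // [Result for: a, Result for: abc]
}
```

The trade-off is throughput: lookups run sequentially, so a slow call delays everything queued behind it.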
Best Practices and Common Patterns
- Always Close Controllers: Prevent memory leaks by closing StreamControllers.
class StreamManager {
  final StreamController<String> _controller = StreamController<String>();

  void dispose() {
    _controller.close();
  }
}
- Use StreamSubscription Management:
class StreamSubscriptionManager {
  StreamSubscription? _subscription;

  void startListening(Stream stream) {
    // Cancel any previous subscription before replacing it
    _subscription?.cancel();
    _subscription = stream.listen((data) {
      // Handle data
    });
  }

  void dispose() {
    _subscription?.cancel();
  }
}
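To show what cancelling actually does, here is a small sketch: a periodic stream would run forever, but the subscription cancels itself after three events and nothing further is delivered. The `takeUntilCancelled` name is hypothetical.

```dart
import 'dart:async';

Future<List<int>> takeUntilCancelled() async {
  final received = <int>[];
  late final StreamSubscription<int> subscription;

  // Stream.periodic never completes on its own; the counter starts at 0.
  subscription = Stream.periodic(
    const Duration(milliseconds: 10),
    (count) => count,
  ).listen((value) {
    received.add(value);
    if (value == 2) subscription.cancel();
  });

  // Wait long enough that more events would have arrived without cancel().
  await Future<void>.delayed(const Duration(milliseconds: 100));
  return received;
}

Future<void> main() async {
  print(await takeUntilCancelled()); // [0, 1, 2]
}
```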
- Implement Retry Logic:
// An async* generator is required here: yield cannot be used inside
// a plain async closure passed to Stream.fromFuture.
Stream<T> retryStream<T>(Stream<T> Function() streamFactory) async* {
  int attempts = 3;
  while (attempts > 0) {
    try {
      // Note: values emitted before a failure are emitted again on retry.
      await for (final data in streamFactory()) {
        yield data;
      }
      break;
    } catch (e) {
      attempts--;
      if (attempts == 0) rethrow;
      await Future.delayed(const Duration(seconds: 1));
    }
  }
}
Practical Example: Real-time Search
Here’s a complete example implementing a real-time search feature:
import 'dart:async';
import 'package:rxdart/rxdart.dart'; // provides debounceTime

class SearchBloc {
  final _searchController = StreamController<String>();
  final _resultsController = StreamController<List<String>>();
  StreamSubscription? _subscription;

  Stream<List<String>> get results => _resultsController.stream;

  SearchBloc() {
    _subscription = _searchController.stream
        .debounceTime(const Duration(milliseconds: 300))
        .distinct()
        .where((query) => query.length >= 2)
        .asyncMap(_performSearch)
        // Forward failed searches as error events instead of dropping them
        .listen(_resultsController.add, onError: _resultsController.addError);
  }

  void search(String query) {
    _searchController.add(query);
  }

  Future<List<String>> _performSearch(String query) async {
    // Simulate API call
    await Future.delayed(const Duration(seconds: 1));
    return ['Result 1 for $query', 'Result 2 for $query'];
  }

  void dispose() {
    // Cancel first so no result is added to an already closed controller
    _subscription?.cancel();
    _searchController.close();
    _resultsController.close();
  }
}
Conclusion
Streams are a fundamental part of Flutter’s reactive programming model. They provide a powerful way to handle asynchronous data and events in your applications. By understanding and properly implementing streams, you can create more responsive and maintainable Flutter applications.
Remember these key points:
- Use StreamController for fine-grained control
- Implement proper error handling
- Always close controllers and cancel subscriptions
- Choose between single-subscription and broadcast streams based on your needs
- Leverage stream transformations for data manipulation
- Consider using StreamBuilder for UI updates
With these concepts and patterns in mind, you can effectively implement stream-based solutions in your Flutter applications.

