Mastering Streams and StreamBuilder in Flutter
Mastering Streams and StreamBuilder in Flutter
What You’ll Learn
Understand Dart Streams and how to use StreamBuilder to create reactive UIs that automatically update when data changes. Perfect for real-time features like chat messages, live scores, or sensor data.
What Are Streams?
Think of a Stream as a pipe that delivers data over time, unlike a Future, which delivers a single value once. Streams are essential for:
- Real-time data (WebSocket messages, Firebase updates)
- User input events (text changes, button clicks)
- Periodic updates (timers, location tracking)
- File reading in chunks
Stream vs Future:
- Future: Single value, completes once (like an API call)
- Stream: Multiple values over time (like a live video feed)
Example: Real-Time Counter Stream
Let’s build a counter that updates every second using Streams:
import 'dart:async';
import 'package:flutter/material.dart';
/// Screen that displays a counter fed by a stream that ticks once per second.
class CounterStreamExample extends StatefulWidget {
  /// Creates the counter screen.
  ///
  /// Accepting a [key] lets the framework preserve or replace this widget's
  /// state when it moves in the tree, and the const constructor lets callers
  /// build it as a compile-time constant.
  const CounterStreamExample({Key? key}) : super(key: key);

  @override
  State<CounterStreamExample> createState() => _CounterStreamExampleState();
}
/// State for [CounterStreamExample].
///
/// Owns a [StreamController] that a periodic [Timer] feeds once per second
/// and renders the most recent value through a [StreamBuilder].
class _CounterStreamExampleState extends State<CounterStreamExample> {
  // Initialized at declaration so the field can be final and can never be
  // read before it is assigned (no `late` needed).
  final StreamController<int> _counterController = StreamController<int>();
  int _counter = 0;
  Timer? _timer;

  @override
  void initState() {
    super.initState();
    // Start emitting values every second.
    _timer = Timer.periodic(const Duration(seconds: 1), (_) {
      _counter++;
      _counterController.add(_counter); // Add value to stream
    });
  }

  @override
  void dispose() {
    // Stop the timer first so nothing is added to a closed controller.
    _timer?.cancel();
    _counterController.close(); // Always close streams to prevent memory leaks
    super.dispose();
  }

  @override
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(title: const Text('Stream Counter')),
      body: Center(
        child: StreamBuilder<int>(
          stream: _counterController.stream,
          initialData: 0,
          builder: (context, snapshot) {
            // Inspect the snapshot contents: errors first, then missing data.
            if (snapshot.hasError) {
              return Text(
                'Error: ${snapshot.error}',
                style: const TextStyle(color: Colors.red),
              );
            }
            // With initialData set this branch is not expected to trigger;
            // it guards the case where initialData is removed.
            if (!snapshot.hasData) {
              return const CircularProgressIndicator();
            }
            // Display the latest value.
            return Column(
              mainAxisAlignment: MainAxisAlignment.center,
              children: [
                Text(
                  '${snapshot.data}',
                  style: const TextStyle(
                    fontSize: 72,
                    fontWeight: FontWeight.bold,
                  ),
                ),
                const SizedBox(height: 16),
                const Text(
                  'Seconds elapsed',
                  style: TextStyle(fontSize: 18, color: Colors.grey),
                ),
              ],
            );
          },
        ),
      ),
    );
  }
}
StreamBuilder States
Each snapshot passed to the StreamBuilder builder carries a connectionState; switch on it to handle every stage of the stream's lifecycle:
StreamBuilder<String>(
  stream: myStream,
  builder: (context, snapshot) {
    // The connection state tells us which stage the subscription is in;
    // every enum value is handled, so the switch always returns a widget.
    switch (snapshot.connectionState) {
      case ConnectionState.none:
        return Text('No stream attached');

      case ConnectionState.waiting:
        return CircularProgressIndicator();

      case ConnectionState.done:
        return Text('Stream closed');

      case ConnectionState.active:
        // While active, report an error before looking at the data.
        if (snapshot.hasError) {
          return Text('Error: ${snapshot.error}');
        }
        return Text(
          snapshot.hasData ? 'Data: ${snapshot.data}' : 'No data yet',
        );
    }
  },
)
Practical Example: Search with Debouncing
Streams excel at handling user input with debouncing (waiting for the user to stop typing before reacting):
/// Screen with a search field whose results are produced by a debounced
/// stream of queries.
class SearchScreen extends StatefulWidget {
  /// Creates the search screen.
  ///
  /// Accepting a [key] lets the framework preserve or replace this widget's
  /// state when it moves in the tree, and the const constructor lets callers
  /// build it as a compile-time constant.
  const SearchScreen({Key? key}) : super(key: key);

  @override
  State<SearchScreen> createState() => _SearchScreenState();
}
/// State for [SearchScreen].
///
/// Text changes are pushed into [_queryController]; that stream is debounced,
/// stripped of consecutive repeats, and mapped to search results which a
/// [StreamBuilder] renders as a list.
class _SearchScreenState extends State<SearchScreen> {
  final _searchController = TextEditingController();
  final _queryController = StreamController<String>();

  // Assigned exactly once, in initState.
  late final Stream<List<String>> _searchResults;

  @override
  void initState() {
    super.initState();
    // Transform the query stream with debouncing.
    // debounceTime() is an extension from package:rxdart.
    _searchResults = _queryController.stream
        .debounceTime(
          const Duration(milliseconds: 500),
        ) // Wait 500ms after typing stops
        .distinct() // Skip a query that equals the previous one
        .asyncMap(_performSearch); // Perform async search
  }

  /// Returns mock results for [query], or an empty list when [query] is empty.
  Future<List<String>> _performSearch(String query) async {
    if (query.isEmpty) return [];
    // Simulate API call
    await Future<void>.delayed(const Duration(milliseconds: 300));
    // Mock search results
    return [
      'Result 1: $query',
      'Result 2: $query',
      'Result 3: $query',
    ];
  }

  @override
  void dispose() {
    _searchController.dispose();
    _queryController.close();
    super.dispose();
  }

  @override
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(title: const Text('Stream Search')),
      body: Column(
        children: [
          Padding(
            padding: const EdgeInsets.all(16),
            child: TextField(
              controller: _searchController,
              decoration: const InputDecoration(
                hintText: 'Search...',
                prefixIcon: Icon(Icons.search),
                border: OutlineInputBorder(),
              ),
              // Add each query to the stream.
              onChanged: _queryController.add,
            ),
          ),
          Expanded(
            child: StreamBuilder<List<String>>(
              stream: _searchResults,
              builder: (context, snapshot) {
                // Check for an error first: an error snapshot carries no
                // data, so testing hasData first would hide the error behind
                // the "start typing" placeholder.
                if (snapshot.hasError) {
                  return Center(child: Text('Error: ${snapshot.error}'));
                }
                final results = snapshot.data;
                if (results == null) {
                  return const Center(child: Text('Start typing to search'));
                }
                if (results.isEmpty) {
                  return const Center(child: Text('No results found'));
                }
                return ListView.builder(
                  itemCount: results.length,
                  itemBuilder: (context, index) {
                    return ListTile(
                      title: Text(results[index]),
                      leading: const Icon(Icons.article),
                    );
                  },
                );
              },
            ),
          ),
        ],
      ),
    );
  }
}
Note: debounceTime() is not part of the Dart SDK; it is an extension provided by the rxdart package. To use it, add rxdart to pubspec.yaml:
dependencies:
rxdart: ^0.27.7
Then import:
import 'package:rxdart/rxdart.dart';
Stream Types
Single Subscription Stream:
- Only one listener allowed
- Data consumed once
- Good for file reading or HTTP responses
/// Emits the integers 0 through 4, waiting one second before each value.
///
/// The stream comes from an `async*` generator, so nothing runs until it is
/// listened to.
Stream<int> singleStream() async* {
  const pause = Duration(seconds: 1);
  var next = 0;
  while (next < 5) {
    await Future<void>.delayed(pause);
    yield next; // Emit value
    next += 1;
  }
}
Broadcast Stream:
- Multiple listeners allowed
- All active listeners receive the same events (events emitted before a listener subscribes are not replayed)
- Good for user events or shared state
// A broadcast controller lets any number of listeners subscribe.
final controller = StreamController<int>.broadcast();

// Multiple listeners
controller.stream.listen((data) => print('Listener 1: $data'));
controller.stream.listen((data) => print('Listener 2: $data'));

controller.add(42); // Both listeners receive 42

// Close the controller once no more events will be added: already-added
// events are still delivered, then each listener receives a done event.
controller.close();
Stream Transformation Cheat Sheet
// Each call returns a new stream; the source stream itself is not modified.

// Map - transform each value
stream.map((value) => value * 2);

// Where - keep only the values that pass the test
stream.where((value) => value > 10);

// Take - limit number of values
stream.take(5);

// Skip - skip first N values
stream.skip(2);

// Distinct - skip a value when it equals the previous one
// (only consecutive duplicates are dropped)
stream.distinct();

// AsyncMap - transform with async operation
// (passing the function directly avoids a redundant async/await wrapper)
stream.asyncMap(fetchData);

// Chain multiple operations
stream
    .where((x) => x > 0)
    .map((x) => x * 2)
    .take(10);
Try It Yourself
Build a live chat message feature:
- Create a Stream that emits new messages
- Use StreamBuilder to display messages in a ListView
- Add a text field to send new messages
- Display typing indicator when stream is waiting
- Handle errors gracefully
Challenge: Create a real-time stock price tracker that:
- Emits random price updates every 2 seconds
- Shows price with color (green if up, red if down)
- Displays a line chart of recent prices
- Allows pausing/resuming the stream
Tip of the Day
Memory Management: Always close StreamControllers in dispose() to prevent memory leaks. Use StreamSubscription.cancel() if you manually subscribe to streams.
StreamBuilder vs listen(): Prefer StreamBuilder for UI updates—it automatically handles subscription lifecycle. Use stream.listen() for side effects like logging or navigation.
Async Generators: Create streams easily with async* and yield:
/// Emits the integers 1 through [max], waiting one second before each value.
///
/// Emits nothing when [max] is less than 1.
Stream<int> countStream(int max) async* {
  var current = 1;
  while (current <= max) {
    await Future<void>.delayed(const Duration(seconds: 1));
    yield current;
    current += 1;
  }
}