Mastering Streams and StreamBuilder in Flutter

Mastering Streams and StreamBuilder in Flutter

What You’ll Learn

Understand Dart Streams and how to use StreamBuilder to create reactive UIs that automatically update when data changes. Perfect for real-time features like chat messages, live scores, or sensor data.

What Are Streams?

Think of a Stream as a pipe that delivers data over time, unlike Future which delivers data once. Streams are essential for:

Stream vs Future:

Example: Real-Time Counter Stream

Let’s build a counter that updates every second using Streams:

import 'dart:async';
import 'package:flutter/material.dart';

class CounterStreamExample extends StatefulWidget {
  @override
  State<CounterStreamExample> createState() => _CounterStreamExampleState();
}

class _CounterStreamExampleState extends State<CounterStreamExample> {
  late StreamController<int> _counterController;
  int _counter = 0;
  Timer? _timer;

  @override
  void initState() {
    super.initState();

    // Create a stream controller
    _counterController = StreamController<int>();

    // Start emitting values every second
    _timer = Timer.periodic(Duration(seconds: 1), (timer) {
      _counter++;
      _counterController.add(_counter); // Add value to stream
    });
  }

  @override
  void dispose() {
    _timer?.cancel();
    _counterController.close(); // Always close streams to prevent memory leaks
    super.dispose();
  }

  @override
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(title: Text('Stream Counter')),
      body: Center(
        child: StreamBuilder<int>(
          stream: _counterController.stream,
          initialData: 0,
          builder: (context, snapshot) {
            // Handle different connection states
            if (snapshot.hasError) {
              return Text(
                'Error: ${snapshot.error}',
                style: TextStyle(color: Colors.red),
              );
            }

            if (!snapshot.hasData) {
              return CircularProgressIndicator();
            }

            // Display the latest value
            return Column(
              mainAxisAlignment: MainAxisAlignment.center,
              children: [
                Text(
                  '${snapshot.data}',
                  style: TextStyle(fontSize: 72, fontWeight: FontWeight.bold),
                ),
                SizedBox(height: 16),
                Text(
                  'Seconds elapsed',
                  style: TextStyle(fontSize: 18, color: Colors.grey),
                ),
              ],
            );
          },
        ),
      ),
    );
  }
}

StreamBuilder States

StreamBuilder provides different snapshot states to handle:

StreamBuilder<String>(
  stream: myStream,
  builder: (context, snapshot) {
    // Check connection state
    switch (snapshot.connectionState) {
      case ConnectionState.none:
        return Text('No stream attached');

      case ConnectionState.waiting:
        return CircularProgressIndicator();

      case ConnectionState.active:
        if (snapshot.hasError) {
          return Text('Error: ${snapshot.error}');
        }
        if (snapshot.hasData) {
          return Text('Data: ${snapshot.data}');
        }
        return Text('No data yet');

      case ConnectionState.done:
        return Text('Stream closed');
    }
  },
)

Practical Example: Search with Debouncing

Streams excel at handling user input with debouncing (waiting for user to stop typing):

class SearchScreen extends StatefulWidget {
  @override
  State<SearchScreen> createState() => _SearchScreenState();
}

class _SearchScreenState extends State<SearchScreen> {
  final _searchController = TextEditingController();
  final _queryController = StreamController<String>();

  late Stream<List<String>> _searchResults;

  @override
  void initState() {
    super.initState();

    // Transform the query stream with debouncing
    _searchResults = _queryController.stream
        .debounceTime(Duration(milliseconds: 500)) // Wait 500ms after typing stops
        .distinct() // Skip duplicate queries
        .asyncMap((query) => _performSearch(query)); // Perform async search
  }

  Future<List<String>> _performSearch(String query) async {
    if (query.isEmpty) return [];

    // Simulate API call
    await Future.delayed(Duration(milliseconds: 300));

    // Mock search results
    return [
      'Result 1: $query',
      'Result 2: $query',
      'Result 3: $query',
    ];
  }

  @override
  void dispose() {
    _searchController.dispose();
    _queryController.close();
    super.dispose();
  }

  @override
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(title: Text('Stream Search')),
      body: Column(
        children: [
          Padding(
            padding: EdgeInsets.all(16),
            child: TextField(
              controller: _searchController,
              decoration: InputDecoration(
                hintText: 'Search...',
                prefixIcon: Icon(Icons.search),
                border: OutlineInputBorder(),
              ),
              onChanged: (query) {
                _queryController.add(query); // Add query to stream
              },
            ),
          ),
          Expanded(
            child: StreamBuilder<List<String>>(
              stream: _searchResults,
              builder: (context, snapshot) {
                if (!snapshot.hasData) {
                  return Center(child: Text('Start typing to search'));
                }

                if (snapshot.hasError) {
                  return Center(child: Text('Error: ${snapshot.error}'));
                }

                final results = snapshot.data!;

                if (results.isEmpty) {
                  return Center(child: Text('No results found'));
                }

                return ListView.builder(
                  itemCount: results.length,
                  itemBuilder: (context, index) {
                    return ListTile(
                      title: Text(results[index]),
                      leading: Icon(Icons.article),
                    );
                  },
                );
              },
            ),
          ),
        ],
      ),
    );
  }
}

Note: To use debounceTime(), add rxdart package to pubspec.yaml:

dependencies:
  rxdart: ^0.27.7

Then import:

import 'package:rxdart/rxdart.dart';

Stream Types

Single Subscription Stream:

Stream<int> singleStream() async* {
  for (int i = 0; i < 5; i++) {
    await Future.delayed(Duration(seconds: 1));
    yield i; // Emit value
  }
}

Broadcast Stream:

final controller = StreamController<int>.broadcast();

// Multiple listeners
controller.stream.listen((data) => print('Listener 1: $data'));
controller.stream.listen((data) => print('Listener 2: $data'));

controller.add(42); // Both listeners receive 42

Stream Transformation Cheat Sheet

// Map - transform each value
stream.map((value) => value * 2);

// Where - filter values
stream.where((value) => value > 10);

// Take - limit number of values
stream.take(5);

// Skip - skip first N values
stream.skip(2);

// Distinct - skip duplicates
stream.distinct();

// AsyncMap - transform with async operation
stream.asyncMap((value) async => await fetchData(value));

// Transform with multiple operations
stream
  .where((x) => x > 0)
  .map((x) => x * 2)
  .take(10);

Try It Yourself

Build a live chat message feature:

  1. Create a Stream that emits new messages
  2. Use StreamBuilder to display messages in a ListView
  3. Add a text field to send new messages
  4. Display typing indicator when stream is waiting
  5. Handle errors gracefully

Challenge: Create a real-time stock price tracker that:

Tip of the Day

Memory Management: Always close StreamControllers in dispose() to prevent memory leaks. Use StreamSubscription.cancel() if you manually subscribe to streams.

StreamBuilder vs listen(): Prefer StreamBuilder for UI updates—it automatically handles subscription lifecycle. Use stream.listen() for side effects like logging or navigation.

Async Generators: Create streams easily with async* and yield:

Stream<int> countStream(int max) async* {
  for (int i = 1; i <= max; i++) {
    await Future.delayed(Duration(seconds: 1));
    yield i;
  }
}