
Streams: The Async Iterator

Dart Streams Explained — Single-Subscription, Broadcast, and BLoC Under the Hood

April 2, 2026

You're using BLoC. Your widget rebuilds when the state changes. You've written this pattern a hundred times:

dart
BlocBuilder<CartBloc, CartState>(
  builder: (context, state) {
    return Text('${state.items.length} items');
  },
)

It works. The text updates when items are added. But what is BlocBuilder actually listening to? Where do the state updates come from? If you dig into the flutter_bloc source, you'll find this at the core:

dart
bloc.stream.listen((state) {
  // Rebuild the widget
});

A Stream. BLoC emits states on a Stream. BlocBuilder listens to that Stream. Every state change is a value pushed through a pipe, and the widget on the other end reacts. The entire BLoC pattern — events in, states out — is a StreamController with extra structure around it.

Streams are Dart's primitive for "values that arrive over time." A Future gives you one value, once. A Stream gives you zero, one, or many values, over any timespan, until it closes or errors. Understanding Streams means understanding the actual mechanism behind BLoC, Riverpod's StreamProvider, Firebase's realtime listeners, WebSocket connections, and every other "reactive" API in Flutter.

A Stream is a pipe

The simplest mental model: a Stream is a pipe with a producer on one end and a consumer on the other.

text
Producer ───► [ Stream ] ───► Consumer
  (adds values)            (receives values via listen)

The producer pushes values into the pipe. The consumer receives them by calling .listen(). The Stream handles the delivery — buffering, pausing, resuming, error propagation, and completion signaling.

dart
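// Producer side. Nobody is listening yet, so the single-subscription
// controller buffers these events until listen() is called below.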
final controller = StreamController<int>();
controller.sink.add(1);
controller.sink.add(2);
controller.sink.add(3);
controller.close();

// Consumer side
controller.stream.listen(
  (value) => print('Got: $value'),
  onError: (e) => print('Error: $e'),
  onDone: () => print('Stream closed'),
);

Output:

text
Got: 1
Got: 2
Got: 3
Stream closed

Three types of signals flow through a Stream:

  • Data events — values of type T (the normal case)
  • Error events — errors that occurred during production
  • Done event — the stream has closed, no more values will arrive

A Stream can emit any combination of data and error events, in any order, followed by at most one done event. After the done event, the Stream is finished permanently.
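
To make the three signal types concrete, here is a minimal sketch that records every signal in arrival order. The helper name `collectSignals` and the error text are made up for illustration; the point is that an error event does not end the stream, while the done event does.

```dart
import 'dart:async';

/// Collects every signal a stream delivers, in arrival order.
/// (Illustrative helper, not part of dart:async.)
Future<List<String>> collectSignals(Stream<int> stream) {
  final log = <String>[];
  final finished = Completer<List<String>>();
  stream.listen(
    (value) => log.add('data: $value'),
    onError: (Object error) => log.add('error: $error'),
    onDone: () {
      log.add('done');
      finished.complete(log);
    },
  );
  return finished.future;
}

Future<void> main() async {
  final controller = StreamController<int>();
  final result = collectSignals(controller.stream);

  controller.add(1);
  controller.addError('sensor offline'); // an error does not close the stream
  controller.add(2);
  await controller.close(); // the done event is always last

  print(await result); // [data: 1, error: sensor offline, data: 2, done]
}
```

The stream keeps going after the error because `listen` defaults to `cancelOnError: false`.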

Single-subscription vs broadcast

This is where most developers get confused, and where bugs hide.

Single-subscription streams can only be listened to once. One consumer. If you call .listen() a second time, you get a StateError: Stream has already been listened to.

dart
final controller = StreamController<int>(); // Single-subscription by default
final stream = controller.stream;

stream.listen((v) => print('Listener A: $v')); // Works
stream.listen((v) => print('Listener B: $v')); // THROWS StateError

Why would you want this restriction? Because single-subscription streams support backpressure. The consumer can pause its subscription, the controller notifies the producer through its onPause callback, and a well-behaved producer stops producing until the consumer resumes. This is critical for I/O: if you're reading a 500 MB file as a stream of bytes, you don't want 500 MB buffered in memory because the consumer is processing slowly. Single-subscription streams let the consumer say "hold on, I'm busy," and I/O streams such as file reads respect it.

Most streams you encounter are single-subscription: file reads, HTTP response bodies, StreamController() by default.
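
The pause/resume handshake can be sketched with a timer-driven producer. Everything here (`politeProducer`, the 10 ms tick, the 50 ms "busy" period) is a hypothetical setup chosen for illustration; the real API surface is just the `onPause`/`onResume` callbacks of `StreamController` and `pause()`/`resume()` on the subscription.

```dart
import 'dart:async';

/// Hypothetical producer that emits 0..4 on a timer and honors pause/resume.
Stream<int> politeProducer(List<String> log) {
  late final StreamController<int> controller;
  Timer? timer;
  var next = 0;

  void start() {
    timer = Timer.periodic(const Duration(milliseconds: 10), (_) {
      controller.add(next++);
      if (next == 5) {
        timer?.cancel();
        controller.close();
      }
    });
  }

  void stop() => timer?.cancel();

  controller = StreamController<int>(
    onListen: start,
    onPause: () {
      log.add('producer paused');
      stop(); // stop producing instead of filling the buffer
    },
    onResume: () {
      log.add('producer resumed');
      start();
    },
    onCancel: stop,
  );
  return controller.stream;
}

Future<List<String>> runBackpressureDemo() async {
  final log = <String>[];
  final done = Completer<void>();
  late final StreamSubscription<int> sub;
  sub = politeProducer(log).listen((value) {
    log.add('got $value');
    if (value == 1) {
      sub.pause(); // "hold on, I'm busy"
      Future<void>.delayed(const Duration(milliseconds: 50), sub.resume);
    }
  }, onDone: done.complete);
  await done.future;
  return log;
}

Future<void> main() async {
  print(await runBackpressureDemo());
  // [got 0, got 1, producer paused, producer resumed, got 2, got 3, got 4]
}
```

If the producer ignored `onPause` and kept calling `add`, nothing would be lost, but the controller would buffer every value until the consumer resumed. That buffering is exactly the memory growth backpressure is meant to avoid.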

Broadcast streams can have multiple listeners. Any number of consumers can call .listen():

dart
final controller = StreamController<int>.broadcast();
final stream = controller.stream;

stream.listen((v) => print('Listener A: $v')); // Works
stream.listen((v) => print('Listener B: $v')); // Also works

The tradeoff: no backpressure. The producer emits values regardless of whether any consumer is listening, and regardless of how fast consumers process. Values emitted before any listener subscribes are lost — there's no buffer.

Broadcast streams are used when multiple parts of the code need the same events: UI events (tap streams), state changes (BLoC's state stream is broadcast), and observable patterns.

BLoC uses broadcast. When you have two BlocBuilder widgets listening to the same Bloc, both receive every state. This works because BLoC uses a broadcast StreamController internally. If it used single-subscription, only one widget could listen — your second BlocBuilder would crash.

StreamController: the producer's tool

StreamController is the standard way to create a Stream that you control. You push values into the controller's sink, and they come out the controller's stream.

dart
class TemperatureSensor {
  final _controller = StreamController<double>();

  Stream<double> get readings => _controller.stream;

  void _onNewReading(double celsius) {
    if (!_controller.isClosed) {
      _controller.add(celsius);
    }
  }

  void _onSensorError(Object error) {
    // Same guard as above: addError() on a closed controller throws.
    if (!_controller.isClosed) {
      _controller.addError(error);
    }
  }

  void dispose() {
    _controller.close();
  }
}

The StreamController manages the connection between producer and consumer. When a consumer calls stream.listen():

  1. The controller creates a StreamSubscription — the active connection.
  2. The subscription starts receiving events from the controller.
  3. The consumer gets a StreamSubscription object they can use to pause(), resume(), or cancel().

Internally, the controller maintains a buffer of events. If events are added before any listener subscribes (single-subscription only), they're buffered and delivered when the listener attaches. For broadcast streams, events without listeners are dropped.
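
A small experiment shows the difference. The helper `lateListener` is an illustrative name; it adds two events before anyone listens and one after, then reports what the listener received.

```dart
import 'dart:async';

/// Adds two events before anyone listens and one after, then returns
/// what the late listener actually received.
Future<List<int>> lateListener(StreamController<int> controller) async {
  final received = <int>[];
  controller.add(1);
  controller.add(2);
  controller.stream.listen(received.add);
  controller.add(3);
  await controller.close();
  return received;
}

Future<void> main() async {
  // Single-subscription: early events wait in the buffer.
  print(await lateListener(StreamController<int>())); // [1, 2, 3]

  // Broadcast: events added with no listener are dropped.
  print(await lateListener(StreamController<int>.broadcast())); // [3]
}
```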

The subscription: the live connection

.listen() doesn't return void — it returns a StreamSubscription. This object is the active connection between the stream and your callback. It's also the source of one of the most common memory leaks in Flutter.

dart
late StreamSubscription<CartState> _subscription;

@override
void initState() {
  super.initState();
  _subscription = cartBloc.stream.listen((state) {
    // Update UI
  });
}

@override
void dispose() {
  _subscription.cancel(); // CRITICAL
  super.dispose();
}

If you don't cancel the subscription in dispose(), the callback keeps a reference to your widget's state, the stream keeps a reference to the callback, and the garbage collector can't collect any of it. The widget is off-screen, the Element is unmounted, but the callback still fires on every state change — updating state that no longer exists.

In mild cases, this wastes memory and CPU. In severe cases, the callback calls setState() on a disposed State object, producing the infamous:

text
FlutterError: setState() called after dispose()

This is why BlocBuilder and StreamBuilder handle subscriptions internally — they cancel in their dispose(). If you use .listen() directly, the subscription lifecycle is your responsibility.

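Cancellation is not optional. Every .listen() on a long-lived stream needs a corresponding .cancel() once the listener is no longer needed. (A stream that closes on its own releases its listeners when it sends the done event.) In a widget, a .listen() without a stored subscription is almost always a bug: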

dart
// BUG: no way to cancel this
stream.listen((value) => doSomething(value));

// CORRECT: store and cancel
_subscription = stream.listen((value) => doSomething(value));

`async*` and `yield`: generating streams

Just as async/await generates Futures, async*/yield generates Streams:

dart
Stream<int> countDown(int from) async* {
  for (int i = from; i >= 0; i--) {
    yield i;
    await Future.delayed(Duration(seconds: 1));
  }
}

// Usage:
countDown(5).listen((n) => print(n));
// Prints: 5, 4, 3, 2, 1, 0 (one per second)

async* marks a function that returns a Stream. yield pushes a value into that stream. await pauses the generator until the Future completes (the stream delivers no values during the pause).

The generator function doesn't run until someone calls .listen(). Once listened to, it runs until it hits a yield (pushes the value, suspends), an await (waits for the Future, suspends), or the function ends (closes the stream).
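
The lazy start is easy to verify. In this sketch (function names are illustrative), the generator logs a line as the first statement of its body, and that line only appears after something listens:

```dart
import 'dart:async';

Stream<int> numbers(List<String> log) async* {
  log.add('generator body started');
  yield 1;
  yield 2;
}

Future<List<String>> runLazyDemo() async {
  final log = <String>[];
  final stream = numbers(log); // calling the function runs none of its body
  log.add('stream created');
  await Future<void>.delayed(Duration.zero); // still nothing: no listener yet
  log.add('about to listen');
  final values = await stream.toList(); // toList() listens, which starts the body
  log.add('received $values');
  return log;
}

Future<void> main() async {
  print(await runLazyDemo());
  // [stream created, about to listen, generator body started, received [1, 2]]
}
```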

yield* delegates to another stream — every value from the inner stream is forwarded to the outer stream:

dart
Stream<int> mergedCountDown() async* {
  yield* countDown(3); // Yields 3, 2, 1, 0
  yield* countDown(2); // Then yields 2, 1, 0
}

The generator approach is cleaner than a StreamController when the production logic is sequential. Use StreamController when events come from external sources (callbacks, platform channels, hardware). Use async* when you're computing or transforming values in a flow you control.

Stream transformation

Streams support a chain of transformations, similar to Iterable methods but asynchronous:

dart
sensorReadings
    .where((temp) => temp > 30.0)           // Filter: only hot readings
    .map((temp) => '${temp.toStringAsFixed(1)}°C')  // Transform: to string
    .distinct()                              // Deduplicate: skip consecutive repeats
    .listen((reading) => print('Alert: $reading'));

Each transformation creates a new Stream that wraps the original. The transformations are lazy — they don't buffer or pre-compute. Each value flows through the chain one at a time, as it's emitted by the source.

Common transformations:

| Method | What it does |
| --- | --- |
| `.map(fn)` | Transform each value |
| `.where(fn)` | Filter values |
| `.distinct()` | Skip consecutive duplicates |
| `.take(n)` | Only the first n values, then close |
| `.skip(n)` | Ignore the first n values |
| `.expand(fn)` | One input → multiple outputs |
| `.asyncMap(fn)` | Transform with an async function (awaits each) |
| `.asyncExpand(fn)` | One input → a stream of outputs |
| `.handleError(fn)` | Catch and handle errors in the stream |
| `.timeout(duration)` | Error if no value arrives within duration |
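
Here is a short sketch that chains several of these methods on a fixed input, with the intermediate values written out as comments. The function name `pipeline` and the input data are arbitrary.

```dart
import 'dart:async';

Future<List<String>> pipeline(Stream<int> source) {
  return source
      .distinct() //                 1, 2, 1, 3, 4, 5 (only consecutive repeats dropped)
      .skip(1) //                    2, 1, 3, 4, 5
      .take(3) //                    2, 1, 3
      .expand((n) => [n, n * 10]) // 2, 20, 1, 10, 3, 30
      .asyncMap((n) async => 'v$n')
      .toList();
}

Future<void> main() async {
  final source = Stream.fromIterable([1, 1, 2, 2, 1, 3, 4, 5]);
  print(await pipeline(source)); // [v2, v20, v1, v10, v3, v30]
}
```

Note that `distinct()` lets the second `1` through: it compares each value only with the one immediately before it.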

`.asyncMap()` deserves attention. It awaits each transformation before processing the next value. If the transformation is slow, it creates backpressure:

dart
sensorReadings
    .asyncMap((temp) async {
      // This API call takes 200ms
      final analysis = await analyzeTemperature(temp);
      return analysis;
    })
    .listen((result) => print(result));

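If sensor readings arrive every 100ms but the analysis takes 200ms, .asyncMap() won't start the next analysis until the current one finishes. It does this by pausing its subscription to the source while the Future is pending, so readings that arrive in the meantime wait upstream (typically in the source controller's buffer) and are processed in order. This prevents overwhelming the API but introduces latency that grows with the backlog. For "process the latest, skip stale values" behavior, you'd need a custom transformer or a debounce.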
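
One way to get "process the latest, skip stale values" is a small hand-written helper. The sketch below is one possible shape, not a standard library API: `processLatest` is a hypothetical name, and for brevity it does not forward pause/resume to the source.

```dart
import 'dart:async';

/// Hypothetical helper (not part of dart:async): runs [work] on one value at
/// a time and, while busy, remembers only the most recent waiting value.
Stream<R> processLatest<T, R>(Stream<T> source, Future<R> Function(T) work) {
  final controller = StreamController<R>();
  StreamSubscription<T>? sub;
  var busy = false;
  var sourceDone = false;
  var hasPending = false;
  late T pending;

  Future<void> run(T value) async {
    busy = true;
    try {
      controller.add(await work(value));
    } catch (error, stackTrace) {
      controller.addError(error, stackTrace);
    }
    busy = false;
    if (hasPending) {
      hasPending = false;
      run(pending); // continue with the newest value that arrived meanwhile
    } else if (sourceDone) {
      controller.close();
    }
  }

  controller.onListen = () {
    sub = source.listen(
      (value) {
        if (busy) {
          pending = value; // overwrite: older waiting values are stale
          hasPending = true;
        } else {
          run(value);
        }
      },
      onError: controller.addError,
      onDone: () {
        sourceDone = true;
        if (!busy) controller.close();
      },
    );
  };
  controller.onCancel = () => sub?.cancel();
  return controller.stream;
}

Future<int> slowTimesTen(int n) async {
  await Future<void>.delayed(const Duration(milliseconds: 30));
  return n * 10;
}

Future<void> main() async {
  final results =
      await processLatest(Stream.fromIterable([1, 2, 3, 4, 5]), slowTimesTen)
          .toList();
  print(results); // [10, 50]: 2, 3 and 4 were superseded while 1 was processing
}
```

Compared with `.asyncMap()`, this trades completeness for freshness: no backlog builds up, but intermediate values are never processed.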

How BLoC uses Streams — concretely

Let's trace what happens inside a Bloc when you add an event:

dart
class CartBloc extends Bloc<CartEvent, CartState> {
  CartBloc() : super(CartState.empty()) {
    on<AddItem>((event, emit) {
      emit(state.copyWith(
        items: [...state.items, event.item],
      ));
    });
  }
}

When you call cartBloc.add(AddItem(product)):

  1. The event goes into an internal event stream (a StreamController<CartEvent>).
  2. BLoC's infrastructure listens to that event stream. It matches the event type to the registered handler (on<AddItem>).
  3. The handler runs. emit() pushes the new state into the state stream (a broadcast StreamController<CartState>).
  4. Every BlocBuilder, BlocListener, and .stream.listen() subscriber receives the new state — as a microtask, delivered by the event loop.

The entire pattern is two StreamControllers wired together with a handler function in between:

text
User action
  → bloc.add(event)
    → event StreamController
      → event handler (your business logic)
        → emit(newState)
          → state StreamController (broadcast)
            → BlocBuilder.listen callback
              → setState → rebuild

When people say "BLoC is just streams," this is what they mean. The Bloc class adds structure — event-to-state mapping, middleware (BlocObserver), error handling, state deduplication (emit skips if the new state == the previous state). But the transport mechanism is a broadcast stream.

This is also why BLoC states should override == (via Equatable or Freezed, as covered in the equality article). The stream doesn't automatically deduplicate — emit checks newState == state to decide whether to push. If your state class uses reference equality (the default), every copyWith creates a "different" state even if nothing changed, and every BlocBuilder rebuilds unnecessarily.
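
The "two controllers and a handler" idea fits in a few lines of plain Dart. To be clear, `MiniBloc` below is an illustrative toy, not the bloc package's implementation; its names and constructor shape are invented, and it skips observers, error handling, and event transformers.

```dart
import 'dart:async';

/// A deliberately tiny stand-in for the idea behind Bloc.
/// NOT the bloc package's code; names and structure are illustrative.
class MiniBloc<Event, State> {
  MiniBloc(this._state, this._handler) {
    // Events in: every added event is routed to the handler.
    _events.stream.listen((event) => _handler(event, _state, _emit));
  }

  final void Function(Event event, State state, void Function(State) emit)
      _handler;
  final _events = StreamController<Event>();
  final _states = StreamController<State>.broadcast();
  State _state;

  State get state => _state;
  Stream<State> get stream => _states.stream;

  void add(Event event) => _events.add(event);

  // States out: skip states equal to the current one, push the rest.
  void _emit(State next) {
    if (next == _state) return; // deduplication relies on ==
    _state = next;
    _states.add(next);
  }

  Future<void> close() async {
    await _events.close();
    await _states.close();
  }
}

Future<void> main() async {
  final counter = MiniBloc<String, int>(0, (event, state, emit) {
    if (event == 'increment') emit(state + 1);
    if (event == 'noop') emit(state); // equal state: dropped by _emit
  });

  final seenA = <int>[];
  final seenB = <int>[];
  counter.stream.listen(seenA.add); // two listeners, like two BlocBuilders
  counter.stream.listen(seenB.add);

  counter
    ..add('increment')
    ..add('noop')
    ..add('increment');
  await counter.close();

  print(seenA); // [1, 2]
  print(seenB); // [1, 2]
}
```

With a state class that keeps the default reference equality, the `next == _state` check would almost never be true, which is the unnecessary-rebuild problem described above.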

`StreamBuilder`: the widget side

Flutter's built-in StreamBuilder widget subscribes to a Stream and rebuilds whenever a new value arrives:

dart
StreamBuilder<int>(
  stream: countDown(10),
  builder: (context, snapshot) {
    if (snapshot.hasError) return Text('Error: ${snapshot.error}');
    if (!snapshot.hasData) return CircularProgressIndicator();
    return Text('${snapshot.data}');
  },
)

StreamBuilder manages the subscription lifecycle: it subscribes in initState, cancels and resubscribes if the stream property changes (via didUpdateWidget), and cancels in dispose. The AsyncSnapshot wrapper gives you the current state through connectionState: none (no stream attached), waiting (subscribed, no data yet), active (stream still open and delivering), or done (stream closed). Errors are not a connection state; they are reported through snapshot.hasError and snapshot.error.

The pattern works but has a rough edge: the builder runs on every value, even if you only care about some values. BlocBuilder improves on this with buildWhen — a filter that prevents unnecessary rebuilds:

dart
BlocBuilder<CartBloc, CartState>(
  buildWhen: (previous, current) => previous.items.length != current.items.length,
  builder: (context, state) => Text('${state.items.length} items'),
)

Under the hood, buildWhen is a conditional check inside BlocBuilder's listener callback — it receives every state but only calls setState when the condition returns true. This isn't a .where() stream transformation; the listener still sees every state (which is how previous tracking works), it just skips the rebuild for states that don't pass the filter.

Common stream mistakes

Listening to a single-subscription stream twice:

dart
final stream = file.openRead();
stream.listen((bytes) => process(bytes));
stream.listen((bytes) => log(bytes)); // CRASH: already listened to

Fix: use .asBroadcastStream() if you need multiple listeners, or restructure so one listener handles both tasks.
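
A minimal sketch of the `.asBroadcastStream()` fix, using `Stream.fromIterable` as a stand-in for a single-subscription source such as `file.openRead()` (the function name is illustrative):

```dart
import 'dart:async';

/// Feeds one single-subscription source to two consumers.
Future<List<List<int>>> shareWithTwoListeners(Stream<int> source) async {
  final shared = source.asBroadcastStream();
  final processed = <int>[];
  final logged = <int>[];
  // Both listeners attach before the first event is delivered.
  await Future.wait([
    shared.forEach(processed.add),
    shared.forEach(logged.add),
  ]);
  return [processed, logged];
}

Future<void> main() async {
  final source = Stream.fromIterable([1, 2, 3]);
  print(await shareWithTwoListeners(source)); // [[1, 2, 3], [1, 2, 3]]
}
```

The usual broadcast caveat applies: a listener that attaches after events have already been delivered does not see them, so attach every listener up front.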

Forgetting to cancel subscriptions:

dart
// In initState — where's the cancel?
myStream.listen((v) => setState(() => _value = v));

Fix: store the StreamSubscription, cancel in dispose(). Or use StreamBuilder/BlocBuilder which handle it for you.

Creating a new stream on every build:

dart
StreamBuilder<int>(
  stream: countDown(10), // NEW stream instance every rebuild!
  builder: (context, snapshot) => Text('${snapshot.data}'),
)

Every time build() runs, countDown(10) creates a new Stream. StreamBuilder detects the change (the stream object is different), cancels the old subscription, and subscribes to the new one. The countdown restarts from 10 on every rebuild. Fix: create the stream once (in initState, in the Bloc, in a provider) and pass the same instance.

Not handling stream errors:

dart
stream.listen((v) => print(v)); // What if the stream emits an error?

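An error that reaches a subscription with no onError handler is forwarded to the current zone's uncaught-error handler. In a command-line Dart program that usually terminates the program; in Flutter it typically surfaces as an unhandled exception in your logs and crash reporting. Always provide onError, or use handleError in the transformation chain.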
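
Both options look like this in practice. `flakySensor` is a made-up source that emits one value and then fails:

```dart
import 'dart:async';

/// Made-up source: one good reading, then a failure.
Stream<int> flakySensor() async* {
  yield 1;
  throw StateError('sensor offline'); // becomes an error event, then the stream closes
}

Future<List<String>> runErrorDemo() async {
  final log = <String>[];

  // Option 1: onError on the subscription.
  final done = Completer<void>();
  flakySensor().listen(
    (v) => log.add('listen got $v'),
    onError: (Object e) => log.add('listen caught $e'),
    onDone: done.complete,
  );
  await done.future;

  // Option 2: handleError in the chain, before the consumer sees anything.
  final values = await flakySensor()
      .handleError((Object e) => log.add('handleError caught $e'))
      .toList();
  log.add('toList got $values');
  return log;
}

Future<void> main() async {
  (await runErrorDemo()).forEach(print);
  // listen got 1
  // listen caught Bad state: sensor offline
  // handleError caught Bad state: sensor offline
  // toList got [1]
}
```

Without the `handleError` step, the `toList()` future in option 2 would complete with the error instead of returning `[1]`.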

Streams vs Futures — when to use which

| Situation | Use |
| --- | --- |
| One async result (API call, file read) | Future |
| Multiple values over time (WebSocket, sensor, state changes) | Stream |
| User events (taps, scroll, form changes) | Stream |
| Polling (check every N seconds) | Stream (via Stream.periodic or Timer.periodic) |
| One-shot initialization | Future |
| Reactive state management | Stream (BLoC, Riverpod StreamProvider) |

The dividing line: if the answer to "how many values?" is "exactly one," use a Future. If it's "zero or more, over time," use a Stream.

The connection to the event loop

By default, every value delivered by a Stream goes through the event loop (Post 0). When you call add(value) on a default StreamController, the listener's callback isn't invoked immediately. Delivery is scheduled as a microtask; only a controller created with sync: true calls the listener inline.

This means that, with the default controller, stream delivery is never synchronous from the caller's perspective:

dart
final controller = StreamController<int>();
controller.stream.listen((v) => print('Got: $v'));

controller.add(42);
print('Added');

// Output:
// Added
// Got: 42

print('Added') runs first because the listener callback is scheduled, not called inline. The value 42 is delivered on the next microtask drain, after the current synchronous code finishes.

For synchronous delivery (rare, but sometimes needed for performance), use StreamController<int>(sync: true). A synchronous controller invokes the listener callback immediately during add(), without going through the event loop. This is faster but dangerous — the listener runs in the middle of whatever code called add(), which can cause reentrancy bugs if the listener modifies state that the caller is also modifying.
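
The difference in ordering can be observed directly. This sketch runs the same add-then-log sequence against both kinds of controller (`deliveryOrder` is an illustrative name):

```dart
import 'dart:async';

/// Records whether the listener runs before or after the code following add().
Future<List<String>> deliveryOrder({required bool sync}) async {
  final log = <String>[];
  final controller = StreamController<int>(sync: sync);
  controller.stream.listen((v) => log.add('got $v'));

  controller.add(42);
  log.add('added');

  await controller.close();
  return log;
}

Future<void> main() async {
  print(await deliveryOrder(sync: false)); // [added, got 42]
  print(await deliveryOrder(sync: true)); //  [got 42, added]
}
```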

Whether the bloc package's state controller is created with sync: true is an implementation detail, so check the source of the version you depend on before relying on either behavior. In practice the difference is rarely visible: bloc.state is updated inside emit() itself, and BlocBuilder's listener only marks the widget as dirty, so the actual rebuild waits for the next frame either way.

The next post covers Isolates — what to do when the single thread and event loop aren't enough, and you need real parallelism.

This is Post 2 of the Async Dart series. Previous: [Futures: Promises With a Dart Accent](/blog/dart-futures-explained). Next: [Isolates: Real Parallelism](/blog/dart-isolates-real-parallelism).
