You're using BLoC. Your widget rebuilds when the state changes. You've written this pattern a hundred times:
```dart
BlocBuilder<CartBloc, CartState>(
  builder: (context, state) {
    return Text('${state.items.length} items');
  },
)
```

It works. The text updates when items are added. But what is `BlocBuilder` actually listening to? Where do the state updates come from? If you dig into the `flutter_bloc` source, you'll find this at the core:
```dart
bloc.stream.listen((state) {
  // Rebuild the widget
});
```

A Stream. BLoC emits states on a Stream. `BlocBuilder` listens to that Stream. Every state change is a value pushed through a pipe, and the widget on the other end reacts. The entire BLoC pattern (events in, states out) is a `StreamController` with extra structure around it.
Streams are Dart's primitive for "values that arrive over time." A Future gives you one value, once. A Stream gives you zero, one, or many values, over any timespan, until it closes or errors. Understanding Streams means understanding the actual mechanism behind BLoC, Riverpod's StreamProvider, Firebase's realtime listeners, WebSocket connections, and every other "reactive" API in Flutter.
A Stream is a pipe
The simplest mental model: a Stream is a pipe with a producer on one end and a consumer on the other.
```text
Producer ───► [ Stream ] ───► Consumer
(adds values)              (receives values via listen)
```

The producer pushes values into the pipe. The consumer receives them by calling `.listen()`. The Stream handles the delivery: buffering, pausing, resuming, error propagation, and completion signaling.
```dart
import 'dart:async';

void main() {
  // Producer side
  final controller = StreamController<int>();
  controller.sink.add(1);
  controller.sink.add(2);
  controller.sink.add(3);
  controller.close();

  // Consumer side: events added before listen() were buffered and are
  // delivered now, followed by the done event.
  controller.stream.listen(
    (value) => print('Got: $value'),
    onError: (e) => print('Error: $e'),
    onDone: () => print('Stream closed'),
  );
}
```

Output:
```text
Got: 1
Got: 2
Got: 3
Stream closed
```

Three types of signals flow through a Stream:
- Data events: values of type `T` (the normal case)
- Error events: errors that occurred during production
- Done event: the stream has closed, no more values will arrive
A Stream can emit any combination of data and error events, in any order, followed by at most one done event. After the done event, the Stream is finished permanently.
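To make the three signal types concrete, here is a minimal sketch that interleaves a data event, an error event, and another data event before closing. The names `readingsWithGlitch` and `recordSignals` are hypothetical, chosen only for this illustration.

```dart
import 'dart:async';

/// Emits a value, an error, another value, then closes.
/// (Hypothetical producer used only for this illustration.)
Stream<int> readingsWithGlitch() {
  final controller = StreamController<int>();
  controller
    ..add(1)
    ..addError(StateError('sensor glitch'))
    ..add(2)
    ..close();
  return controller.stream;
}

/// Records every signal the stream delivers, in arrival order.
Future<List<String>> recordSignals(Stream<int> stream) {
  final log = <String>[];
  final done = Completer<List<String>>();
  stream.listen(
    (value) => log.add('data: $value'),
    onError: (Object e) => log.add('error: $e'),
    onDone: () {
      log.add('done');
      done.complete(log);
    },
  );
  return done.future;
}

Future<void> main() async {
  print(await recordSignals(readingsWithGlitch()));
  // [data: 1, error: Bad state: sensor glitch, data: 2, done]
}
```

Note that the error does not end the stream: by default a subscription keeps receiving events after an error, and only the done event is final.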
Single-subscription vs broadcast
This is where most developers get confused, and where bugs hide.
Single-subscription streams can only be listened to once. One consumer. If you call .listen() a second time, you get a StateError: Stream has already been listened to.
```dart
final controller = StreamController<int>(); // Single-subscription by default
final stream = controller.stream;
stream.listen((v) => print('Listener A: $v')); // Works
stream.listen((v) => print('Listener B: $v')); // THROWS StateError
```

Why would you want this restriction? Because single-subscription streams support backpressure. The consumer can pause the stream, and the producer stops producing. This is critical for I/O: if you're reading a 500MB file as a stream of bytes, you don't want 500MB buffered in memory because the consumer is processing slowly. Single-subscription streams let the consumer say "hold on, I'm busy" and the producer respects it.
Most streams you encounter are single-subscription: file reads, HTTP response bodies, StreamController() by default.
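The backpressure handshake can be sketched with a controller that reacts to `onPause` and `onResume`. This is an illustrative producer, not code from any library: `ticker` and `consumeSlowly` are hypothetical names, and `count` is assumed to be greater than zero.

```dart
import 'dart:async';

/// Emits 0..count-1, one value per tick, and stops ticking while paused.
Stream<int> ticker(int count, Duration interval) {
  late final StreamController<int> controller;
  Timer? timer;
  var next = 0;

  void start() {
    timer = Timer.periodic(interval, (_) {
      controller.add(next++);
      if (next == count) {
        timer?.cancel();
        controller.close();
      }
    });
  }

  void stop() {
    timer?.cancel();
    timer = null;
  }

  controller = StreamController<int>(
    onListen: start,
    onPause: stop, // consumer is busy: stop producing
    onResume: start, // consumer is ready again
    onCancel: stop,
  );
  return controller.stream;
}

/// Pauses for 50 ms after receiving the value 1, then carries on.
Future<List<int>> consumeSlowly(Stream<int> stream) {
  final received = <int>[];
  final done = Completer<List<int>>();
  late final StreamSubscription<int> sub;
  sub = stream.listen((value) {
    received.add(value);
    if (value == 1) {
      sub.pause(Future<void>.delayed(const Duration(milliseconds: 50)));
    }
  }, onDone: () => done.complete(received));
  return done.future;
}

Future<void> main() async {
  print(await consumeSlowly(ticker(5, const Duration(milliseconds: 5))));
  // [0, 1, 2, 3, 4]
}
```

No values are lost and none pile up during the pause, because the producer stops its timer until the consumer resumes.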
Broadcast streams can have multiple listeners. Any number of consumers can call .listen():
```dart
final controller = StreamController<int>.broadcast();
final stream = controller.stream;
stream.listen((v) => print('Listener A: $v')); // Works
stream.listen((v) => print('Listener B: $v')); // Also works
```

The tradeoff: no backpressure. The producer emits values regardless of whether any consumer is listening, and regardless of how fast consumers process. Values emitted before any listener subscribes are lost because there's no buffer.
Broadcast streams are used when multiple parts of the code need the same events: UI events (tap streams), state changes (BLoC's state stream is broadcast), and observable patterns.
BLoC uses broadcast. When you have two BlocBuilder widgets listening to the same Bloc, both receive every state. This works because BLoC uses a broadcast StreamController internally. If it used single-subscription, only one widget could listen — your second BlocBuilder would crash.
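A small sketch of that fan-out, using a plain broadcast controller rather than a real Bloc. The function name `twoListeners` is hypothetical; listener B joins late on purpose to show that it only sees values emitted after it subscribed.

```dart
import 'dart:async';

Future<Map<String, List<int>>> twoListeners() async {
  final controller = StreamController<int>.broadcast();
  final a = <int>[];
  final b = <int>[];

  controller.stream.listen(a.add);
  controller.add(1);
  // Give the event loop a turn so the value 1 is delivered.
  await Future<void>.delayed(Duration.zero);

  controller.stream.listen(b.add); // joins after 1 was emitted
  controller.add(2);
  await Future<void>.delayed(Duration.zero);

  await controller.close();
  return {'A': a, 'B': b};
}

Future<void> main() async {
  print(await twoListeners()); // {A: [1, 2], B: [2]}
}
```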
StreamController: the producer's tool
StreamController is the standard way to create a Stream that you control. You push values into the controller's sink, and they come out the controller's stream.
```dart
import 'dart:async';

class TemperatureSensor {
  final _controller = StreamController<double>();

  Stream<double> get readings => _controller.stream;

  void _onNewReading(double celsius) {
    if (!_controller.isClosed) {
      _controller.add(celsius);
    }
  }

  void _onSensorError(Object error) {
    // Guard here too: addError() on a closed controller throws a StateError.
    if (!_controller.isClosed) {
      _controller.addError(error);
    }
  }

  void dispose() {
    _controller.close();
  }
}
```

The `StreamController` manages the connection between producer and consumer. When a consumer calls `stream.listen()`:
- The controller creates a `StreamSubscription`, the active connection.
- The subscription starts receiving events from the controller.
- The consumer gets a `StreamSubscription` object they can use to `pause()`, `resume()`, or `cancel()`.
Internally, the controller maintains a buffer of events. If events are added before any listener subscribes (single-subscription only), they're buffered and delivered when the listener attaches. For broadcast streams, events without listeners are dropped.
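The difference in buffering can be observed directly. The sketch below (the helper `collectAfterLateListen` is a hypothetical name) adds two values before anyone listens and one value afterwards, then reports what the listener actually received.

```dart
import 'dart:async';

Future<List<int>> collectAfterLateListen(StreamController<int> controller) async {
  controller
    ..add(1)
    ..add(2); // added before anyone listens

  final received = <int>[];
  final sub = controller.stream.listen(received.add);
  controller.add(3); // added after listen()

  // Give the event loop a turn so pending events are delivered.
  await Future<void>.delayed(Duration.zero);
  await sub.cancel();
  await controller.close();
  return received;
}

Future<void> main() async {
  // Single-subscription: early events were buffered.
  print(await collectAfterLateListen(StreamController<int>())); // [1, 2, 3]
  // Broadcast: early events were dropped.
  print(await collectAfterLateListen(StreamController<int>.broadcast())); // [3]
}
```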
The subscription: the live connection
.listen() doesn't return void — it returns a StreamSubscription. This object is the active connection between the stream and your callback. It's also the source of one of the most common memory leaks in Flutter.
```dart
// Inside a State class
late StreamSubscription<CartState> _subscription;

@override
void initState() {
  super.initState();
  _subscription = cartBloc.stream.listen((state) {
    // Update UI
  });
}

@override
void dispose() {
  _subscription.cancel(); // CRITICAL
  super.dispose();
}
```

If you don't cancel the subscription in `dispose()`, the callback keeps a reference to your widget's state, the stream keeps a reference to the callback, and the garbage collector can't collect any of it. The widget is off-screen, the Element is unmounted, but the callback still fires on every state change, updating state that no longer exists.
In mild cases, this wastes memory and CPU. In severe cases, the callback calls setState() on a disposed State object, producing the infamous:
```text
FlutterError: setState() called after dispose()
```

This is why `BlocBuilder` and `StreamBuilder` handle subscriptions internally: they cancel in their `dispose()`. If you use `.listen()` directly, the subscription lifecycle is your responsibility.
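The same ownership rule applies outside widgets. As a minimal sketch in plain Dart (the class `CartCounter` is hypothetical and stands in for any object that listens to a stream), the object stores its subscription and cancels it when it is disposed:

```dart
import 'dart:async';

/// A hypothetical non-widget consumer that owns exactly one subscription.
class CartCounter {
  CartCounter(Stream<int> itemCounts) {
    _subscription = itemCounts.listen((count) => latest = count);
  }

  late final StreamSubscription<int> _subscription;
  int latest = 0;

  /// Plays the role of State.dispose(): the callback never fires again.
  Future<void> dispose() => _subscription.cancel();
}

Future<void> main() async {
  final controller = StreamController<int>.broadcast();
  final counter = CartCounter(controller.stream);

  controller.add(1);
  await Future<void>.delayed(Duration.zero);
  print(counter.latest); // 1

  await counter.dispose();
  controller.add(2); // nobody is listening any more
  await Future<void>.delayed(Duration.zero);
  print(counter.latest); // 1
  print(controller.hasListener); // false

  await controller.close();
}
```

After `dispose()`, the controller reports no listeners, so nothing keeps the consumer alive through the stream.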
Cancellation is not optional. Every long-lived `.listen()` needs a corresponding `.cancel()`. If you see `.listen()` without a stored subscription, treat it as a bug unless the stream is known to complete on its own:

```dart
// BUG: no way to cancel this
stream.listen((value) => doSomething(value));

// CORRECT: store and cancel
_subscription = stream.listen((value) => doSomething(value));
```

`async*` and `yield`: generating streams
Just as async/await generates Futures, async*/yield generates Streams:
```dart
Stream<int> countDown(int from) async* {
  for (int i = from; i >= 0; i--) {
    yield i;
    await Future.delayed(const Duration(seconds: 1));
  }
}

// Usage:
countDown(5).listen((n) => print(n));
// Prints: 5, 4, 3, 2, 1, 0 (one per second)
```

`async*` marks a function that returns a Stream. `yield` pushes a value into that stream. `await` pauses the generator until the Future completes (the stream delivers no values during the pause).
The generator function doesn't run until someone calls .listen(). Once listened to, it runs until it hits a yield (pushes the value, suspends), an await (waits for the Future, suspends), or the function ends (closes the stream).
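The laziness is easy to verify. In this sketch (the names `numbers` and `lazinessDemo` are hypothetical), the generator records when its body starts, and the caller records when the stream object was created:

```dart
import 'dart:async';

Stream<int> numbers(List<String> log) async* {
  log.add('generator started');
  yield 1;
  yield 2;
}

Future<List<String>> lazinessDemo() async {
  final log = <String>[];
  final stream = numbers(log); // calling the function runs none of its body
  log.add('stream created');
  await for (final n in stream) {
    // `await for` is what subscribes and starts the generator
    log.add('got $n');
  }
  return log;
}

Future<void> main() async {
  print(await lazinessDemo());
  // [stream created, generator started, got 1, got 2]
}
```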
`yield*` delegates to another stream: every value from the inner stream is forwarded to the outer stream.

```dart
Stream<int> mergedCountDown() async* {
  yield* countDown(3); // Yields 3, 2, 1, 0
  yield* countDown(2); // Then yields 2, 1, 0
}
```

The generator approach is cleaner than a `StreamController` when the production logic is sequential. Use `StreamController` when events come from external sources (callbacks, platform channels, hardware). Use `async*` when you're computing or transforming values in a flow you control.
Stream transformation
Streams support a chain of transformations, similar to Iterable methods but asynchronous:
```dart
sensorReadings
    .where((temp) => temp > 30.0) // Filter: only hot readings
    .map((temp) => '${temp.toStringAsFixed(1)}°C') // Transform: to string
    .distinct() // Deduplicate: skip consecutive repeated values
    .listen((reading) => print('Alert: $reading'));
```

Each transformation creates a new Stream that wraps the original. The transformations are lazy: they don't buffer or pre-compute. Each value flows through the chain one at a time, as it's emitted by the source.
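Here is a runnable version of the same chain, fed from a fixed list instead of a real sensor. The function name `hotAlerts` and the sample readings are made up for the illustration.

```dart
import 'dart:async';

Future<List<String>> hotAlerts(Stream<double> readings) {
  return readings
      .where((temp) => temp > 30.0)
      .map((temp) => '${temp.toStringAsFixed(1)}°C')
      .distinct() // compares each value with the previous one only
      .toList();
}

Future<void> main() async {
  final readings = Stream<double>.fromIterable(
    [29.5, 31.02, 31.04, 35.2, 31.0],
  );
  print(await hotAlerts(readings)); // [31.0°C, 35.2°C, 31.0°C]
}
```

The second `31.0°C` is suppressed because it directly follows an equal value; the last one gets through because `35.2°C` came in between.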
Common transformations:
| Method | What it does |
|---|---|
| `.map(fn)` | Transform each value |
| `.where(fn)` | Filter values |
| `.distinct()` | Skip consecutive duplicates |
| `.take(n)` | Only the first n values, then close |
| `.skip(n)` | Ignore the first n values |
| `.expand(fn)` | One input → multiple outputs |
| `.asyncMap(fn)` | Transform with an async function (awaits each) |
| `.asyncExpand(fn)` | One input → a stream of outputs |
| `.handleError(fn)` | Catch and handle errors in the stream |
| `.timeout(duration)` | Error if no value arrives within duration |
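A few of these methods in action, as a quick sketch on small fixed streams. The helpers `oneToFive` and `flaky` are hypothetical sources invented for the example.

```dart
import 'dart:async';

Stream<int> oneToFive() => Stream<int>.fromIterable([1, 2, 3, 4, 5]);

/// Emits one value, then fails. An async* function that throws emits
/// the error and closes the stream.
Stream<int> flaky() async* {
  yield 1;
  throw StateError('boom');
}

Future<void> main() async {
  print(await oneToFive().take(2).toList()); // [1, 2]
  print(await oneToFive().skip(3).toList()); // [4, 5]
  print(await oneToFive().expand((n) => [n, n * 10]).take(4).toList());
  // [1, 10, 2, 20]

  // handleError swallows the error, so toList() completes normally.
  print(await flaky().handleError((Object e) {}).toList()); // [1]
}
```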
`.asyncMap()` deserves attention. It awaits each transformation before processing the next value. If the transformation is slow, it creates backpressure:
```dart
sensorReadings
    .asyncMap((temp) async {
      // This API call takes 200ms
      final analysis = await analyzeTemperature(temp);
      return analysis;
    })
    .listen((result) => print(result));
```

If sensor readings arrive every 100ms but the analysis takes 200ms, `.asyncMap()` pauses its subscription to the source while each analysis runs, so incoming readings wait and the next analysis starts only when the current one finishes. This prevents overwhelming the API but introduces latency. For "process the latest, skip stale values" behavior, you'd need a custom transformer or a debounce or throttle operator from a package such as `rxdart` or `stream_transform`.
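The one-at-a-time behavior can be seen by logging when each transformation starts and ends. In this sketch the 20 ms delay is a stand-in for a slow call, and `asyncMapOrder` is a hypothetical name.

```dart
import 'dart:async';

Future<List<String>> asyncMapOrder() async {
  final log = <String>[];
  final results = await Stream<int>.fromIterable([1, 2, 3]).asyncMap((n) async {
    log.add('start $n');
    // Stand-in for a slow API call.
    await Future<void>.delayed(const Duration(milliseconds: 20));
    log.add('end $n');
    return n * 10;
  }).toList();
  log.add('results $results');
  return log;
}

Future<void> main() async {
  print(await asyncMapOrder());
  // [start 1, end 1, start 2, end 2, start 3, end 3, results [10, 20, 30]]
}
```

Even though all three source values are available immediately, no transformation starts before the previous one has ended.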
How BLoC uses Streams — concretely
Let's trace what happens inside a Bloc when you add an event:
```dart
class CartBloc extends Bloc<CartEvent, CartState> {
  CartBloc() : super(CartState.empty()) {
    on<AddItem>((event, emit) {
      emit(state.copyWith(
        items: [...state.items, event.item],
      ));
    });
  }
}
```

When you call `cartBloc.add(AddItem(product))`:
- The event goes into an internal event stream (a `StreamController<CartEvent>`).
- BLoC's infrastructure listens to that event stream. It matches the event type to the registered handler (`on<AddItem>`).
- The handler runs. `emit()` pushes the new state into the state stream (a broadcast `StreamController<CartState>`).
- Every `BlocBuilder`, `BlocListener`, and `.stream.listen()` subscriber receives the new state, delivered as a microtask by the event loop.
The entire pattern is two StreamControllers wired together with a handler function in between:
```text
User action
  → bloc.add(event)
    → event StreamController
      → event handler (your business logic)
        → emit(newState)
          → state StreamController (broadcast)
            → BlocBuilder.listen callback
              → setState → rebuild
```

When people say "BLoC is just streams," this is what they mean. The `Bloc` class adds structure: event-to-state mapping, middleware (`BlocObserver`), error handling, and state deduplication (`emit` skips if the new state `==` the previous state). But the transport mechanism is a broadcast stream.
This is also why BLoC states should override == (via Equatable or Freezed, as covered in the equality article). The stream doesn't automatically deduplicate — emit checks newState == state to decide whether to push. If your state class uses reference equality (the default), every copyWith creates a "different" state even if nothing changed, and every BlocBuilder rebuilds unnecessarily.
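To see how little machinery the core idea needs, here is a deliberately tiny stand-in written from scratch. `MiniBloc` is a hypothetical class for illustration, not the bloc package's real code; it wires an event controller to a broadcast state controller and skips states that are `==` to the current one.

```dart
import 'dart:async';

/// A minimal model of "events in, states out". Not the real Bloc class.
class MiniBloc<E, S> {
  MiniBloc(this._state, S Function(E event, S state) handler) {
    _events.stream.listen((event) => _emit(handler(event, _state)));
  }

  final _events = StreamController<E>();
  final _states = StreamController<S>.broadcast();
  S _state;

  S get state => _state;
  Stream<S> get stream => _states.stream;

  void add(E event) => _events.add(event);

  void _emit(S next) {
    if (next == _state) return; // deduplication relies on ==
    _state = next;
    _states.add(next);
  }

  Future<void> close() async {
    await _events.close();
    await _states.close();
  }
}

Future<List<int>> runCounter() async {
  final counter = MiniBloc<String, int>(
    0,
    (event, state) => event == 'inc' ? state + 1 : state,
  );
  final seen = <int>[];
  counter.stream.listen(seen.add);
  counter
    ..add('inc')
    ..add('noop') // produces an equal state, so nothing is emitted
    ..add('inc');
  await counter.close();
  return seen;
}

Future<void> main() async {
  print(await runCounter()); // [1, 2]
}
```

With an `int` state, `==` is value equality, so the `'noop'` event produces no emission. A state class without a custom `==` would emit every time.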
`StreamBuilder`: the widget side
Flutter's built-in StreamBuilder widget subscribes to a Stream and rebuilds whenever a new value arrives:
```dart
StreamBuilder<int>(
  // _countdown is a hypothetical field created once (for example in
  // initState), not inside build().
  stream: _countdown,
  builder: (context, snapshot) {
    if (snapshot.hasError) return Text('Error: ${snapshot.error}');
    if (!snapshot.hasData) return CircularProgressIndicator();
    return Text('${snapshot.data}');
  },
)
```

`StreamBuilder` manages the subscription lifecycle: it subscribes in `initState`, cancels and resubscribes if the `stream` property changes (via `didUpdateWidget`), and cancels in `dispose`. The `AsyncSnapshot` wrapper gives you the current state: a `connectionState` of `none` (no stream), `waiting` (no data yet), `active` (stream still open), or `done` (stream closed), plus `hasData` and `hasError` for the latest value or error.
The pattern works but has a rough edge: the builder runs on every value, even if you only care about some values. BlocBuilder improves on this with buildWhen — a filter that prevents unnecessary rebuilds:
```dart
BlocBuilder<CartBloc, CartState>(
  buildWhen: (previous, current) => previous.items.length != current.items.length,
  builder: (context, state) => Text('${state.items.length} items'),
)
```

Under the hood, `buildWhen` is a conditional check inside `BlocBuilder`'s listener callback: it receives every state but only calls `setState` when the condition returns true. This isn't a `.where()` stream transformation; the listener still sees every state (which is how `previous` tracking works), it just skips the rebuild for states that don't pass the filter.
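The idea can be modeled in plain Dart. This is a simplified sketch of the mechanism, not `flutter_bloc`'s actual source; `listenWithFilter`, `rebuildWhen`, and `onRebuild` are hypothetical names, with `onRebuild` standing in for `setState`.

```dart
import 'dart:async';

/// Calls [onRebuild] only for states where [rebuildWhen] returns true.
StreamSubscription<S> listenWithFilter<S>(
  Stream<S> states, {
  required S initial,
  required bool Function(S previous, S current) rebuildWhen,
  required void Function(S state) onRebuild,
}) {
  var previous = initial;
  return states.listen((current) {
    if (rebuildWhen(previous, current)) onRebuild(current);
    previous = current; // always advance, even when the rebuild is skipped
  });
}

Future<List<int>> rebuiltLengths() async {
  final controller = StreamController<List<String>>();
  final rebuilt = <int>[];
  final sub = listenWithFilter<List<String>>(
    controller.stream,
    initial: const [],
    rebuildWhen: (previous, current) => previous.length != current.length,
    onRebuild: (state) => rebuilt.add(state.length),
  );

  controller
    ..add(['tea'])
    ..add(['coffee']) // same length as before: no rebuild
    ..add(['coffee', 'milk']);
  await Future<void>.delayed(Duration.zero);

  await sub.cancel();
  await controller.close();
  return rebuilt;
}

Future<void> main() async {
  print(await rebuiltLengths()); // [1, 2]
}
```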
Common stream mistakes
Listening to a single-subscription stream twice:
```dart
final stream = file.openRead();
stream.listen((bytes) => process(bytes));
stream.listen((bytes) => log(bytes)); // CRASH: already listened to
```

Fix: use `.asBroadcastStream()` if you need multiple listeners, or restructure so one listener handles both tasks.
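A sketch of the first fix, using a small in-memory stream in place of a file. The function name `fanOut` is hypothetical. Both listeners attach before any event is delivered, which matters because a broadcast stream does not replay values to late subscribers.

```dart
import 'dart:async';

Future<List<List<int>>> fanOut(Stream<int> single) {
  final shared = single.asBroadcastStream();
  // Both toList() calls subscribe immediately, before events start flowing.
  final first = shared.toList();
  final second = shared.map((v) => v * 2).toList();
  return Future.wait([first, second]);
}

Future<void> main() async {
  print(await fanOut(Stream<int>.fromIterable([1, 2, 3])));
  // [[1, 2, 3], [2, 4, 6]]
}
```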
Forgetting to cancel subscriptions:
```dart
// In initState: where's the cancel?
myStream.listen((v) => setState(() => _value = v));
```

Fix: store the `StreamSubscription`, cancel in `dispose()`. Or use `StreamBuilder`/`BlocBuilder`, which handle it for you.
Creating a new stream on every build:
```dart
StreamBuilder<int>(
  stream: countDown(10), // NEW stream instance every rebuild!
  builder: (context, snapshot) => Text('${snapshot.data}'),
)
```

Every time `build()` runs, `countDown(10)` creates a new Stream. `StreamBuilder` detects the change (the stream object is different), cancels the old subscription, and subscribes to the new one. The countdown restarts from 10 on every rebuild. Fix: create the stream once (in `initState`, in the Bloc, in a provider) and pass the same instance.
Not handling stream errors:
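```dart
stream.listen((v) => print(v)); // What if the stream emits an error?
```

If no `onError` is provided, a stream error is reported as an uncaught error in the current zone. In a standalone Dart program that can terminate the isolate; in a Flutter app it surfaces as an unhandled exception rather than being handled where you expect. Always provide `onError`, or use `handleError` in the transformation chain.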
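Both options, sketched on a stream whose middle element fails to parse. The helpers `parsed`, `withOnError`, and `withHandleError` are hypothetical names for this example.

```dart
import 'dart:async';

/// Parsing 'x' throws a FormatException, which map() turns into an
/// error event; the stream then continues with the next value.
Stream<int> parsed(Stream<String> raw) => raw.map((s) => int.parse(s));

/// Option 1: handle errors in the listener.
Future<List<String>> withOnError() {
  final log = <String>[];
  final done = Completer<List<String>>();
  parsed(Stream<String>.fromIterable(['1', 'x', '3'])).listen(
    (v) => log.add('value $v'),
    onError: (Object e) => log.add('error ${e.runtimeType}'),
    onDone: () => done.complete(log),
  );
  return done.future;
}

/// Option 2: intercept errors earlier in the chain.
Future<List<int>> withHandleError() {
  return parsed(Stream<String>.fromIterable(['1', 'x', '3']))
      .handleError((Object e) {}, test: (e) => e is FormatException)
      .toList();
}

Future<void> main() async {
  print(await withOnError()); // [value 1, error FormatException, value 3]
  print(await withHandleError()); // [1, 3]
}
```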
Streams vs Futures — when to use which
| Situation | Use |
|---|---|
| One async result (API call, file read) | Future |
| Multiple values over time (WebSocket, sensor, state changes) | Stream |
| User events (taps, scroll, form changes) | Stream |
| Polling (check every N seconds) | Stream (via Stream.periodic or Timer.periodic) |
| One-shot initialization | Future |
| Reactive state management | Stream (BLoC, Riverpod StreamProvider) |
The dividing line: if the answer to "how many values?" is "exactly one," use a Future. If it's "zero or more, over time," use a Stream.
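The polling row is a good illustration of the dividing line: a single check is a Future, and repeating that check on a schedule turns it into a Stream. In this sketch `fetchStatus` is a hypothetical placeholder for a real request.

```dart
import 'dart:async';

/// Hypothetical one-shot check; a real app would call an API here.
Future<String> fetchStatus() async => 'ok';

/// Runs [fetchStatus] once per [interval], [times] times in total.
Stream<String> pollStatus(Duration interval, {required int times}) {
  return Stream<int>.periodic(interval, (tick) => tick)
      .take(times)
      .asyncMap((tick) async => 'poll $tick: ${await fetchStatus()}');
}

Future<void> main() async {
  print(await fetchStatus()); // ok
  print(await pollStatus(const Duration(milliseconds: 10), times: 3).toList());
  // [poll 0: ok, poll 1: ok, poll 2: ok]
}
```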
The connection to the event loop
Every value delivered by a Stream goes through the event loop (Post 0). When a default `StreamController` calls `add(value)`, the listener's callback isn't invoked immediately; delivery is scheduled as a microtask. The exception is a controller created with `sync: true`, which calls the listener inline.
This means that with a default controller, stream delivery is never synchronous from the caller's perspective:
```dart
final controller = StreamController<int>();
controller.stream.listen((v) => print('Got: $v'));
controller.add(42);
print('Added');
// Output:
// Added
// Got: 42
```

`print('Added')` runs first because the listener callback is scheduled, not called inline. The value 42 is delivered on the next microtask drain, after the current synchronous code finishes.
For synchronous delivery (rare, but sometimes needed for performance), use StreamController<int>(sync: true). A synchronous controller invokes the listener callback immediately during add(), without going through the event loop. This is faster but dangerous — the listener runs in the middle of whatever code called add(), which can cause reentrancy bugs if the listener modifies state that the caller is also modifying.
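The two delivery modes can be compared side by side. The helper `deliveryOrder` is a hypothetical name; it records whether the listener ran before or after the line following `add()`.

```dart
import 'dart:async';

Future<List<String>> deliveryOrder({required bool sync}) async {
  final log = <String>[];
  final controller = StreamController<int>(sync: sync);
  controller.stream.listen((v) => log.add('got $v'));

  controller.add(42);
  log.add('added');

  await controller.close();
  return log;
}

Future<void> main() async {
  print(await deliveryOrder(sync: false)); // [added, got 42]
  print(await deliveryOrder(sync: true)); // [got 42, added]
}
```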
Whether `emit(newState)` reaches listeners inline or on a later microtask depends on how the bloc package constructs its state controller, which is an implementation detail of the version you depend on; check its source rather than relying on either timing. For widgets the difference is rarely visible, because `BlocBuilder` just marks the widget as dirty and the actual rebuild waits for the next frame anyway.
The next post covers Isolates — what to do when the single thread and event loop aren't enough, and you need real parallelism.
This is Post 2 of the Async Dart series. Previous: [Futures: Promises With a Dart Accent](/blog/dart-futures-explained). Next: [Isolates: Real Parallelism](/blog/dart-isolates-real-parallelism).