streamSignal#
Stream signals can be created with the streamSignal function or with the toSignal() extension method.
streamSignal#
final stream = () async* {
  yield 1;
};
// Call the closure so the callback returns a Stream, not a function.
final s = streamSignal(() => stream());
toSignal()#
final stream = () async* {
  yield 1;
};
// toSignal() is an extension on Stream, so call the closure first.
final s = stream().toSignal();
.value, .peek()#
Both return the current AsyncState, which can be used to handle the loading, data, and error states.
The value getter on that state returns the latest value if the stream emitted successfully, otherwise null.
Use .peek() to read the state inside an effect without subscribing to the signal.
final stream = (int value) async* {
  yield value;
};
final s = streamSignal(() => stream(1));
final value = s.value.value; // 1 or null
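The difference between .value and .peek() is easiest to see with two effects side by side. The following is a minimal sketch, not the library's reference usage: the import path and the helper `one` are assumptions made for illustration.

```dart
import 'package:signals/signals.dart'; // assumed import path

// Hypothetical single-value stream used only for illustration.
Stream<int> one(int value) async* {
  yield value;
}

void main() {
  final s = streamSignal(() => one(1));

  // Reading .value inside the effect subscribes it to s,
  // so the callback runs again when the state changes.
  effect(() {
    final state = s.value;
    print('tracked: loading=${state.isLoading} value=${state.value}');
  });

  // Reading .peek() does not subscribe, so this effect
  // is not re-run when s changes.
  effect(() {
    final state = s.peek();
    print('untracked: loading=${state.isLoading}');
  });
}
```

Use .peek() when an effect needs the current state once but should not be driven by the stream.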
.reset()#
The reset method returns the signal to its initial state, so the callback is called again on the next evaluation.
final stream = (int value) async* {
  yield value;
};
final s = streamSignal(() => stream(1));
s.reset();
.refresh()#
Refreshes the stream value by setting isLoading to true while keeping the current state (AsyncData, AsyncLoading, or AsyncError).
final stream = (int value) async* {
  yield value;
};
final s = streamSignal(() => stream(1));
s.refresh();
print(s.value.isLoading); // true
.reload()#
Reloads the stream value by setting the state to AsyncLoading and passing in the current value or error as data.
final stream = (int value) async* {
  yield value;
};
final s = streamSignal(() => stream(1));
s.reload();
print(s.value is AsyncLoading); // true
Dependencies#
By default the callback will be called once and the stream will be cached unless a signal is read in the callback.
final count = signal(0);
final s = streamSignal(() async* {
  final value = count();
  yield value;
});
await s.future; // 0
count.value = 1;
await s.future; // 1
If signals need to be tracked across an async gap, pass them in the dependencies list when creating the streamSignal. The stream signal then resets every time any signal in that list changes.
final count = signal(0);
final s = streamSignal(
  () async* {
    final value = count();
    yield value;
  },
  dependencies: [count],
);
s.value; // state with count 0
count.value = 1; // resets the stream
s.value; // state with count 1
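To make the async gap concrete, here is a sketch in which the signal is read only after an await, so it would not be picked up by automatic tracking. The import path, the helper `delayedValue`, and the delay durations are assumptions for illustration. The dependencies argument mirrors the example above.

```dart
import 'package:signals/signals.dart'; // assumed import path

// Hypothetical stream: waits briefly, then reads and emits a value.
Stream<int> delayedValue(int Function() read) async* {
  await Future<void>.delayed(const Duration(milliseconds: 10));
  yield read(); // evaluated after the async gap
}

Future<void> main() async {
  final count = signal(0);

  final s = streamSignal(
    () => delayedValue(() => count.value),
    // Listed explicitly because count is read after the await.
    dependencies: [count],
  );

  // Subscribe so the stream is evaluated and state changes are printed.
  final stop = effect(() => print(s.value));
  await Future<void>.delayed(const Duration(milliseconds: 50));

  // Changing a listed dependency resets the stream signal.
  count.value = 1;
  await Future<void>.delayed(const Duration(milliseconds: 50));

  stop(); // dispose the effect
}
```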
StreamSignal#
Stream signals wrap a standard asynchronous Stream and bridge it into the reactive state framework, exposing its emissions as a reactive AsyncState.
You can construct a stream signal via the helper function streamSignal
or by calling the .toSignal() extension method on any standard Stream.
1. Basic Stream Binding#
final s = streamSignal(() => countStream());
Or via the extension:
final s = countStream().toSignal();
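The snippets in this section call countStream() without defining it. A minimal sketch of such a helper follows. The helper body and the import path are assumptions, not part of the documented API.

```dart
import 'package:signals/signals.dart'; // assumed import path

// Hypothetical helper: emits 1, 2, 3 and then closes.
Stream<int> countStream() async* {
  for (var i = 1; i <= 3; i++) {
    yield i;
  }
}

void main() {
  // Helper function form: the callback returns the Stream.
  final a = streamSignal(() => countStream());

  // Extension form on an existing Stream.
  final b = countStream().toSignal();

  // Reading .value returns the current AsyncState of each signal.
  print(a.value.isLoading);
  print(b.value.isLoading);
}
```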
2. Consuming stream emissions reactively#
Reading .value on a StreamSignal returns an
AsyncState object:
effect(() {
  s.value.map(
    data: (val) => print('Stream emitted: $val'),
    error: (err, stack) => print('Stream encountered error: $err'),
    loading: () => print('Waiting for first stream emission...'),
  );
});
3. Subscription Lifecycle and Manual Control#
A stream signal automatically manages the underlying StreamSubscription. It listens when the signal has active subscribers and automatically cleans up/cancels when disposed to prevent memory leaks.
You can also manually control the subscription state:
- pause(): Pauses the underlying stream subscription.
- resume(): Resumes a paused subscription.
- cancel(): Cancels the subscription and marks the stream signal as done.
- isDone: Returns whether the stream has finished emitting or has been cancelled.
final s = streamSignal(() => countStream());
s.pause(); // Temporarily halt stream values
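The manual controls listed above can be exercised together. This is a sketch under stated assumptions: the import path, the helper `ticker`, and the timings are illustrative, while pause(), resume(), cancel(), isPaused, and isDone are the members named on this page.

```dart
import 'package:signals/signals.dart'; // assumed import path

// Hypothetical ticker: emits an increasing counter every 100 ms.
Stream<int> ticker() =>
    Stream<int>.periodic(const Duration(milliseconds: 100), (i) => i);

Future<void> main() async {
  final s = streamSignal(() => ticker());

  // Subscribe so the underlying stream starts delivering values.
  final stop = effect(() => print(s.value));

  await Future<void>.delayed(const Duration(milliseconds: 350));

  s.pause(); // temporarily halt delivery
  print('paused: ${s.isPaused}');

  s.resume(); // continue delivery

  await s.cancel(); // end the subscription
  print('done: ${s.isDone}');

  stop(); // dispose the effect
}
```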
4. Reactive Dependencies#
Any reactive signals read synchronously inside the stream callback act as dependencies. When they mutate, the stream signal automatically cancels the current stream subscription, recreates a new stream using the updated values, and starts listening.
final query = signal('flutter');
final s = streamSignal(() {
  // Re-subscribes to a new database query stream every time the query changes!
  return db.watchItems(query.value);
});
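The db object above is not defined on this page. A self-contained sketch of the same pattern, using a hypothetical `watchItems` function in place of a real database and an assumed import path, might look like this:

```dart
import 'package:signals/signals.dart'; // assumed import path

// Hypothetical stand-in for db.watchItems: emits one result list per query.
Stream<List<String>> watchItems(String query) async* {
  yield ['$query-1', '$query-2'];
}

Future<void> main() async {
  final query = signal('flutter');

  // query.value is read synchronously in the callback, so it is tracked.
  final s = streamSignal(() => watchItems(query.value));

  // Print each result list once the stream has produced a value.
  final stop = effect(() {
    final state = s.value;
    if (state.hasValue) print(state.value);
  });

  await Future<void>.delayed(Duration.zero);

  // Changing the dependency makes the signal build a new stream.
  query.value = 'dart';
  await Future<void>.delayed(Duration.zero);

  stop(); // dispose the effect
}
```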
Constructors#
StreamSignal(
  Stream<T> Function() fn, {
  AsyncSignalOptions<T>? options,
  @Deprecated('Use options: AsyncSignalOptions(cancelOnError: ...) instead') bool? cancelOnError,
  @Deprecated('Use options: AsyncSignalOptions(initialValue: ...) instead') T? initialValue,
  @Deprecated('Use options: AsyncSignalOptions(dependencies: ...) instead') List<ReadonlySignal<dynamic>>? dependencies,
  @Deprecated('Use options: AsyncSignalOptions(onDone: ...) instead') void Function()? onDone,
  @Deprecated('Use options: AsyncSignalOptions(lazy: ...) instead') bool? lazy,
  @Deprecated('Use options: AsyncSignalOptions(autoDispose: ...) instead') bool? autoDispose,
  @Deprecated('Use options: AsyncSignalOptions(name: ...) instead') String? debugLabel,
})
Properties#
bool? cancelOnError
Cancel the subscription on error
List<ReadonlySignal<dynamic>> dependencies
List of dependencies to recompute the stream
Methods#
bool isDone
Check if the signal is done
Future<T> last
Last value of the stream
Future<T> first
First value of the stream
Future<void> execute(Stream<T> src)
Execute the stream
bool isPaused
Check if the subscription is paused
void pause([Future<void>? resume])
Pause the subscription
void resume()
Resume the subscription
Future<void> cancel()
Cancel the subscription