Stream

Stream signals can be created by extension or method.

streamSignal#

final stream = () async* {
    yield 1;
};
final s = streamSignal(() => stream()); // call the generator to obtain the Stream

toSignal()#

final stream = () async* {
    yield 1;
};
final s = stream().toSignal(); // toSignal() is an extension on Stream, so call the generator first

.value, .peek()#

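The .value getter returns an AsyncState that represents the current state of the stream (loading, data, or error), so each state can be handled explicitly.

The nested value getter on that state (s.value.value) returns the most recently emitted value if one is available, otherwise null.

Use .peek() to read the current state without subscribing, for example inside an effect.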

final stream = (int value) async* {
    yield value;
};
final s = streamSignal(() => stream(1)); // the generator takes an int, so pass one
final value = s.value.value; // 1 or null
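
To illustrate the difference between subscribing and peeking, here is a minimal sketch that uses a plain signal rather than a stream signal. It assumes the signals package is available and that .peek() on a stream signal behaves the same way with respect to subscriptions. The helper name countEffectRuns is hypothetical.

```dart
import 'package:signals/signals.dart';

/// Hypothetical helper: returns how many times the effect body ran.
int countEffectRuns() {
  final count = signal(0);
  var runs = 0;

  // The effect body runs once when the effect is created.
  final dispose = effect(() {
    count.peek(); // read the current value without subscribing
    runs++;
  });

  // The effect only peeked, so this write should not trigger a re-run.
  count.value = 1;

  dispose();
  return runs;
}

void main() {
  print(countEffectRuns());
}
```

Replacing count.peek() with count.value would subscribe the effect, so the write would cause a second run.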

.reset()#

The reset method resets the stream signal to its initial state so that the callback is called again on the next evaluation.

final stream = (int value) async* {
    yield value;
};
final s = streamSignal(() => stream(1)); // the generator takes an int, so pass one
s.reset();

.refresh()#

Refresh the stream value by setting isLoading to true while keeping the current state type (AsyncData, AsyncLoading, or AsyncError).

final stream = (int value) async* {
    yield value;
};
final s = streamSignal(() => stream(1)); // the generator takes an int, so pass one
s.refresh();
print(s.value.isLoading); // true

.reload()#

Reload the stream value by setting the state to AsyncLoading, passing the previous value or error along as data.

final stream = (int value) async* {
    yield value;
};
final s = streamSignal(() => stream(1)); // the generator takes an int, so pass one
s.reload();
print(s.value is AsyncLoading); // true

Dependencies#

By default, the callback is called once and the resulting stream is cached, unless a signal is read in the callback.

final count = signal(0);
final s = streamSignal(() async* {
    final value = count();
    yield value;
});

await s.future; // 0
count.value = 1;
await s.future; // 1

If signals need to be tracked across an async gap, pass them as dependencies when creating the streamSignal. The stream then resets every time any signal in the dependencies list changes.

final count = signal(0);
final s = streamSignal(
    () async* {
        final value = count();
        yield value;
    },
    dependencies: [count],
);
s.value; // state with count 0
count.value = 1; // resets the stream
s.value; // state with count 1

StreamSignal#

Stream signals wrap a standard asynchronous Stream and bridge it into the reactive state framework, exposing its emissions as a reactive AsyncState.

You can construct a stream signal via the helper function streamSignal or by calling the .toSignal() extension method on any standard Stream.

1. Basic Stream Binding#

final s = streamSignal(() => countStream());

Or via the extension:

final s = countStream().toSignal();

2. Consuming Stream Emissions Reactively#

Reading .value on a StreamSignal returns an AsyncState object:

effect(() {
  s.value.map(
    data: (val) => print('Stream emitted: $val'),
    error: (err, stack) => print('Stream encountered error: $err'),
    loading: () => print('Waiting for first stream emission...'),
  );
});

3. Subscription Lifecycle and Manual Control#

A stream signal automatically manages the underlying StreamSubscription. It listens while the signal has active subscribers and cancels the subscription when disposed, which prevents memory leaks.

You can also manually control the subscription state:

  • pause(): Pauses the underlying stream subscription.
  • resume(): Resumes a paused subscription.
  • cancel(): Cancels the subscription and marks the stream signal as done.
  • isDone: Returns whether the stream has finished emitting or has been cancelled.

final s = streamSignal(() => countStream());
s.pause(); // Temporarily halt stream values
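
The pause, resume, and cancel methods share their names with the methods on dart:async's StreamSubscription. As a rough illustration of the underlying behavior, here is a sketch that uses a plain StreamSubscription without the signals package. Whether StreamSignal forwards to its subscription in exactly this way is an assumption, and pauseResumeDemo is a hypothetical name.

```dart
import 'dart:async';

/// Hypothetical demo: records what the listener has received at each step.
Future<List<List<int>>> pauseResumeDemo() async {
  final controller = StreamController<int>();
  final received = <int>[];
  final snapshots = <List<int>>[];
  final sub = controller.stream.listen(received.add);

  controller.add(1);
  await Future<void>.delayed(Duration.zero); // let the event be delivered
  snapshots.add(List.of(received)); // [1]

  sub.pause();
  controller.add(2); // buffered while the subscription is paused
  await Future<void>.delayed(Duration.zero);
  snapshots.add(List.of(received)); // still [1]

  sub.resume(); // buffered events are delivered after resuming
  await Future<void>.delayed(Duration.zero);
  snapshots.add(List.of(received)); // [1, 2]

  await sub.cancel(); // no further events are delivered
  await controller.close();
  return snapshots;
}

Future<void> main() async {
  print(await pauseResumeDemo()); // [[1], [1], [1, 2]]
}
```

Events added while paused are buffered rather than dropped, so a long pause on a busy stream can accumulate memory.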

4. Reactive Dependencies#

Any reactive signals read synchronously inside the stream callback act as dependencies. When they mutate, the stream signal automatically cancels the current stream subscription, recreates a new stream using the updated values, and starts listening.

final query = signal('flutter');
final s = streamSignal(() {
  // Re-subscribes to a new database query stream every time the query changes!
  return db.watchItems(query.value);
});

Constructors#

StreamSignal(
  Stream<T> Function() fn, {
  AsyncSignalOptions<T>? options,
  @Deprecated('Use options: AsyncSignalOptions(cancelOnError: ...) instead') bool? cancelOnError,
  @Deprecated('Use options: AsyncSignalOptions(initialValue: ...) instead') T? initialValue,
  @Deprecated('Use options: AsyncSignalOptions(dependencies: ...) instead') List<ReadonlySignal<dynamic>>? dependencies,
  @Deprecated('Use options: AsyncSignalOptions(onDone: ...) instead') void Function()? onDone,
  @Deprecated('Use options: AsyncSignalOptions(lazy: ...) instead') bool? lazy,
  @Deprecated('Use options: AsyncSignalOptions(autoDispose: ...) instead') bool? autoDispose,
  @Deprecated('Use options: AsyncSignalOptions(name: ...) instead') String? debugLabel,
})
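
The deprecation notices in the constructor signature point to passing configuration through options instead of individual named arguments. Here is a minimal sketch of what that might look like. The named parameters on AsyncSignalOptions are inferred from those notices rather than confirmed here, and ticks and createTicksSignal are hypothetical names.

```dart
import 'package:signals/signals.dart';

// Hypothetical stream used only for illustration.
Stream<int> ticks() async* {
  yield 1;
  yield 2;
}

// Hypothetical factory showing configuration passed through `options`.
StreamSignal<int> createTicksSignal() {
  return StreamSignal<int>(
    ticks,
    // Parameter names are taken from the deprecation messages;
    // treat them as assumptions and check the installed version.
    options: AsyncSignalOptions<int>(
      initialValue: 0,
      cancelOnError: true,
      lazy: true,
      name: 'ticks',
    ),
  );
}

void main() {
  final s = createTicksSignal();
  print(s.value); // an AsyncState<int>
  s.dispose();
}
```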

Properties#

bool? cancelOnError

Cancel the subscription on error

List<ReadonlySignal<dynamic>> dependencies

List of dependencies to recompute the stream

Methods#

bool isDone

Check if the signal is done

Future<T> last

Last value of the stream

Future<T> first

First value of the stream

Future<void> execute(Stream<T> src)

Execute the stream

bool isPaused

Check if the subscription is paused

void pause([Future<void>? resume])

Pause the subscription

void resume()

Resume the subscription

Future<void> cancel()

Cancel the subscription

Future<void> reload()
Future<void> refresh()
void reset([AsyncState<T>? value])
void dispose()
AsyncState<T> value
void setError(Object error, [StackTrace? stackTrace])