Streams API Reference
Complete reference for Mearie's stream system. For conceptual understanding, see Streams Concept.
Core Types
Source<T>
A push-based stream that emits values of type T.
type Source<T> = (sink: Sink<T>) => Subscription;
When you call a source with a sink, it:
- Starts pushing values to the sink
- Returns a subscription for cancellation
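The two steps above can be sketched with a hand-written source. This is a minimal illustration, assuming only the type definitions given in this reference; numbers$, received, and state are hypothetical names and are not part of Mearie.

```typescript
// Types as given in this reference.
type Sink<T> = {
  next(value: T): void;
  complete(): void;
};
type Subscription = { unsubscribe(): void };
type Source<T> = (sink: Sink<T>) => Subscription;

// Hypothetical source: pushes 1, 2, 3 synchronously, then signals completion.
const numbers$: Source<number> = (sink) => {
  for (const n of [1, 2, 3]) {
    sink.next(n);
  }
  sink.complete();
  // Everything was emitted synchronously, so there is nothing left to cancel.
  return { unsubscribe: () => {} };
};

// Calling the source with a sink starts the stream and returns a subscription.
const received: number[] = [];
const state = { completed: false };
const subscription = numbers$({
  next: (value) => {
    received.push(value);
  },
  complete: () => {
    state.completed = true;
  },
});
subscription.unsubscribe();
```

Here received ends up holding 1, 2, 3 and state.completed is true. A source that emits over time would instead use unsubscribe to stop its timer or listener.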
Sink<T>
Receives values from a source.
type Sink<T> = {
  next(value: T): void; // Receive a value
  complete(): void; // Receive completion signal
};
Subscription
Controls an active stream.
type Subscription = {
  unsubscribe(): void; // Cancel the stream and clean up resources
};
Operator<T, R>
Transforms one source into another.
type Operator<T, R = T> = (source: Source<T>) => Source<R>;
Composition
pipe
Composes operators left-to-right.
pipe<T, R>(
  source: Source<T>,
  ...operators: Operator[]
): Source<R>
Example:
const result$ = pipe(
  source$,
  filter((x) => x > 0),
  map((x) => x * 2),
  take(5),
);
compose
Composes operators right-to-left (function composition).
compose<T, R>(...operators: Operator[]): Operator<T, R>
Example:
const transform = compose(
  take(5), // Applied last
  map((x) => x * 2),
  filter((x) => x > 0), // Applied first
);
const result$ = transform(source$);
Sources
fromValue
Creates a source that emits a single value then completes.
fromValue<T>(value: T): Source<T>
Example:
const source$ = fromValue(42);
fromArray
Creates a source that emits each array element then completes.
fromArray<T>(array: readonly T[]): Source<T>
Example:
const source$ = fromArray([1, 2, 3, 4, 5]);
fromPromise
Converts a promise to a source.
fromPromise<T>(promise: Promise<T>): Source<T | null>
Emits the resolved value, or null if the promise rejects, then completes.
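The resolve-or-null behaviour can be sketched as follows. This is a minimal illustration, assuming only the Source, Sink, and Subscription types from this reference; fromPromiseSketch is a hypothetical stand-in and not Mearie's actual implementation, and its handling of unsubscribe is an assumption.

```typescript
type Sink<T> = { next(value: T): void; complete(): void };
type Subscription = { unsubscribe(): void };
type Source<T> = (sink: Sink<T>) => Subscription;

// Hypothetical stand-in for fromPromise; not Mearie's actual implementation.
const fromPromiseSketch = <T>(promise: Promise<T>): Source<T | null> => (sink) => {
  let cancelled = false;
  const emit = (value: T | null): void => {
    if (cancelled) return;
    sink.next(value);
    sink.complete();
  };
  promise.then(
    (value) => emit(value), // Resolved: emit the value
    () => emit(null), // Rejected: emit null instead of an error
  );
  // Assumption: unsubscribing before settlement suppresses the emission.
  return {
    unsubscribe: () => {
      cancelled = true;
    },
  };
};

const seen: Array<number | null> = [];
const record: Sink<number | null> = {
  next: (value) => {
    seen.push(value);
  },
  complete: () => {},
};
fromPromiseSketch(Promise.resolve(42))(record);
fromPromiseSketch(Promise.reject<number>(new Error('boom')))(record);
// Promise callbacks run asynchronously, so seen is still empty on this line.
```

Once both promises have settled, seen holds 42 followed by null. Because a rejection surfaces as null rather than as an error, a caller that needs the failure reason has to capture it before handing the promise over.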
Example:
const source$ = fromPromise(fetch('/api/data').then((r) => r.json()));
makeSubject
Creates a subject: a source that you push values into manually.
makeSubject<T>(): Subject<T>
type Subject<T> = {
  source: Source<T>;
  next(value: T): void;
  complete(): void;
};
Example:
const subject = makeSubject<number>();
subscribe((value) => console.log(value))(subject.source);
subject.next(1);
subject.next(2);
subject.complete();
fromSubscription
Wraps an external subscription as a source.
fromSubscription<T>(
  subscribe: (sink: Sink<T>) => { unsubscribe(): void }
): Source<T>
make
Creates a custom source.
make<T>(
  onSubscribe: (sink: Sink<T>) => Subscription | void
): Source<T>
Example:
const ticker$ = make((sink) => {
  const id = setInterval(() => sink.next(Date.now()), 1000);
  return { unsubscribe: () => clearInterval(id) };
});
Operators
filter
Emit only values that pass a predicate.
filter<T>(predicate: (value: T) => boolean): Operator<T>
Example:
pipe(
  source$,
  filter((x) => x % 2 === 0), // Only even numbers
);
map
Transform each value.
map<T, R>(transform: (value: T) => R): Operator<T, R>
Example:
pipe(
  source$,
  map((x) => x * 2),
);
mergeMap
Transform each value to a source and merge all sources.
mergeMap<T, R>(
  transform: (value: T) => Source<R>
): Operator<T, R>
Example:
pipe(
  operations$,
  mergeMap((op) => fromPromise(fetch(op.url))),
);
take
Emit only the first N values then complete.
take<T>(count: number): Operator<T>
Example:
pipe(
  source$,
  take(5), // First 5 values only
);
takeUntil
Emit values until another source emits.
takeUntil<T>(notifier: Source<any>): Operator<T>
Example:
pipe(
  source$,
  takeUntil(stop$), // Stop when stop$ emits
);
merge
Merge multiple sources into one.
merge<T>(...sources: Source<T>[]): Source<T>
Example:
const combined$ = merge(source1$, source2$, source3$);
share
Share a single subscription among multiple subscribers.
share<T>(): Operator<T>
Without share, each subscriber creates its own subscription to the source. With share, all subscribers use a single underlying subscription.
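The difference can be sketched with a small reference-counting operator. This is a minimal illustration, assuming only the types from this reference; shareSketch, counted$, stats, and upstreamSinks are hypothetical names, and the real share may differ in details such as how it handles completion or late subscribers.

```typescript
type Sink<T> = { next(value: T): void; complete(): void };
type Subscription = { unsubscribe(): void };
type Source<T> = (sink: Sink<T>) => Subscription;
type Operator<T, R = T> = (source: Source<T>) => Source<R>;

// Hypothetical stand-in for share; not Mearie's actual implementation.
const shareSketch = <T>(): Operator<T> => (source) => {
  let sinks: Sink<T>[] = [];
  let upstream: Subscription | null = null;
  return (sink) => {
    sinks.push(sink);
    if (sinks.length === 1) {
      // First subscriber: open the one underlying subscription.
      upstream = source({
        next: (value) => sinks.slice().forEach((s) => s.next(value)),
        complete: () => {
          const current = sinks;
          sinks = [];
          current.forEach((s) => s.complete());
        },
      });
    }
    return {
      unsubscribe: () => {
        sinks = sinks.filter((s) => s !== sink);
        // Last subscriber gone: release the underlying subscription.
        if (sinks.length === 0 && upstream !== null) {
          upstream.unsubscribe();
          upstream = null;
        }
      },
    };
  };
};

// Hypothetical upstream that counts how often it is subscribed to.
const stats = { upstreamSubscriptions: 0 };
const upstreamSinks: Sink<number>[] = [];
const counted$: Source<number> = (sink) => {
  stats.upstreamSubscriptions += 1;
  upstreamSinks.push(sink);
  return { unsubscribe: () => {} };
};

const shared$ = shareSketch<number>()(counted$);
const a: number[] = [];
const b: number[] = [];
shared$({ next: (v) => { a.push(v); }, complete: () => {} });
shared$({ next: (v) => { b.push(v); }, complete: () => {} });

// One value pushed upstream reaches both subscribers.
upstreamSinks.forEach((s) => s.next(7));
```

With two subscribers attached, stats.upstreamSubscriptions stays at 1 and both a and b receive the value 7. Subscribing to counted$ directly twice would instead raise the count to 2.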
Example:
const shared$ = pipe(source$, share());
// Both use the same subscription
subscribe(handleA)(shared$);
subscribe(handleB)(shared$);
tap
Perform side effects without modifying the stream.
tap<T>(effect: (value: T) => void): Operator<T>
Example:
pipe(
  source$,
  tap((x) => console.log('Value:', x)),
  map((x) => x * 2),
);
initialize
Run a function when the stream is first subscribed to.
initialize<T>(effect: () => void): Operator<T>
Example:
pipe(
  source$,
  initialize(() => console.log('Started!')),
);
finalize
Run a cleanup function when the stream completes or is unsubscribed.
finalize<T>(cleanup: () => void): Operator<T>
Example:
pipe(
  source$,
  finalize(() => console.log('Cleanup!')),
);
Sinks
subscribe
Subscribe to a source with an observer.
subscribe<T>(observer: Observer<T>): (source: Source<T>) => Subscription
type Observer<T> = {
  next?: (value: T) => void;
  complete?: () => void;
} | ((value: T) => void);
Example:
const subscription = subscribe({
  next: (value) => console.log(value),
  complete: () => console.log('Done'),
})(source$);
// Later...
subscription.unsubscribe();
collect
Collect the first value from a source.
collect<T>(source: Source<T>): Promise<T>
Example:
const firstValue = await collect(source$);
collectAll
Collect all values into an array.
collectAll<T>(source: Source<T>): Promise<T[]>
Example:
const allValues = await collectAll(source$);
publish
Share a source among multiple subscribers, starting it immediately.
publish<T>(source: Source<T>): Source<T>
peek
Subscribe to a source for side effects without holding a reference.
peek<T>(observer: Observer<T>): (source: Source<T>) => void
Exchange Pattern
Exchanges use these APIs to transform operation and result streams:
const myExchange = (): Exchange => {
  return ({ forward }) => {
    return (ops$) => {
      // Transform operations
      const transformed$ = pipe(ops$, filter(shouldHandle), map(transformOperation));
      // Forward and transform results
      return pipe(transformed$, forward, map(transformResult));
    };
  };
};
Next Steps
- Streams Concept - Understand the why and what
- Custom Exchanges - Build exchanges with these APIs
- Exchanges Guide - Learn the exchange system