Streams API Reference 
Complete reference for Mearie's stream system. For conceptual understanding, see Streams Concept.
Core Types 
Source<T> 
A push-based stream that emits values of type T.
type Source<T> = (sink: Sink<T>) => Subscription;
When you call a source with a sink, it:
- Starts pushing values to the sink
- Returns a subscription for cancellation
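To make this contract concrete, here is a minimal hand-written source. It assumes only the type definitions on this page, which are re-declared locally so the sketch compiles on its own. The names `numbers$` and `log` are illustrative and are not Mearie exports.

```typescript
// Local copies of the documented types, so this sketch is self-contained.
type Subscription = { unsubscribe(): void };
type Sink<T> = { next(value: T): void; complete(): void };
type Source<T> = (sink: Sink<T>) => Subscription;

// A source is just a function: given a sink, it pushes values into it.
// This one pushes 1, 2, 3 synchronously and then signals completion.
const numbers$: Source<number> = (sink) => {
  for (const n of [1, 2, 3]) sink.next(n);
  sink.complete();
  // Everything was emitted synchronously, so nothing is left to cancel.
  return { unsubscribe: () => {} };
};

// Calling the source with a sink starts the stream.
const log: string[] = [];
const subscription = numbers$({
  next: (value) => {
    log.push(`next:${value}`);
  },
  complete: () => {
    log.push("complete");
  },
});

// The returned subscription is the handle for cancellation.
subscription.unsubscribe();
```

In application code you would normally build sources with the helpers in the Sources section rather than writing the function by hand.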
Sink<T> 
Receives values from a source.
type Sink<T> = {
  next(value: T): void; // Receive a value
  complete(): void; // Receive completion signal
};
Subscription
Controls an active stream.
type Subscription = {
  unsubscribe(): void; // Cancel stream and cleanup resources
};
Operator<T, R>
Transforms one source into another.
type Operator<T, R = T> = (source: Source<T>) => Source<R>;
Composition
pipe 
Composes operators left-to-right.
pipe<T, R>(
  source: Source<T>,
  ...operators: Operator[]
): Source<R>
Example:
const result$ = pipe(
  source$,
  filter((x) => x > 0),
  map((x) => x * 2),
  take(5),
);
compose
Composes operators right-to-left (function composition).
compose<T, R>(...operators: Operator[]): Operator<T, R>
Example:
const transform = compose(
  take(5),
  map((x) => x * 2),
  filter((x) => x > 0),
);
const result$ = transform(source$);
Sources
fromValue 
Creates a source that emits a single value then completes.
fromValue<T>(value: T): Source<T>
Example:
const source$ = fromValue(42);
fromArray
Creates a source that emits each array element then completes.
fromArray<T>(array: readonly T[]): Source<T>
Example:
const source$ = fromArray([1, 2, 3, 4, 5]);
fromPromise
Converts a promise to a source.
fromPromise<T>(promise: Promise<T>): Source<T | null>
Emits the resolved value, or null if the promise rejects, then completes.
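Because the result type is `Source<T | null>`, subscribers have to account for the null case. One way to do that is a small operator that drops null and narrows the type. The sketch below assumes only the types documented on this page, re-declared locally. `skipNull` and `emitOnce` are hypothetical helpers written for this example, not Mearie exports; `emitOnce` stands in for the single emission that fromPromise is described as making.

```typescript
// Local copies of the documented types, so this sketch is self-contained.
type Subscription = { unsubscribe(): void };
type Sink<T> = { next(value: T): void; complete(): void };
type Source<T> = (sink: Sink<T>) => Subscription;
type Operator<T, R = T> = (source: Source<T>) => Source<R>;

// Hypothetical stand-in: emits one value, then completes.
const emitOnce = <T>(value: T): Source<T> => (sink) => {
  sink.next(value);
  sink.complete();
  return { unsubscribe: () => {} };
};

// Hypothetical operator: forwards every non-null value and drops null,
// so downstream code sees Source<T> instead of Source<T | null>.
const skipNull = <T>(): Operator<T | null, T> => (source) => (sink) =>
  source({
    next: (value) => {
      if (value !== null) sink.next(value);
    },
    complete: () => sink.complete(),
  });

const resolved$ = emitOnce<string | null>("ok"); // shape of a resolved promise
const rejected$ = emitOnce<string | null>(null); // shape of a rejected promise

const seen: string[] = [];
let completions = 0;
const record: Sink<string> = {
  next: (value) => {
    seen.push(value);
  },
  complete: () => {
    completions += 1;
  },
};

skipNull<string>()(resolved$)(record); // records "ok"
skipNull<string>()(rejected$)(record); // records nothing, but still completes
```

Dropping null discards the fact that a rejection happened, so this only fits cases where a failed promise can be ignored.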
Example:
const source$ = fromPromise(fetch('/api/data').then((r) => r.json()));
makeSubject
Creates a subject - a source you can manually push values to.
makeSubject<T>(): Subject<T>
type Subject<T> = {
  source: Source<T>;
  next(value: T): void;
  complete(): void;
};
Example:
const subject = makeSubject<number>();
subscribe((value) => console.log(value))(subject.source);
subject.next(1);
subject.next(2);
subject.complete();
fromSubscription
Wraps an external subscription as a source.
fromSubscription<T>(
  subscribe: (sink: Sink<T>) => { unsubscribe(): void }
): Source<T>
make
Creates a custom source.
make<T>(
  onSubscribe: (sink: Sink<T>) => Subscription | void
): Source<T>
Example:
const ticker$ = make((sink) => {
  const id = setInterval(() => sink.next(Date.now()), 1000);
  return { unsubscribe: () => clearInterval(id) };
});
Operators
filter 
Emit only values that pass a predicate.
filter<T>(predicate: (value: T) => boolean): Operator<T>
Example:
pipe(
  source$,
  filter((x) => x % 2 === 0), // Only even numbers
);
map
Transform each value.
map<T, R>(transform: (value: T) => R): Operator<T, R>
Example:
pipe(
  source$,
  map((x) => x * 2),
);
mergeMap
Transform each value to a source and merge all sources.
mergeMap<T, R>(
  transform: (value: T) => Source<R>
): Operator<T, R>
Example:
pipe(
  operations$,
  mergeMap((op) => fromPromise(fetch(op.url))),
);
take
Emit only the first N values then complete.
take<T>(count: number): Operator<T>
Example:
pipe(
  source$,
  take(5), // First 5 values only
);
takeUntil
Emit values until another source emits.
takeUntil<T>(notifier: Source<any>): Operator<T>
Example:
pipe(
  source$,
  takeUntil(stop$), // Stop when stop$ emits
);
merge
Merge multiple sources into one.
merge<T>(...sources: Source<T>[]): Source<T>
Example:
const combined$ = merge(source1$, source2$, source3$);
share
Share a single subscription among multiple subscribers.
share<T>(): Operator<T>
Without share, each subscriber creates a new subscription. With share, subscribers share the same underlying subscription.
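The difference can be observed by counting how often the underlying source is started. The sketch below is a hand-rolled illustration of the idea, not Mearie's implementation. `shareSketch`, `manual$`, `push` and `sinkFor` are hypothetical names, and edge cases such as synchronous emission during startup or late subscribers after completion are ignored.

```typescript
// Local copies of the documented types, so this sketch is self-contained.
type Subscription = { unsubscribe(): void };
type Sink<T> = { next(value: T): void; complete(): void };
type Source<T> = (sink: Sink<T>) => Subscription;

// Hypothetical sharing wrapper: one upstream subscription, many sinks.
const shareSketch = <T>(source: Source<T>): Source<T> => {
  let sinks: Sink<T>[] = [];
  let upstream: Subscription | null = null;
  return (sink) => {
    sinks.push(sink);
    // Only the first subscriber starts the underlying source.
    if (upstream === null) {
      upstream = source({
        next: (value) => sinks.forEach((s) => s.next(value)),
        complete: () => sinks.forEach((s) => s.complete()),
      });
    }
    return {
      unsubscribe: () => {
        sinks = sinks.filter((s) => s !== sink);
        // The last subscriber to leave tears down the underlying source.
        if (sinks.length === 0 && upstream !== null) {
          upstream.unsubscribe();
          upstream = null;
        }
      },
    };
  };
};

// A manually driven source that counts how many times it is started.
let starts = 0;
let listeners: Sink<number>[] = [];
const manual$: Source<number> = (sink) => {
  starts += 1;
  listeners.push(sink);
  return {
    unsubscribe: () => {
      listeners = listeners.filter((l) => l !== sink);
    },
  };
};
const push = (value: number) => listeners.forEach((l) => l.next(value));

const log: string[] = [];
const sinkFor = (name: string): Sink<number> => ({
  next: (value) => {
    log.push(`${name}:${value}`);
  },
  complete: () => {},
});

// Without sharing: each subscriber starts the source again.
const a1 = manual$(sinkFor("A"));
const b1 = manual$(sinkFor("B"));
const startsUnshared = starts;
a1.unsubscribe();
b1.unsubscribe();

// With sharing: both subscribers sit behind one underlying subscription.
starts = 0;
const shared$ = shareSketch(manual$);
const a2 = shared$(sinkFor("A"));
const b2 = shared$(sinkFor("B"));
const startsShared = starts;
push(7); // emitted once upstream, delivered to both A and B
a2.unsubscribe();
b2.unsubscribe();
```

Here `startsUnshared` ends up at 2 and `startsShared` at 1, which matters when starting the source is expensive, for example when it opens a network connection.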
Example:
const shared$ = pipe(source$, share());
// Both use the same subscription
subscribe(handleA)(shared$);
subscribe(handleB)(shared$);
tap
Perform side effects without modifying the stream.
tap<T>(effect: (value: T) => void): Operator<T>
Example:
pipe(
  source$,
  tap((x) => console.log('Value:', x)),
  map((x) => x * 2),
);
initialize
Run a function when the stream is first subscribed to.
initialize<T>(effect: () => void): Operator<T>
Example:
pipe(
  source$,
  initialize(() => console.log('Started!')),
);
finalize
Run a cleanup function when the stream completes or is unsubscribed.
finalize<T>(cleanup: () => void): Operator<T>
Example:
pipe(
  source$,
  finalize(() => console.log('Cleanup!')),
);
Sinks
subscribe 
Subscribe to a source with an observer.
subscribe<T>(observer: Observer<T>): (source: Source<T>) => Subscription
type Observer<T> = {
  next?: (value: T) => void;
  complete?: () => void;
} | ((value: T) => void);
Example:
const subscription = subscribe({
  next: (value) => console.log(value),
  complete: () => console.log('Done'),
})(source$);
// Later...
subscription.unsubscribe();
collect
Collect the first value from a source.
collect<T>(source: Source<T>): Promise<T>
Example:
const firstValue = await collect(source$);
collectAll
Collect all values into an array.
collectAll<T>(source: Source<T>): Promise<T[]>
Example:
const allValues = await collectAll(source$);
publish
Share a source among multiple subscribers, starting it immediately.
publish<T>(source: Source<T>): Source<T>
peek
Subscribe to a source for side effects without holding a reference.
peek<T>(observer: Observer<T>): (source: Source<T>) => void
Exchange Pattern
Exchanges use these APIs to transform operation and result streams:
const myExchange = (): Exchange => {
  return ({ forward }) => {
    return (ops$) => {
      // Transform operations
      const transformed$ = pipe(ops$, filter(shouldHandle), map(transformOperation));
      // Forward and transform results
      return pipe(transformed$, forward, map(transformResult));
    };
  };
};
Next Steps
- Streams Concept - Understand the why and what
- Custom Exchanges - Build exchanges with these APIs
- Exchanges Guide - Learn the exchange system