Custom Exchanges 
Create your own exchanges to add custom behavior to GraphQL operations using stream transformations.
Exchange Interface 
An exchange transforms operation streams into result streams:
type Exchange = (input: ExchangeInput) => ExchangeIO;
type ExchangeInput = {
  forward: ExchangeIO; // Next exchange in the chain
  client: Client; // Access to the client instance
};
type ExchangeIO = (operations: Source<Operation>) => Source<OperationResult>;Learn Stream Basics
Before writing custom exchanges, familiarize yourself with Streams Concept and the Streams Reference.
Critical: Call Forward Exactly Once
You must call forward exactly once in your exchange implementation. Calling forward multiple times creates duplicate instances of all downstream exchanges, breaking the client's single-instance guarantee.
If you need to handle different operation types separately, split the stream, merge them back into one, then forward once.
Basic Pattern 
Every exchange follows this structure:
import { type Exchange } from '@mearie/react';
const myExchange = (): Exchange => {
  return ({ forward }) => {
    return (ops$) => {
      // Transform operations stream
      const transformed$ = transformOperations(ops$);
      // Forward to next exchange and get results
      const results$ = forward(transformed$);
      // Transform results stream
      return transformResults(results$);
    };
  };
};Simple Example: Logging 
Log operations and results as they flow through:
import { pipe, tap } from '@mearie/core/stream';
import { type Exchange } from '@mearie/react';
import { schema } from '$mearie';
const loggingExchange = (): Exchange => {
  return ({ forward }) => {
    return (ops$) => {
      return pipe(
        ops$,
        tap((op) => {
          if (op.variant === 'request') {
            console.log('Operation:', op.artifact.kind, op.artifact.name);
          }
        }),
        forward,
        tap((result) => {
          console.log('Result:', result.data ? 'success' : 'error');
        }),
      );
    };
  };
};
export const client = createClient({
  schema,
  exchanges: [loggingExchange(), httpExchange({ url: 'https://api.example.com/graphql' })],
});Common Patterns 
Filtering Operations 
Filter operations while ensuring forward is called exactly once:
import { pipe, filter, merge } from '@mearie/core/stream';
import { type Exchange } from '@mearie/react';
const queriesOnlyExchange = (): Exchange => {
  return ({ forward }) => {
    return (ops$) => {
      // Separate queries from other operations
      const queries$ = pipe(
        ops$,
        filter((op) => op.variant === 'request' && op.artifact.kind === 'query'),
      );
      const others$ = pipe(
        ops$,
        filter((op) => op.variant !== 'request' || op.artifact.kind !== 'query'),
      );
      // Merge streams back into one, then forward ONCE
      return pipe(merge(queries$, others$), forward);
    };
  };
};Transforming Operations 
Modify operations before forwarding:
import { pipe, map } from '@mearie/core/stream';
import { type Exchange, type RequestOperation } from '@mearie/react';
const addMetadataExchange = (): Exchange => {
  return ({ forward }) => {
    return (ops$) => {
      return pipe(
        ops$,
        map((op) => {
          if (op.variant === 'request') {
            return {
              ...op,
              metadata: {
                ...op.metadata,
                clientVersion: '1.0.0',
                timestamp: Date.now(),
              },
            };
          }
          return op;
        }),
        forward,
      );
    };
  };
};Transforming Results 
Modify results coming back:
import { pipe, map } from '@mearie/core/stream';
import { type Exchange } from '@mearie/react';
const timestampExchange = (): Exchange => {
  return ({ forward }) => {
    return (ops$) => {
      return pipe(
        ops$,
        forward,
        map((result) => ({
          ...result,
          extensions: {
            ...result.extensions,
            receivedAt: new Date().toISOString(),
          },
        })),
      );
    };
  };
};Performance Monitoring 
Track operation timing:
import { pipe, tap } from '@mearie/core/stream';
import { type Exchange } from '@mearie/react';
const performanceExchange = (): Exchange => {
  const timings = new Map<string, number>();
  return ({ forward }) => {
    return (ops$) => {
      return pipe(
        ops$,
        tap((op) => {
          if (op.variant === 'request') {
            timings.set(op.key, performance.now());
          }
        }),
        forward,
        tap((result) => {
          const startTime = timings.get(result.operation.key);
          if (startTime !== undefined) {
            const duration = performance.now() - startTime;
            console.log(`Operation took ${duration.toFixed(2)}ms`);
            timings.delete(result.operation.key);
          }
        }),
      );
    };
  };
};Conditional Forwarding 
Execute different logic based on operation type:
import { pipe, filter, merge, tap } from '@mearie/core/stream';
import { type Exchange } from '@mearie/react';
const splitExchange = (): Exchange => {
  return ({ forward }) => {
    return (ops$) => {
      // Split mutations from other operations
      const mutations$ = pipe(
        ops$,
        filter((op) => op.variant === 'request' && op.artifact.kind === 'mutation'),
      );
      const others$ = pipe(
        ops$,
        filter((op) => op.variant !== 'request' || op.artifact.kind !== 'mutation'),
      );
      // Merge streams, forward ONCE, then handle results differently
      return pipe(
        merge(mutations$, others$),
        forward,
        tap((result) => {
          if (result.operation.variant === 'request' && result.operation.artifact.kind === 'mutation') {
            console.log('Mutation completed');
          }
        }),
      );
    };
  };
};Async Operations 
Handle async operations with mergeMap:
import { pipe, mergeMap } from '@mearie/core/stream';
import { type Exchange } from '@mearie/react';
const authExchange = (): Exchange => {
  return ({ forward }) => {
    return (ops$) => {
      return pipe(
        ops$,
        mergeMap(async (op) => {
          if (op.variant === 'request') {
            // Fetch token asynchronously
            const token = await getAuthToken();
            return {
              ...op,
              metadata: {
                ...op.metadata,
                authToken: token,
              },
            };
          }
          return op;
        }),
        forward,
      );
    };
  };
};Splitting and Merging Streams 
When you need to handle different operations separately, split the stream into multiple parts, process each independently, then merge them back before forwarding:
import { pipe, filter, merge } from '@mearie/core/stream';
import { type Exchange } from '@mearie/react';
const smartExchange = (): Exchange => {
  return ({ forward }) => {
    return (ops$) => {
      // Split stream into multiple parts
      const mutations$ = pipe(
        ops$,
        filter((op) => op.variant === 'request' && op.artifact.kind === 'mutation'),
      );
      const queries$ = pipe(
        ops$,
        filter((op) => op.variant === 'request' && op.artifact.kind === 'query'),
      );
      const teardowns$ = pipe(
        ops$,
        filter((op) => op.variant === 'teardown'),
      );
      // Merge all streams back into one
      const merged$ = merge(mutations$, queries$, teardowns$);
      // Forward the merged stream ONCE
      return pipe(merged$, forward);
    };
  };
};This ensures downstream exchanges maintain exactly one instance. See the built-in dedupExchange implementation for a real-world example.
Terminating Exchange 
A terminating exchange doesn't forward operations - it handles them directly:
import { pipe, filter, mergeMap, fromValue, merge } from '@mearie/core/stream';
import { type Exchange } from '@mearie/react';
const mockExchange = (mockData: Record<string, unknown>): Exchange => {
  return ({ forward }) => {
    return (ops$) => {
      // Handle queries with mock data
      const queries$ = pipe(
        ops$,
        filter((op) => op.variant === 'request' && op.artifact.kind === 'query'),
        mergeMap((op) =>
          fromValue({
            operation: op,
            data: mockData,
          }),
        ),
      );
      // Forward everything else
      const others$ = pipe(
        ops$,
        filter((op) => op.variant !== 'request' || op.artifact.kind !== 'query'),
        forward,
      );
      return merge(queries$, others$);
    };
  };
};
// Usage for testing
export const client = createClient({
  schema,
  exchanges: [mockExchange({ user: { id: '1', name: 'Test User' } })],
});Stream Operators 
Common operators for building exchanges:
- pipe- Compose operators
- filter- Filter operations/results
- map- Transform values synchronously
- mergeMap- Transform values to new streams
- tap- Side effects without transformation
- merge- Combine multiple streams
- share- Share subscription among subscribers
See Streams Reference for complete documentation.
Best Practices 
- Call forwardexactly once - Critical for maintaining single-instance exchanges
- Keep exchanges focused - Each should do one thing well
- Use share()when needed - Prevent duplicate subscriptions
- Handle teardown - Watch for variant: 'teardown'operations
- Consider ordering - Place exchanges logically in the chain
- Don't block - Avoid synchronous expensive operations
- Use mergeMapfor async - Never useawaitdirectly inmap
Exchange Placement 
Place custom exchanges strategically:
export const client = createClient({
  schema,
  exchanges: [
    loggingExchange(), // Monitoring - outermost
    performanceExchange(), // Performance tracking
    dedupExchange(), // Deduplication
    addMetadataExchange(), // Request transformation - after dedup, before cache
    cacheExchange(), // Caching
    httpExchange({ url: 'https://api.example.com/graphql' }), // Terminating
  ],
});General guidelines:
- Monitoring/logging - Outermost layer to see all operations
- Deduplication - Early in the chain to filter duplicates
- Request transformation - After dedup, before cache to affect cache keys
- Non-terminating exchanges - Before terminating exchanges
- Terminating exchanges - At the end
Next Steps 
- Streams Concept - Understand stream architecture
- Streams Reference - Complete stream API
- Exchanges Guide - Learn the exchange system
- HTTP Exchange - Built-in HTTP implementation