Documentation Index
Fetch the complete documentation index at: https://effect-ts-effect-smol.mintlify.app/llms.txt
Use this file to discover all available pages before exploring further.
The Stream module provides a powerful abstraction for working with sequences of values that are produced over time. Streams are pull-based with backpressure, making them ideal for processing large datasets, event streams, and continuous data sources.
Overview
A Stream<A, E, R> describes a program that can:
- Emit many values of type
A
- Fail with an error of type
E
- Require services of type
R
Streams are pull-based with backpressure, emit chunks to amortize effect evaluation, and support monadic composition similar to Effect, but adapted for multiple values.
Creating Streams
From Iterables
import { Stream } from "effect"
// From an array
const numbers = Stream.fromIterable([1, 2, 3, 4, 5])
// From a range
const range = Stream.range(1, 10)
// Create a stream with specific values
const stream = Stream.make(1, 2, 3)
From Effects
import { Effect, Stream } from "effect"
// Create a stream from a single Effect
const fromEffect = Stream.fromEffect(
Effect.succeed(42)
)
// Repeat an Effect with a schedule
const polled = Stream.fromEffectSchedule(
Effect.succeed(new Date()),
Schedule.spaced("1 second")
)
From Async Sources
import { Stream } from "effect"
// From an async iterable
const fromAsyncIterable = Stream.fromAsyncIterable(
(async function* () {
yield 1
yield 2
yield 3
})()
)
// From a callback-based API
const fromCallback = Stream.callback<string>((emit) => {
const interval = setInterval(() => {
emit.single("tick")
}, 1000)
return () => clearInterval(interval)
})
Paginated Data
import { Effect, Stream } from "effect"
// Create a stream from paginated API
const paginatedStream = Stream.paginate(1, (page) =>
Effect.gen(function*() {
const response = yield* fetchPage(page)
if (response.items.length === 0) {
return [response.items, undefined] // No more pages
}
return [response.items, page + 1] // Next page
})
)
import { Stream } from "effect"
const stream = Stream.range(1, 10).pipe(
// Transform each element
Stream.map((n) => n * 2),
// Filter elements
Stream.filter((n) => n > 5),
// Take first N elements
Stream.take(5),
// Drop first N elements
Stream.drop(2)
)
import { Effect, Stream } from "effect"
const stream = Stream.fromIterable([1, 2, 3]).pipe(
// Map with an Effect
Stream.mapEffect((n) =>
Effect.gen(function*() {
yield* Effect.sleep("100 millis")
return n * 2
})
),
// Process with concurrency
Stream.mapEffectPar((n) => fetchData(n), { concurrency: 5 })
)
FlatMap and Chaining
import { Stream } from "effect"
const stream = Stream.make(1, 2, 3).pipe(
Stream.flatMap((n) =>
Stream.range(1, n) // Creates n streams
)
)
// Emits: 1, 1, 2, 1, 2, 3
Consuming Streams
Running to Collection
import { Effect, Stream } from "effect"
const program = Effect.gen(function*() {
const stream = Stream.make(1, 2, 3, 4, 5)
// Collect all elements into an array
const array = yield* Stream.runCollect(stream)
console.log(array) // [1, 2, 3, 4, 5]
})
Running with Effects
import { Console, Effect, Stream } from "effect"
const program = Stream.make(1, 2, 3).pipe(
// Process each element with an Effect
Stream.runForEach((n) => Console.log(`Processing: ${n}`))
)
Folding and Reducing
import { Effect, Stream } from "effect"
const sum = Stream.range(1, 10).pipe(
Stream.runFold(0, (acc, n) => acc + n)
)
// Returns Effect<number> with sum of 1..10
const concatenated = Stream.make("a", "b", "c").pipe(
Stream.runFoldEffect("", (acc, s) =>
Effect.succeed(acc + s)
)
)
Error Handling
import { Effect, Stream } from "effect"
const stream = Stream.make(1, 2, 3).pipe(
Stream.mapEffect((n) =>
n === 2
? Effect.fail(new Error("Failed at 2"))
: Effect.succeed(n)
),
// Catch and recover from errors
Stream.catchAll((error) =>
Stream.make(-1)
)
)
Merging and Combining
Merge Streams
import { Stream } from "effect"
const stream1 = Stream.make(1, 2, 3)
const stream2 = Stream.make(4, 5, 6)
// Merge streams concurrently
const merged = Stream.merge(stream1, stream2)
// Merge with specific strategy
const mergedEither = Stream.mergeEither(stream1, stream2)
Zip Streams
import { Stream } from "effect"
const left = Stream.make(1, 2, 3)
const right = Stream.make("a", "b", "c")
// Zip element-wise
const zipped = Stream.zip(left, right)
// Emits: [1, "a"], [2, "b"], [3, "c"]
Concat Streams
import { Stream } from "effect"
const first = Stream.make(1, 2, 3)
const second = Stream.make(4, 5, 6)
// Concatenate streams sequentially
const concatenated = Stream.concat(first, second)
// Emits: 1, 2, 3, 4, 5, 6
Grouping and Chunking
import { Stream } from "effect"
const stream = Stream.range(1, 100).pipe(
// Group into chunks of 10
Stream.grouped(10),
// Group within time windows
Stream.groupedWithin(10, "1 second")
)
Scheduling and Timing
import { Schedule, Stream } from "effect"
// Throttle stream emissions
const throttled = Stream.range(1, 100).pipe(
Stream.schedule(Schedule.spaced("100 millis"))
)
// Debounce stream
const debounced = stream.pipe(
Stream.debounce("500 millis")
)
// Add delays between elements
const delayed = Stream.make(1, 2, 3).pipe(
Stream.schedule(Schedule.exponential("100 millis"))
)
Integration with PubSub
import { Effect, PubSub, Stream } from "effect"
const program = Effect.gen(function*() {
const pubsub = yield* PubSub.bounded<number>(10)
// Create a stream from PubSub
const stream = Stream.fromPubSub(pubsub)
// Publish to PubSub
yield* PubSub.publish(pubsub, 1)
yield* PubSub.publish(pubsub, 2)
// Consume stream
yield* Stream.take(stream, 2).pipe(
Stream.runForEach((n) => Console.log(n))
)
})
Best Practices
- Use Stream.runForEach for side effects: Process each element with effects
- Leverage chunking: Use
grouped or groupedWithin for batch processing
- Control concurrency: Use
mapEffectPar with appropriate concurrency limits
- Handle backpressure: Streams naturally handle backpressure with pull-based semantics
- Compose streams: Build complex streams from simpler ones
- Use
Stream.buffer to improve throughput
- Process in chunks when possible to amortize effect costs
- Use
mapEffectPar for concurrent processing of independent elements
- Consider
Stream.throttle to control emission rate
Next Steps
- Learn about Effect for handling individual values
- Explore PubSub for broadcasting messages
- Understand Queue for concurrent processing