Documentation Index
Fetch the complete documentation index at: https://effect-ts-effect-smol.mintlify.app/llms.txt
Use this file to discover all available pages before exploring further.
The Queue module provides asynchronous, concurrent queues for message passing between producers and consumers with built-in backpressure support.
Overview
A Queue<A, E> is an asynchronous queue that:
- Can be offered to (enqueue) and taken from (dequeue)
- Supports concurrent producers and consumers
- Provides backpressure strategies (suspend, drop, slide)
- Can signal completion or failure
- Handles concurrent access safely
Creating Queues
Bounded Queue
Creates a queue with backpressure when full:
import { Effect, Queue } from "effect"
const program = Effect.gen(function*() {
// Create a bounded queue with capacity 10
const queue = yield* Queue.bounded<string>(10)
// Offer items to the queue
yield* Queue.offer(queue, "hello")
yield* Queue.offer(queue, "world")
// Take items from the queue
const item1 = yield* Queue.take(queue)
const item2 = yield* Queue.take(queue)
console.log([item1, item2]) // ["hello", "world"]
})
Unbounded Queue
Creates a queue without capacity limits:
import { Effect, Queue } from "effect"
const program = Effect.gen(function*() {
// Unbounded queue - never blocks on offer
const queue = yield* Queue.unbounded<number>()
yield* Queue.offer(queue, 42)
})
Dropping Queue
Drops new items when full:
import { Effect, Queue } from "effect"
const program = Effect.gen(function*() {
// Drops items when capacity is reached
const queue = yield* Queue.dropping<string>(5)
// Returns false if item was dropped
const accepted = yield* Queue.offer(queue, "message")
})
Sliding Queue
Removes oldest items when full:
import { Effect, Queue } from "effect"
const program = Effect.gen(function*() {
// Removes oldest items to make room for new ones
const queue = yield* Queue.sliding<number>(10)
yield* Queue.offer(queue, 42)
})
Offering Items
Offer Single Item
import { Effect, Queue } from "effect"
const program = Effect.gen(function*() {
const queue = yield* Queue.bounded<string>(10)
// Offer blocks if queue is full (for bounded queues)
yield* Queue.offer(queue, "message")
})
Offer Multiple Items
import { Effect, Queue } from "effect"
const program = Effect.gen(function*() {
const queue = yield* Queue.bounded<number>(10)
// Offer all items
yield* Queue.offerAll(queue, [1, 2, 3, 4, 5])
})
Taking Items
Take Single Item
import { Effect, Queue } from "effect"
const consumer = Effect.gen(function*() {
const queue = yield* Queue.bounded<string>(10)
// Blocks until an item is available
const item = yield* Queue.take(queue)
return item
})
Take Multiple Items
import { Effect, Queue } from "effect"
const consumer = Effect.gen(function*() {
const queue = yield* Queue.bounded<number>(10)
// Take up to N items (non-blocking)
const items = yield* Queue.takeUpTo(queue, 5)
console.log(items) // Array with 0-5 items
})
Take All Available
import { Effect, Queue } from "effect"
const consumer = Effect.gen(function*() {
const queue = yield* Queue.bounded<string>(10)
// Take all currently available items
const items = yield* Queue.takeAll(queue)
return items
})
Producer-Consumer Pattern
import { Effect, Queue } from "effect"
const program = Effect.gen(function*() {
const queue = yield* Queue.bounded<number>(10)
// Producer: generates items
const producer = Effect.gen(function*() {
for (let i = 0; i < 100; i++) {
yield* Queue.offer(queue, i)
yield* Effect.sleep("10 millis")
}
// Signal completion
yield* Queue.done(queue)
})
// Consumer: processes items
const consumer = Effect.gen(function*() {
while (true) {
const item = yield* Queue.take(queue)
yield* Effect.log(`Processing: ${item}`)
}
})
// Run producer and consumer concurrently
yield* Effect.all(
[producer, consumer],
{ concurrency: "unbounded" }
)
})
Multiple Consumers
import { Effect, Queue } from "effect"
const program = Effect.gen(function*() {
const queue = yield* Queue.bounded<number>(100)
// Producer
const producer = Effect.gen(function*() {
for (let i = 0; i < 1000; i++) {
yield* Queue.offer(queue, i)
}
})
// Consumer worker
const worker = (id: number) =>
Effect.gen(function*() {
yield* Effect.forever(
Effect.gen(function*() {
const item = yield* Queue.take(queue)
yield* Effect.log(`Worker ${id} processing: ${item}`)
yield* Effect.sleep("100 millis")
})
)
})
// Run with multiple workers
yield* Effect.all(
[
producer,
worker(1),
worker(2),
worker(3)
],
{ concurrency: "unbounded" }
)
})
Queue State and Inspection
Check Size and Capacity
import { Effect, Queue } from "effect"
const program = Effect.gen(function*() {
const queue = yield* Queue.bounded<number>(10)
yield* Queue.offerAll(queue, [1, 2, 3])
// Get current size
const size = yield* Queue.size(queue)
console.log(size) // 3
// Get capacity
const capacity = yield* Queue.capacity(queue)
console.log(capacity) // 10
// Check if empty
const empty = yield* Queue.isEmpty(queue)
console.log(empty) // false
// Check if full
const full = yield* Queue.isFull(queue)
console.log(full) // false
})
Completion and Errors
Signal Completion
import { Effect, Queue } from "effect"
const program = Effect.gen(function*() {
const queue = yield* Queue.bounded<string>(10)
// Signal that queue is done
yield* Queue.done(queue)
// Subsequent takes will fail
const result = yield* Effect.exit(Queue.take(queue))
console.log(result._tag) // "Failure"
})
Signal Failure
import { Cause, Effect, Queue } from "effect"
const program = Effect.gen(function*() {
const queue = yield* Queue.bounded<string, string>(10)
// Signal failure
yield* Queue.fail(queue, "Processing error")
// Subsequent operations will fail with this error
const result = yield* Effect.exit(Queue.take(queue))
if (result._tag === "Failure") {
console.log(Cause.failures(result.cause)) // ["Processing error"]
}
})
Queue as a Service
import { Effect, Layer, Queue, ServiceMap } from "effect"
class TaskQueue extends ServiceMap.Service<TaskQueue, {
submit(task: string): Effect.Effect<void>
process(): Effect.Effect<string>
}>()("TaskQueue") {
static readonly layer = Layer.effect(
TaskQueue,
Effect.gen(function*() {
const queue = yield* Queue.bounded<string>(100)
return TaskQueue.of({
submit: (task) => Queue.offer(queue, task),
process: () => Queue.take(queue)
})
})
)
}
// Usage
const program = Effect.gen(function*() {
const taskQueue = yield* TaskQueue
// Submit tasks
yield* taskQueue.submit("task-1")
yield* taskQueue.submit("task-2")
// Process tasks
const task = yield* taskQueue.process()
yield* Effect.log(`Processing: ${task}`)
}).pipe(
Effect.provide(TaskQueue.layer)
)
Integration with Streams
import { Effect, Queue, Stream } from "effect"
const program = Effect.gen(function*() {
const queue = yield* Queue.bounded<number>(10)
// Convert queue to stream
const stream = Stream.fromQueue(queue)
// Process as stream
yield* Stream.take(stream, 5).pipe(
Stream.runForEach((n) => Console.log(`Received: ${n}`))
)
})
Advanced Patterns
Priority Queue Simulation
import { Effect, Queue } from "effect"
class PriorityTask {
constructor(
readonly priority: number,
readonly task: string
) {}
}
const program = Effect.gen(function*() {
const highPriorityQueue = yield* Queue.bounded<string>(10)
const lowPriorityQueue = yield* Queue.bounded<string>(10)
const submit = (task: PriorityTask) =>
task.priority > 5
? Queue.offer(highPriorityQueue, task.task)
: Queue.offer(lowPriorityQueue, task.task)
const process = Effect.gen(function*() {
// Try high priority first
const highPriorityItems = yield* Queue.takeUpTo(highPriorityQueue, 1)
if (highPriorityItems.length > 0) {
return highPriorityItems[0]
}
// Fall back to low priority
return yield* Queue.take(lowPriorityQueue)
})
})
Rate Limiting
import { Effect, Queue, Schedule } from "effect"
const rateLimitedQueue = Effect.gen(function*() {
const queue = yield* Queue.bounded<string>(100)
// Consumer with rate limiting
const consumer = Effect.gen(function*() {
yield* Effect.forever(
Effect.gen(function*() {
const item = yield* Queue.take(queue)
yield* processItem(item)
}).pipe(
Effect.repeat(Schedule.spaced("100 millis"))
)
)
})
return { queue, consumer }
})
Best Practices
- Choose the right queue type: Use bounded for backpressure, unbounded for fire-and-forget
- Signal completion: Use Queue.done() to gracefully shutdown consumers
- Handle errors properly: Use Queue.fail() to propagate errors to consumers
- Avoid blocking: Process items asynchronously to prevent blocking
- Use appropriate capacity: Size queues based on producer/consumer rates
- Monitor queue depth: Track queue size to detect bottlenecks
Next Steps
- Learn about PubSub for broadcasting messages
- Explore Stream for processing sequences
- Understand Effect for concurrent operations