Documentation Index
Fetch the complete documentation index at: https://effect-ts-effect-smol.mintlify.app/llms.txt
Use this file to discover all available pages before exploring further.
The PubSub module provides utilities for building publish-subscribe systems where publishers can send messages to many subscribers concurrently.
Overview
A PubSub<A> is an asynchronous message hub where:
- Publishers can publish messages of type A
- Subscribers can subscribe to receive messages
- Multiple subscribers can receive the same messages
- Supports various backpressure strategies
- Handles concurrent access safely
Creating PubSub
Bounded PubSub
Creates a PubSub with backpressure - publishers wait when full:
import { Effect, PubSub } from "effect"

const program = Effect.gen(function*() {
  // Capacity of 10: publishers wait once the PubSub is full
  const hub = yield* PubSub.bounded<string>(10)

  // Publish two messages, one after the other
  for (const word of ["Hello", "World"]) {
    yield* PubSub.publish(hub, word)
  }
})
Unbounded PubSub
Creates a PubSub without capacity limits:
import { Effect, PubSub } from "effect"

const program = Effect.gen(function*() {
  // No capacity limit, so publishers are never blocked
  const numbers = yield* PubSub.unbounded<number>()

  yield* PubSub.publish(numbers, 42)
})
Dropping PubSub
Drops new messages when full:
import { Effect, PubSub } from "effect"

const program = Effect.gen(function*() {
  // Capacity of 100: new messages are dropped once it is reached
  const hub = yield* PubSub.dropping<string>(100)

  // `accepted` is false when the message was dropped
  const accepted = yield* PubSub.publish(hub, "message")
})
Sliding PubSub
Drops oldest messages when full:
import { Effect, PubSub } from "effect"

const program = Effect.gen(function*() {
  // Capacity of 10: when full, the oldest messages are removed to make room
  const hub = yield* PubSub.sliding<string>(10)

  yield* PubSub.publish(hub, "message")
})
Publishing Messages
Publish Single Message
import { Effect, PubSub } from "effect"

const program = Effect.gen(function*() {
  const hub = yield* PubSub.bounded<string>(10)

  // The publish effect completes once the message has been accepted
  yield* PubSub.publish(hub, "Hello")

  yield* Effect.log("Message published")
})
Publish Multiple Messages
import { Effect, PubSub } from "effect"

const program = Effect.gen(function*() {
  const hub = yield* PubSub.bounded<string>(10)

  // Publish the whole batch with a single call
  const batch = ["Message 1", "Message 2", "Message 3"]
  yield* PubSub.publishAll(hub, batch)
})
Subscribing to Messages
Basic Subscription
import { Effect, PubSub } from "effect"

const program = Effect.gen(function*() {
  const pubsub = yield* PubSub.bounded<string>(10)

  // Subscribe within a scope for automatic cleanup
  yield* Effect.scoped(Effect.gen(function*() {
    const subscription = yield* PubSub.subscribe(pubsub)

    // Publish after subscribing so the subscription has messages to take;
    // without this, the takes below would wait forever.
    yield* PubSub.publishAll(pubsub, ["Hello", "World"])

    // Take messages from the subscription
    const message1 = yield* PubSub.take(subscription)
    const message2 = yield* PubSub.take(subscription)
    console.log(message1, message2) // Hello World
  }))
})
Multiple Subscribers
import { Effect, PubSub } from "effect"

const program = Effect.gen(function*() {
  const hub = yield* PubSub.bounded<number>(10)

  // Every subscriber gets its own copy of each published message
  const body = Effect.gen(function*() {
    const first = yield* PubSub.subscribe(hub)
    const second = yield* PubSub.subscribe(hub)

    yield* PubSub.publish(hub, 42)

    const value1 = yield* PubSub.take(first)
    const value2 = yield* PubSub.take(second)
    console.log(value1, value2) // Both are 42
  })

  // The scope closes both subscriptions when the body finishes
  yield* Effect.scoped(body)
})
Taking Messages
Take Single Message
import { Effect, PubSub } from "effect"

// A PubSub created elsewhere (for example with PubSub.bounded<string>(10))
declare const pubsub: PubSub.PubSub<string>

// Requires a Scope: run it inside Effect.scoped so the subscription is cleaned up
const subscriber = Effect.gen(function*() {
  const subscription = yield* PubSub.subscribe(pubsub)

  // Blocks until a message is available
  const message = yield* PubSub.take(subscription)
  return message
})
Take Multiple Messages
import { Effect, PubSub } from "effect"

// A PubSub created elsewhere (for example with PubSub.bounded<string>(10))
declare const pubsub: PubSub.PubSub<string>

// Requires a Scope: run it inside Effect.scoped so the subscription is cleaned up
const subscriber = Effect.gen(function*() {
  const subscription = yield* PubSub.subscribe(pubsub)

  // Take up to N messages (non-blocking)
  const messages = yield* PubSub.takeUpTo(subscription, 5)
  console.log(messages) // Array with 0-5 messages
})
Take All Available
import { Effect, PubSub } from "effect"

// A PubSub created elsewhere (for example with PubSub.bounded<string>(10))
declare const pubsub: PubSub.PubSub<string>

// Requires a Scope: run it inside Effect.scoped so the subscription is cleaned up
const subscriber = Effect.gen(function*() {
  const subscription = yield* PubSub.subscribe(pubsub)

  // Take all currently available messages
  const messages = yield* PubSub.takeAll(subscription)
  return messages
})
Integration with Streams
import { Console, Effect, PubSub, Stream } from "effect"

const program = Effect.gen(function*() {
  const pubsub = yield* PubSub.bounded<number>(10)

  // Convert the PubSub to a Stream
  const stream = Stream.fromPubSub(pubsub)

  // Process the first five messages as a stream
  yield* Stream.take(stream, 5).pipe(
    Stream.runForEach((n) => Console.log(`Received: ${n}`))
  )
})
Broadcasting Domain Events
import { Effect, Layer, PubSub, Schema, Scope, ServiceMap } from "effect"

// Define domain event.
// TaggedClass supplies the `_tag: "UserCreated"` field that subscribers match on below.
class UserCreated extends Schema.TaggedClass<UserCreated>()("UserCreated", {
  userId: Schema.Number,
  email: Schema.String
}) {}

type DomainEvent = UserCreated

// Event bus service: a thin wrapper around one shared PubSub of domain events
class EventBus extends ServiceMap.Service<EventBus, {
  publish(event: DomainEvent): Effect.Effect<void>
  // The returned subscription lives in the caller's Scope
  subscribe(): Effect.Effect<PubSub.Subscription<DomainEvent>, never, Scope.Scope>
}>()("EventBus") {
  static readonly layer = Layer.effect(
    EventBus,
    Effect.gen(function*() {
      const pubsub = yield* PubSub.unbounded<DomainEvent>()
      return EventBus.of({
        // PubSub.publish yields a boolean; discard it to match the declared Effect<void>
        publish: (event) => Effect.asVoid(PubSub.publish(pubsub, event)),
        subscribe: () => PubSub.subscribe(pubsub)
      })
    })
  )
}

// Publisher
const createUser = Effect.gen(function*() {
  const eventBus = yield* EventBus
  yield* eventBus.publish(
    new UserCreated({ userId: 123, email: "user@example.com" })
  )
})

// Subscriber: handles events until interrupted; the scope releases the subscription
const userEventHandler = Effect.gen(function*() {
  const eventBus = yield* EventBus
  yield* Effect.scoped(Effect.gen(function*() {
    const subscription = yield* eventBus.subscribe()
    yield* Effect.forever(
      Effect.gen(function*() {
        const event = yield* PubSub.take(subscription)
        if (event._tag === "UserCreated") {
          yield* Effect.log(`User created: ${event.email}`)
        }
      })
    )
  }))
})
Shutdown and Cleanup
import { Effect, PubSub } from "effect"

const program = Effect.gen(function*() {
  const hub = yield* PubSub.bounded<string>(10)

  // Shut the PubSub down
  yield* PubSub.shutdown(hub)

  // Capture the outcome of publishing to the shut-down PubSub as an Exit
  const attempt = PubSub.publish(hub, "message")
  const result = yield* Effect.exit(attempt)
  console.log(result._tag) // "Failure"
})
Capacity and Size
import { Effect, PubSub } from "effect"

const program = Effect.gen(function*() {
  const pubsub = yield* PubSub.bounded<string>(10)

  // Get current size
  const size = yield* PubSub.size(pubsub)
  console.log(size)

  // Get capacity. This is a plain number fixed at construction, not an Effect,
  // so it is read directly rather than with yield*.
  const capacity = PubSub.capacity(pubsub)
  console.log(capacity) // 10

  // Check if full
  const full = yield* PubSub.isFull(pubsub)
  console.log(full)
})
Best Practices
- Use scoped subscriptions: Always subscribe within Effect.scoped for automatic cleanup
- Choose appropriate capacity: Size PubSub based on producer/consumer rates
- Select the right strategy: Use bounded for backpressure, dropping/sliding for lossy scenarios
- Handle shutdown gracefully: Shutdown PubSub when no longer needed
- Use with Streams: Convert to Stream for powerful composition
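- Avoid slow subscribers: With a bounded PubSub, a subscriber that falls behind lets the PubSub fill up and makes publishers wait, so keep per-message work short or hand it off to run asynchronously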
Common Patterns
Fan-out Pattern
import { Effect, PubSub } from "effect"

// Stand-in for real per-message work; replace with your own handler
const processValue = (value: number) => Effect.log(`Processing ${value}`)

// One producer, multiple consumers
const fanOut = Effect.gen(function*() {
  const pubsub = yield* PubSub.bounded<number>(10)

  // Producer
  const producer = Effect.gen(function*() {
    for (let i = 0; i < 100; i++) {
      yield* PubSub.publish(pubsub, i)
    }
  })

  // Consumer loop: drains one subscription indefinitely
  const consume = (subscription: PubSub.Subscription<number>) =>
    Effect.forever(
      Effect.gen(function*() {
        const value = yield* PubSub.take(subscription)
        yield* processValue(value)
      })
    )

  yield* Effect.scoped(Effect.gen(function*() {
    // Create every subscription before the producer starts, so no consumer
    // misses the first messages.
    const sub1 = yield* PubSub.subscribe(pubsub)
    const sub2 = yield* PubSub.subscribe(pubsub)
    const sub3 = yield* PubSub.subscribe(pubsub)

    // Run producer with multiple consumers.
    // The consumers loop forever, so this runs until fanOut is interrupted.
    yield* Effect.all(
      [producer, consume(sub1), consume(sub2), consume(sub3)],
      { concurrency: "unbounded" }
    )
  }))
})
Next Steps
- Learn about Stream for processing message sequences
- Explore Queue for point-to-point messaging
- Understand Effect for composing concurrent operations