Documentation Index
Fetch the complete documentation index at: https://effect-ts-effect-smol.mintlify.app/llms.txt
Use this file to discover all available pages before exploring further.
Overview
The effect/unstable/workflow modules provide tools for building durable workflows that:
- Survive process restarts
- Handle long-running operations
- Maintain state across interruptions
- Coordinate distributed activities
What are workflows?
Workflows are durable execution contexts that can:
- Persist state - Workflow state is stored and restored across restarts
- Handle delays - Wait for hours, days, or weeks without blocking
- Coordinate activities - Orchestrate multiple operations across services
- Recover from failures - Automatically retry failed activities
Defining a workflow
import { Effect, Schema } from "effect"
import { Workflow } from "effect/unstable/workflow"
class OrderWorkflow extends Schema.Class<OrderWorkflow>("OrderWorkflow")({
orderId: Schema.String,
status: Schema.String
}) {}
const processOrderWorkflow = Workflow.make("ProcessOrder", OrderWorkflow,
Effect.gen(function*() {
const workflow = yield* Workflow.Workflow
// Reserve inventory
yield* workflow.activity("reserveInventory")
// Wait for payment (could be hours or days)
const paymentReceived = yield* workflow.signal("paymentReceived")
if (!paymentReceived) {
yield* workflow.activity("releaseInventory")
return { orderId: workflow.input.orderId, status: "cancelled" }
}
// Ship order
yield* workflow.activity("shipOrder")
return { orderId: workflow.input.orderId, status: "shipped" }
})
)
Activities
Activities are the units of work performed by workflows:
import { Activity } from "effect/unstable/workflow"
const reserveInventory = Activity.make(
"reserveInventory",
Effect.gen(function*() {
const inventory = yield* InventoryService
yield* inventory.reserve(orderId)
})
)
const shipOrder = Activity.make(
"shipOrder",
Effect.gen(function*() {
const shipping = yield* ShippingService
yield* shipping.createShipment(orderId)
})
)
Signals
Signals allow external events to communicate with workflows:
// In workflow
const approved = yield* workflow.signal("approvalReceived")
if (approved) {
yield* workflow.activity("processApprovedOrder")
}
// Send signal from outside
const client = yield* WorkflowClient
yield* client.signal(workflowId, "approvalReceived", true)
Durable delays
Workflows can wait without consuming resources:
// Wait 24 hours
yield* workflow.sleep("24 hours")
// Wait until a specific time
yield* workflow.sleepUntil(reminderDate)
Starting workflows
import { Effect } from "effect"
import { WorkflowEngine } from "effect/unstable/workflow"
const startOrder = Effect.gen(function*() {
const engine = yield* WorkflowEngine.WorkflowEngine
const workflowId = yield* engine.start(processOrderWorkflow, {
orderId: "order-123",
status: "pending"
})
yield* Effect.log(`Started workflow: ${workflowId}`)
return workflowId
})
Querying workflows
const getOrderStatus = Effect.gen(function*() {
const engine = yield* WorkflowEngine.WorkflowEngine
const workflow = yield* engine.get(workflowId)
return workflow.state.status
})
Workflow state
Workflows maintain durable state:
const workflow = Effect.gen(function*() {
const wf = yield* Workflow.Workflow
// Read state
const currentCount = wf.state.count
// Update state
wf.state.count = currentCount + 1
// State is automatically persisted
})
Error handling
Activities can be retried automatically:
const activity = Activity.make(
"sendEmail",
sendEmailEffect,
{
retries: 3,
backoff: "exponential"
}
)
Workflow proxy
Call workflows like regular functions:
import { WorkflowProxy } from "effect/unstable/workflow"
const proxy = yield* WorkflowProxy.make(processOrderWorkflow)
const result = yield* proxy({
orderId: "order-123",
status: "pending"
})
Complete example
import { Effect, Layer, Schema } from "effect"
import { Activity, Workflow, WorkflowEngine } from "effect/unstable/workflow"
class OrderInput extends Schema.Class<OrderInput>("OrderInput")({
orderId: Schema.String,
items: Schema.Array(Schema.String)
}) {}
class OrderResult extends Schema.Class<OrderResult>("OrderResult")({
orderId: Schema.String,
status: Schema.String
}) {}
// Define activities
const reserveInventory = Activity.make(
"reserveInventory",
(input: OrderInput) => Effect.gen(function*() {
yield* Effect.log(`Reserving inventory for ${input.orderId}`)
yield* Effect.sleep("1 second")
})
)
const chargePayment = Activity.make(
"chargePayment",
(input: OrderInput) => Effect.gen(function*() {
yield* Effect.log(`Charging payment for ${input.orderId}`)
yield* Effect.sleep("1 second")
})
)
const shipOrder = Activity.make(
"shipOrder",
(input: OrderInput) => Effect.gen(function*() {
yield* Effect.log(`Shipping order ${input.orderId}`)
yield* Effect.sleep("1 second")
})
)
// Define workflow
const processOrder = Workflow.make(
"ProcessOrder",
OrderInput,
Effect.gen(function*() {
const workflow = yield* Workflow.Workflow
const input = workflow.input
yield* Effect.log(`Processing order ${input.orderId}`)
// Execute activities sequentially
yield* workflow.execute(reserveInventory(input))
yield* workflow.execute(chargePayment(input))
// Wait for fulfillment delay
yield* workflow.sleep("2 seconds")
yield* workflow.execute(shipOrder(input))
return OrderResult.make({
orderId: input.orderId,
status: "shipped"
})
})
)
// Start workflow
const program = Effect.gen(function*() {
const engine = yield* WorkflowEngine.WorkflowEngine
const workflowId = yield* engine.start(processOrder, {
orderId: "order-456",
items: ["item1", "item2"]
})
yield* Effect.log(`Workflow started: ${workflowId}`)
// Wait for completion
const result = yield* engine.await(workflowId)
yield* Effect.log(`Order ${result.orderId} ${result.status}`)
})
Use cases
- Order processing - Handle multi-step order fulfillment
- Approval flows - Wait for human approval in business processes
- Scheduled tasks - Run tasks at specific times or intervals
- Sagas - Coordinate distributed transactions
- Event-driven processes - React to events over long periods
See also
- Cluster - Distribute workflows across nodes
- RPC - Define workflow activities