Durable Streams Primitives
Deep dive into DurableCursor, DurableDeferred, DurableLock, DurableMailbox, and the ask pattern
Durable Streams Primitives
The foundational building blocks of Swarm Mail. Inspired by Kyle Matthews' Durable Streams protocol.
DurableCursor - Positioned Event Stream Consumption
Purpose: Read events from a stream with resumable position tracking.
Key Concept: Event streams are append-only logs. Cursors track the "last read position" (sequence number) and checkpoint it to the database. If an agent crashes, it resumes from the last committed position.
API
const cursor =
yield *
cursorService.create({
stream: "projects/foo/events", // Stream identifier
checkpoint: "agents/bar/position", // Unique checkpoint name
batchSize: 100, // Read 100 events at a time
});
// Consume events as async iterable
for await (const msg of cursor.consume()) {
yield * handleMessage(msg.value);
yield * msg.commit(); // Checkpoint this position
}Implementation Details
- Schema:
cursorstable with(stream, checkpoint)UNIQUE constraint - Batching: Reads events in batches (default 100) for efficiency
- Commit: Updates
positionin database + in-memoryRef - Resumability: On restart, loads last committed position from DB
Database Schema
CREATE TABLE cursors (
id SERIAL PRIMARY KEY,
stream TEXT NOT NULL, -- Stream name (e.g., "projects/foo/events")
checkpoint TEXT NOT NULL, -- Checkpoint name (e.g., "agents/bar/position")
position BIGINT NOT NULL, -- Last committed sequence number
updated_at BIGINT NOT NULL,
UNIQUE(stream, checkpoint)
);
CREATE INDEX idx_cursors_checkpoint ON cursors(checkpoint);Use Cases
- Inbox consumption - agent reads messages from stream
- Event processing - workers consume tasks from event log
- Exactly-once semantics - commit after processing, skip on replay
Example: Resumable Event Processing
import { DurableCursor } from "swarm-mail/streams/effect/cursor";
import { DurableCursorLive } from "swarm-mail/streams/effect/layers";
const program = Effect.gen(function* () {
const cursorService = yield* DurableCursor;
const cursor = yield* cursorService.create({
stream: "projects/foo/events",
checkpoint: "workers/event-processor",
batchSize: 50,
});
console.log("Starting from position:", yield* cursor.getPosition());
for await (const msg of cursor.consume()) {
console.log("Processing event:", msg.value);
// Simulate processing
yield* Effect.sleep("100 millis");
// Checkpoint position
yield* msg.commit();
}
});
Effect.runPromise(program.pipe(Effect.provide(DurableCursorLive)));
// If this crashes, next run starts from last committed positionEvent Stream Structure
events table (append-only log):
┌────┬──────────┬──────────────┬───────────────┬─────────────────────────────┐
│ id │ sequence │ type │ timestamp │ payload │
├────┼──────────┼──────────────┼───────────────┼─────────────────────────────┤
│ 1 │ 1 │ agent_reg... │ 1704000000000 │ { agent: "AgentA", ... } │
│ 2 │ 2 │ message_sent │ 1704000001000 │ { from: "A", to: ["B"] } │
│ 3 │ 3 │ file_reserv. │ 1704000002000 │ { agent: "A", paths: [...]} │
│ 4 │ 4 │ message_read │ 1704000003000 │ { message_id: 2, agent: "B"}│
│ 5 │ 5 │ file_releas. │ 1704000004000 │ { agent: "A", paths: [...]} │
└────┴──────────┴──────────────┴───────────────┴─────────────────────────────┘
▲
│
│ DurableCursor reads from here
│ Checkpoints last sequence number
│
┌───────────────────┐
│ cursors table │
├───────────────────┤
│ checkpoint: "..." │
│ position: 4 │ ← Resume from seq=5
└───────────────────┘DurableDeferred - Distributed Promises
Purpose: Create a "distributed promise" that can be resolved from anywhere (think of it as a URL-addressable future value).
Key Concept: You create a deferred with a unique URL, pass that URL to another agent, and block waiting for the response. The other agent resolves the deferred by URL, unblocking you.
API
// Agent A: Create deferred and send request
const deferred =
yield *
deferredService.create<Response>({
ttlSeconds: 60,
});
yield *
mailbox.send("agent-b", {
payload: { task: "getData" },
replyTo: deferred.url, // URL like "deferred:abc123"
});
const response = yield * deferred.value; // Blocks until resolved or timeout
// Agent B: Resolve deferred
yield * deferredService.resolve(envelope.replyTo, { data: "result" });Implementation Details
- Schema:
deferredtable withurl UNIQUE,resolved BOOLEAN,value JSONB - In-memory registry:
Map<url, Effect.Deferred>for instant resolution (no polling) - Fallback polling: If in-memory deferred missing (agent restart), polls database every 100ms
- TTL expiry: Auto-cleanup of expired deferreds
- Errors:
TimeoutErrorif TTL expires,NotFoundErrorif URL doesn't exist
Database Schema
CREATE TABLE deferred (
id SERIAL PRIMARY KEY,
url TEXT NOT NULL UNIQUE, -- Unique identifier (e.g., "deferred:abc123")
resolved BOOLEAN NOT NULL DEFAULT FALSE,
value JSONB, -- Resolution value
error TEXT, -- Rejection error message
expires_at BIGINT NOT NULL, -- TTL expiry timestamp
created_at BIGINT NOT NULL
);
CREATE INDEX idx_deferred_url ON deferred(url);
CREATE INDEX idx_deferred_expires ON deferred(expires_at);Use Cases
- Request/response - ask pattern (see below)
- RPC over streams - synchronous-style calls between agents
- Coordination - agent waits for signal from another agent
DurableLock - Distributed Mutual Exclusion
Purpose: Acquire exclusive locks on resources using Compare-And-Swap (CAS) pattern.
Key Concept: CAS (seq=0) pattern - try to INSERT (no lock exists) or UPDATE (lock expired or we already hold it). Uses exponential backoff for retries on contention.
API
// Acquire lock with retry
const lock =
yield *
lockService.acquire("my-resource", {
ttlSeconds: 30,
maxRetries: 10,
});
try {
// Critical section - only one agent here at a time
yield * doWork();
} finally {
yield * lock.release();
}
// Or use helper
yield * lockService.withLock("my-resource", Effect.succeed(42));Implementation Details
- Schema:
lockstable withresource UNIQUE,holder TEXT,seq INTEGER - CAS logic:
- Try
INSERT(no lock exists) → success - If INSERT fails, try
UPDATE WHERE expires_at < now OR holder = me→ success if stale/reentrant - If UPDATE returns 0 rows → contention, retry with backoff
- Try
- Exponential backoff: 50ms base delay, doubles each retry (50ms, 100ms, 200ms...)
- Auto-expiry: TTL stored in
expires_at, stale locks can be claimed - Errors:
LockTimeoutif max retries exceeded,LockNotHeldif release by wrong holder
Database Schema
CREATE TABLE locks (
id SERIAL PRIMARY KEY,
resource TEXT NOT NULL UNIQUE, -- Resource being locked
holder TEXT NOT NULL, -- Agent holding lock
seq INTEGER NOT NULL, -- CAS sequence number
acquired_at BIGINT NOT NULL,
expires_at BIGINT NOT NULL,
UNIQUE(resource)
);Use Cases
- File reservations - prevent edit conflicts (Swarm Mail uses this)
- Critical sections - only one agent modifying shared state
- Leader election - first to acquire lock becomes leader
File Reservation Protocol
┌──────────┐ ┌──────────┐
│ Agent A │ │ Agent B │
└────┬─────┘ └────┬─────┘
│ │
│ 1. Reserve src/auth.ts (exclusive) │
├────────────────────────────────────────────┐ │
│ │ │
│ 2. DurableLock.acquire("src/auth.ts") │ │
│ → Granted (no conflicts) │ │
│ │ │
│ 3. Edit src/auth.ts │ │
│ │ │ 4. Reserve src/auth.ts
│ │ ├──────────────────────┐
│ │ │ │
│ │ │ 5. Lock contention │
│ │ │ → Warned (Agent A │
│ │ │ holds lock) │
│ │ │ │
│ 6. Release src/auth.ts │ │ │
├────────────────────────────────────────────┤ │ │
│ │ │ │
│ 7. DurableLock.release() │ │ 8. Retry acquire │
│ │ │ → Granted │
│ │ │ │File Reservation Protocol (CAS Lock)
┌──────────┐ ┌──────────┐
│ Agent A │ │ Agent B │
└────┬─────┘ └────┬─────┘
│ │
│ 1. Reserve src/auth.ts │
├─────────────────────────────┐ │
│ │ │
│ 2. DurableLock.acquire() │ │
│ INSERT INTO locks │ │
│ VALUES ('src/auth.ts', │ │
│ 'AgentA', │ │
│ seq=0) │ │
│ → SUCCESS │ │
│ │ │
│ 3. Edit src/auth.ts │ │
│ │ │
│ │ 4. Reserve src/auth.ts
│ │ ├──────────────────────┐
│ │ │ │
│ │ │ 5. TRY INSERT │
│ │ │ → CONFLICT │
│ │ │ │
│ │ │ 6. TRY UPDATE │
│ │ │ WHERE expires_at │
│ │ │ < now OR │
│ │ │ holder='AgentB' │
│ │ │ → 0 rows │
│ │ │ │
│ │ │ 7. Retry with │
│ │ │ exponential │
│ │ │ backoff... │
│ │ │ │
│ 8. Release lock │ │ │
├─────────────────────────────┤ │ │
│ │ │ │
│ 9. DELETE FROM locks │ │ 10. Retry succeeds │
│ WHERE resource=... │ │ UPDATE (0 rows │
│ AND holder='AgentA' │ │ before, now │
│ │ │ lock is free) │
│ │ │ → SUCCESS │
│ │ │ │DurableMailbox - Actor-Style Messaging
Purpose: Send/receive envelopes between agents using cursor-based positioned consumption.
Key Concept: Combines DurableCursor (positioned reading) + Envelope pattern (payload + metadata). Each agent has a named mailbox, messages are filtered by recipient during consumption.
API
const mailbox =
yield *
mailboxService.create({
agent: "worker-1",
projectKey: "proj-123",
});
// Send message with optional reply channel
yield *
mailbox.send("worker-2", {
payload: { task: "process-data" },
replyTo: "deferred:xyz", // For request/response
threadId: "bd-123", // Conversation tracking
});
// Receive messages (filters to only messages for this agent)
for await (const envelope of mailbox.receive()) {
console.log(envelope.payload);
if (envelope.replyTo) {
yield * DurableDeferred.resolve(envelope.replyTo, result);
}
yield * envelope.commit(); // Checkpoint position
}Implementation Details
- Cursor creation: Creates
DurableCursorwith filtertypes: ["message_sent"] - Filtering:
eventToEnvelope()skips messages not addressed to this agent - Envelope structure:
{ payload: T, // Your message data replyTo?: string, // Deferred URL for response sender: string, // From agent messageId: number, // Message ID threadId?: string, // Conversation grouping commit: () => Effect // Checkpoint this message } - Storage: Messages stored as
message_sentevents in event stream
Database Schema
messages - Materialized Message View
CREATE TABLE messages (
id SERIAL PRIMARY KEY,
project_key TEXT NOT NULL,
from_agent TEXT NOT NULL,
subject TEXT NOT NULL,
body TEXT NOT NULL,
thread_id TEXT,
importance TEXT DEFAULT 'normal',
created_at BIGINT NOT NULL,
read_by JSONB DEFAULT '[]', -- Array of agent names who read it
acked_by JSONB DEFAULT '[]' -- Array of agent names who acked it
);message_recipients - Message Routing
CREATE TABLE message_recipients (
id SERIAL PRIMARY KEY,
message_id INTEGER NOT NULL REFERENCES messages(id),
agent_name TEXT NOT NULL,
UNIQUE(message_id, agent_name)
);
CREATE INDEX idx_message_recipients_agent ON message_recipients(agent_name);reservations - Active File Locks
CREATE TABLE reservations (
id SERIAL PRIMARY KEY,
project_key TEXT NOT NULL,
agent_name TEXT NOT NULL,
path_pattern TEXT NOT NULL, -- File path or glob pattern
exclusive BOOLEAN NOT NULL,
reason TEXT,
expires_at BIGINT NOT NULL
);
CREATE INDEX idx_reservations_agent ON reservations(agent_name);
CREATE INDEX idx_reservations_path ON reservations(path_pattern);Use Cases
- Agent inbox - receive tasks, status updates, blockers
- Broadcast - send message to multiple agents
- Request/response - combine with DurableDeferred (see ask pattern)
Message Flow Between Agents
┌───────────┐ ┌──────────────┐ ┌───────────┐
│ Agent A │ │ Event Stream │ │ Agent B │
└─────┬─────┘ └──────┬───────┘ └─────┬─────┘
│ │ │
│ 1. Send message │ │
├────────────────────────────────>│ │
│ { to: "agent-b", │ 2. Append message_sent event │
│ payload: {...} } │ (id=42, seq=100) │
│ │ │
│ │ 3. Update message view │
│ │ INSERT INTO messages (...) │
│ │ │
│ │ │
│ │ 4. Agent B consumes │
│ │<─────────────────────────────────┤
│ │ DurableCursor.consume() │
│ │ afterSequence=99 │
│ │ │
│ │ 5. Return events │
│ ├─────────────────────────────────>│
│ │ [{ seq=100, type=msg_sent }] │
│ │ │
│ │ 6. Process message │
│ │<─────────────────────────────────┤
│ │ msg.commit() │
│ │ │
│ │ 7. Checkpoint position=100 │
│ │<─────────────────────────────────┤
│ │ │The Ask Pattern - Request/Response over Streams
Purpose: Synchronous-style RPC between agents using async streams.
How It Works
┌─────────┐ ┌─────────┐
│ Agent A │ │ Agent B │
└────┬────┘ └────┬────┘
│ │
│ 1. Create deferred │
│ url = "deferred:abc123" │
│ │
│ 2. Send message with replyTo=url │
├──────────────────────────────────────────────────────────>│
│ │
│ 3. Block on deferred.value │ 4. Process request
│ (waits...) │
│ │
│ 5. Resolve deferred(url) │
│<───────────────────────────────────────────────────────────┤
│ │
│ 6. Unblocked, return response │
│ │Code Example
// Agent A (caller)
const response =
yield *
ask<Request, Response>({
mailbox: myMailbox,
to: "worker-2",
payload: { query: "getUserData", userId: 123 },
ttlSeconds: 30,
});
// Agent B (responder)
for await (const envelope of mailbox.receive()) {
const result = processRequest(envelope.payload);
if (envelope.replyTo) {
yield * DurableDeferred.resolve(envelope.replyTo, result);
}
yield * envelope.commit();
}Why This Matters
- Synchronous feel, async reality - code looks like RPC, but it's event-driven
- Resilient - if responder crashes, caller gets timeout (not hung forever)
- Auditable - all request/response pairs in event log
- Type-safe - full TypeScript inference for request/response types
Example: Request/Response with Ask Pattern
import { ask, respond } from "swarm-mail/streams/effect/ask";
import { DurableAskLive } from "swarm-mail/streams/effect/layers";
// Agent A (requester)
const agentA = Effect.gen(function* () {
const mailboxService = yield* DurableMailbox;
const mailbox = yield* mailboxService.create({
agent: "agent-a",
projectKey: "proj",
});
console.log("Requesting user data...");
const response = yield* ask<Request, Response>({
mailbox,
to: "agent-b",
payload: { userId: 123 },
ttlSeconds: 30,
});
console.log("Got response:", response);
});
// Agent B (responder)
const agentB = Effect.gen(function* () {
const mailboxService = yield* DurableMailbox;
const mailbox = yield* mailboxService.create({
agent: "agent-b",
projectKey: "proj",
});
console.log("Listening for requests...");
for await (const envelope of mailbox.receive<Request>()) {
console.log("Processing request:", envelope.payload);
const result = { name: "John", id: envelope.payload.userId };
yield* respond(envelope, result);
yield* envelope.commit();
}
});
// Run both agents in parallel
const program = Effect.all([agentA, agentB], { concurrency: "unbounded" });
Effect.runPromise(program.pipe(Effect.provide(DurableAskLive)));Ask Pattern Sequence Diagram
Summary
Swarm Mail provides four foundational primitives:
- DurableCursor - Resumable event stream consumption with checkpointing
- DurableDeferred - Distributed promises for request/response
- DurableLock - CAS-based mutual exclusion for file reservations
- DurableMailbox - Actor-style messaging with positioned consumption
These compose into higher-level patterns like the ask pattern for RPC-style communication between agents.
All primitives are:
- ✅ Event-sourced - full audit trail
- ✅ Resumable - checkpoint positions survive crashes
- ✅ Type-safe - full TypeScript + Effect-TS inference
- ✅ Local-first - embedded PGLite, no network dependencies
See the main documentation for architecture overview and integration guides.