🐝Swarm Tools
Decisions

ADR-004: Message Queue Features

Priority queues, DLQ, TTL, pub/sub, saga orchestration

ADR-004: Advanced Message Queue Features

Status: Proposed
Date: December 2024

Context

Current Swarm Mail message queue is basic:

  • FIFO message delivery
  • No priority ordering
  • No retry/dead-letter handling
  • No TTL/expiration
  • No pub/sub or saga patterns

Research shows PGLite supports all enterprise message queue patterns:

  • Priority queues via ORDER BY priority DESC
  • Dead Letter Queues (DLQ) with retry tracking
  • Time-To-Live (TTL) with expires_at timestamps
  • Pub/sub via topic filtering
  • Saga orchestration with compensation

Decision

Add enterprise message queue features using standard SQL patterns.

1. Priority Queues

ALTER TABLE messages ADD COLUMN priority INTEGER DEFAULT 1;
CREATE INDEX idx_messages_priority ON messages(recipient, priority DESC, received_at DESC);

-- Fetch highest priority first
SELECT * FROM messages
WHERE recipient = $1 AND read_at IS NULL
ORDER BY priority DESC, received_at DESC
LIMIT 5;

Priority levels:

  • 0 = urgent (blocks, errors)
  • 1 = high (task completion, conflicts)
  • 2 = normal (progress updates)
  • 3 = low (info, FYI)

2. Dead Letter Queue (DLQ)

CREATE TABLE failed_messages (
  id SERIAL PRIMARY KEY,
  original_message_id INTEGER REFERENCES messages(id),
  retry_count INTEGER NOT NULL,
  failure_reason TEXT NOT NULL,
  failed_at TIMESTAMP NOT NULL DEFAULT NOW(),
  will_retry_at TIMESTAMP,  -- NULL = no retry
  metadata JSONB
);

-- Move to DLQ after 3 retries
INSERT INTO failed_messages (original_message_id, retry_count, failure_reason)
VALUES ($1, $2, $3);

3. Time-To-Live (TTL)

ALTER TABLE messages ADD COLUMN expires_at TIMESTAMP;

-- Background cleanup job
DELETE FROM messages
WHERE expires_at IS NOT NULL AND expires_at < NOW() AND read_at IS NULL;

-- Fetch only non-expired messages
SELECT * FROM messages
WHERE recipient = $1 AND read_at IS NULL
  AND (expires_at IS NULL OR expires_at > NOW())
ORDER BY priority DESC, received_at DESC;

4. Pub/Sub Topics

ALTER TABLE messages ADD COLUMN topic VARCHAR(255);
CREATE INDEX idx_messages_topic ON messages(topic, received_at DESC);

-- Subscribe to topic pattern
SELECT * FROM messages
WHERE topic LIKE 'builds.%' AND received_at > $1
ORDER BY received_at ASC;

-- Wildcard subscriptions
WHERE topic LIKE 'agent.%.error'  -- All agent errors
WHERE topic LIKE 'bead.%.status'  -- All bead status updates

5. Saga Orchestration Pattern

CREATE TABLE saga_instances (
  id SERIAL PRIMARY KEY,
  saga_id VARCHAR(255) UNIQUE NOT NULL,  -- e.g., "epic-bd-123"
  coordinator VARCHAR(255) NOT NULL,
  status VARCHAR(50) NOT NULL,  -- pending, in_progress, completed, failed, compensating
  data JSONB NOT NULL,
  created_at TIMESTAMP NOT NULL DEFAULT NOW(),
  updated_at TIMESTAMP NOT NULL DEFAULT NOW()
);

CREATE TABLE saga_steps (
  id SERIAL PRIMARY KEY,
  saga_id VARCHAR(255) REFERENCES saga_instances(saga_id),
  step_name VARCHAR(255) NOT NULL,
  agent VARCHAR(255) NOT NULL,
  status VARCHAR(50) NOT NULL,  -- pending, completed, failed, compensated
  compensation_for INTEGER REFERENCES saga_steps(id),  -- Undo for this step
  data JSONB,
  completed_at TIMESTAMP
);

Saga workflow:

  1. Coordinator creates saga_instance
  2. Sends step messages to worker agents
  3. Workers complete steps, update saga_steps
  4. On failure, coordinator sends compensation messages
  5. Workers execute compensation (undo) logic

Consequences

Easier

  • Priority handling - Critical messages processed first
  • Resilience - Failed messages retry or move to DLQ
  • TTL enforcement - Stale messages auto-expire
  • Event broadcasting - Pub/sub topics for loosely coupled agents
  • Long-running workflows - Saga pattern for multi-agent coordination

More Difficult

  • Schema complexity - More tables, indexes, constraints
  • Background jobs - TTL cleanup requires cron/scheduler
  • Testing - More failure modes to test (retries, DLQ, compensations)
  • Debugging - Saga state tracking adds visibility requirements

Implementation Notes

Phase 1: Priority Queues

  • Add priority column with default=1 (existing messages unaffected)
  • Update getInbox() to ORDER BY priority DESC
  • Add priority parameter to sendMessage()

Phase 2: DLQ + Retries

  • Create failed_messages table
  • Add retry logic to message processing
  • Exponential backoff: 1min, 5min, 30min, DLQ

Phase 3: TTL

  • Add expires_at column
  • Background cleanup job (run every 5 minutes)
  • Filter expired messages in queries

Phase 4: Pub/Sub

  • Add topic column
  • Update sendMessage() to accept topic
  • Add subscribeToTopic() using live queries

Phase 5: Sagas (Future)

  • Create saga tables
  • Add saga coordinator logic
  • Implement compensation pattern

Success Criteria

  • Priority messages processed before normal
  • Failed messages retry 3x before DLQ
  • Expired messages cleaned up within 5 minutes
  • Topic subscriptions work with wildcards
  • Saga pattern demonstrated with 3+ step workflow

On this page