
ADR-003: Performance Improvements via Live Queries and Batching

Replace polling with PGLite live queries, add batch operations.

Status: Proposed
Date: December 2024

Context

The current Swarm Mail implementation uses polling for real-time updates, which is inefficient:

  • getInbox() polls every 500ms to detect new messages
  • File reservation conflicts checked via repeated queries
  • High CPU usage in background polling loops
  • Latency: 250-500ms average between event and notification

Research shows PGLite provides two superior alternatives to polling:

  1. LISTEN/NOTIFY - PostgreSQL pub/sub via pg.listen() and pg.unlisten()
  2. Live Queries - Incremental query results via live.incrementalQuery() (RECOMMENDED)

Additionally, batch operations can reduce transaction overhead:

  • Current: Individual transactions per message (N transactions)
  • Improved: Batched inserts via .transaction() or .exec() (1 transaction)

Decision

Replace Polling with Live Queries

Why Live Queries over LISTEN/NOTIFY:

  • Simpler API - returns incremental result sets automatically
  • No manual query re-execution needed
  • Built-in change detection
  • Works with existing SQL queries

Implementation:

// Before (polling):
async function watchInbox(
  agentName: string,
  callback: (messages: Message[]) => void,
) {
  const interval = setInterval(async () => {
    const messages = await getInbox(agentName, { limit: 5 });
    callback(messages);
  }, 500);
  return () => clearInterval(interval);
}

// After (live query):
// Note: the live extension must be registered when the database is created,
// e.g. `await PGlite.create({ extensions: { live } })`, so `db.live` exists.
import { live } from "@electric-sql/pglite/live";

async function watchInbox(
  agentName: string,
  callback: (messages: Message[]) => void,
) {
  const liveQuery = await db.live.incrementalQuery(
    `SELECT id, sender, subject, importance, received_at 
     FROM messages 
     WHERE recipient = $1 AND read_at IS NULL
     ORDER BY importance DESC, received_at DESC
     LIMIT 5`,
    [agentName],
    "id", // incrementalQuery needs a unique key column to diff result sets
  );

  liveQuery.subscribe(({ rows }) => {
    callback(rows as Message[]);
  });

  return () => liveQuery.unsubscribe();
}

Projection Updates:

// Watch for new events in real-time ("sequence" is the unique key column
// incrementalQuery uses to diff results)
const liveEvents = await db.live.incrementalQuery(
  `SELECT * FROM events WHERE sequence > $1 ORDER BY sequence ASC`,
  [lastProcessedSequence],
  "sequence",
);

liveEvents.subscribe(({ rows }) => {
  rows.forEach((event) => updateProjection(event));
});

Batch Operations for Multi-Insert

Use .transaction() for Multiple Related Operations:

// Before: N transactions
for (const message of messages) {
  await sendMessage(message); // 1 transaction each
}

// After: 1 transaction
await db.transaction(async (tx) => {
  for (const message of messages) {
    await tx.query(
      `INSERT INTO messages (sender, recipient, subject, body) VALUES ($1, $2, $3, $4)`,
      [message.sender, message.recipient, message.subject, message.body],
    );
  }
});

Use .exec() for Batch SQL:

// Atomic multi-statement execution
await db.exec(`
  BEGIN;
  INSERT INTO messages (sender, recipient, subject) VALUES ('Alice', 'Bob', 'Test 1');
  INSERT INTO messages (sender, recipient, subject) VALUES ('Alice', 'Bob', 'Test 2');
  INSERT INTO messages (sender, recipient, subject) VALUES ('Alice', 'Bob', 'Test 3');
  COMMIT;
`);

Connection Pooling Decision

NOT NEEDED for PGLite:

  • PGLite runs in-process (WASM embedded database)
  • Single-user design (no concurrent connections)
  • Connection pooling is for client-server PostgreSQL
  • Would add unnecessary complexity

Consequences

Easier

  • Real-time updates - Sub-10ms latency for new messages/events (vs 250-500ms with polling)
  • Lower CPU usage - No background polling loops
  • Fewer queries - Live queries push changes, don't pull
  • Atomic batches - Multi-message sends in 1 transaction
  • Simplified code - No interval management, cleanup handled by unsubscribe

More Difficult

  • Subscription management - Must track and unsubscribe live queries
  • Memory usage - Live queries hold result sets in memory
  • Testing complexity - Async subscriptions harder to test than sync polls
  • Debugging - Push-based updates less visible in logs
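The testing-complexity point above can be softened with a small helper that buffers pushed updates so tests can assert on them much like synchronous polls. This is a hypothetical sketch, not existing code; it works against any subscribe function with a `(rows) => void` callback:

```typescript
// Hypothetical test helper: buffers subscription updates so async
// push-based code can be asserted on deterministically in tests.
type Subscribe<T> = (cb: (rows: T[]) => void) => () => void;

function collectUpdates<T>(subscribe: Subscribe<T>) {
  const updates: T[][] = [];
  const waiters: Array<{ n: number; resolve: () => void }> = [];

  const unsubscribe = subscribe((rows) => {
    updates.push(rows);
    // Wake any test waiting for this many updates to have arrived.
    for (let i = waiters.length - 1; i >= 0; i--) {
      if (updates.length >= waiters[i].n) {
        waiters[i].resolve();
        waiters.splice(i, 1);
      }
    }
  });

  return {
    updates,
    unsubscribe,
    // Resolves once at least `n` updates have been received.
    waitFor(n: number): Promise<void> {
      if (updates.length >= n) return Promise.resolve();
      return new Promise<void>((resolve) => waiters.push({ n, resolve }));
    },
  };
}
```

In a test, the real live-query subscribe (or a fake) is passed in, and assertions run after `await collected.waitFor(n)` instead of sleeping on intervals.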

Performance Impact (Estimated)

| Metric               | Before (Polling) | After (Live Queries)    | Improvement    |
|----------------------|------------------|-------------------------|----------------|
| Notification latency | 250-500ms        | <10ms                   | 25-50x faster  |
| CPU usage (idle)     | 5-10% (polling)  | <1% (event-driven)      | 5-10x reduction|
| Queries per second   | 2-4 (polling)    | 0 (push)                | Eliminated     |
| Transaction overhead | N (individual)   | 1 (batched)             | Nx speedup     |
| Memory usage         | Low              | Medium (result caching) | +10-20%        |

Risks & Mitigations

| Risk                              | Impact | Mitigation                                        |
|-----------------------------------|--------|---------------------------------------------------|
| Memory leaks from subscriptions   | High   | Enforce unsubscribe in cleanup, add timeout guards |
| Live queries fail on syntax error | High   | Validate SQL in tests, fall back to polling on error |
| Large result sets in memory       | Medium | Hard limit on LIMIT clause (max 100 rows)         |
| Subscription overhead at scale    | Medium | Pool subscriptions, deduplicate queries           |
| Debugging push updates            | Low    | Add subscription logging, event stream tracing    |

Implementation Notes

Phase 1: Live Query Infrastructure (Week 1)

1.1 Create Live Query Wrapper

// src/streams/live-query.ts
// Note: `db` must be created with the live extension registered, e.g.
// `await PGlite.create({ extensions: { live } })`, so that `db.live` exists.
import type { PGlite } from "@electric-sql/pglite";

interface LiveQueryOptions<T> {
  db: PGlite;
  query: string;
  params: unknown[];
  key: string; // unique column incrementalQuery uses to diff result sets
  onUpdate: (rows: T[]) => void;
  onError?: (error: Error) => void;
}

export async function createLiveQuery<T>(options: LiveQueryOptions<T>) {
  const { db, query, params, key, onUpdate, onError } = options;
  const reportError =
    onError ?? ((err: Error) => console.error("Live query error:", err));

  try {
    const liveQuery = await db.live.incrementalQuery(query, params, key);

    // subscribe takes a plain callback that receives the latest result set
    liveQuery.subscribe(({ rows }) => {
      try {
        onUpdate(rows as T[]);
      } catch (err) {
        reportError(err as Error);
      }
    });

    return {
      unsubscribe: () => liveQuery.unsubscribe(),
      refresh: () => liveQuery.refresh(),
    };
  } catch (error) {
    reportError(error as Error);
    throw error;
  }
}

1.2 Add Cleanup Tracking

// Track active subscriptions for cleanup
const activeSubscriptions = new Set<() => void>();

export function registerSubscription(unsubscribe: () => void) {
  activeSubscriptions.add(unsubscribe);
}

export function cleanupAllSubscriptions() {
  activeSubscriptions.forEach((unsub) => unsub());
  activeSubscriptions.clear();
}

// Call on shutdown
process.on("SIGTERM", cleanupAllSubscriptions);

Success Criteria

  • All polling loops removed from codebase
  • Live queries handle 100+ concurrent watchers without leaks
  • Batch operations 10x faster than individual inserts (benchmark)
  • Notification latency <50ms for new messages (99th percentile)
  • CPU usage <1% in idle state (no polling)
  • Memory usage increase <20% compared to polling
  • Integration tests pass with live queries
  • Cleanup functions properly unsubscribe all watchers

Migration Path

  1. Add live query infrastructure (non-breaking)
  2. Feature flag to toggle live queries vs polling (ENABLE_LIVE_QUERIES=true)
  3. Run both in parallel for 1 week, monitor metrics
  4. Default to live queries if metrics are better
  5. Remove polling code after 2 weeks of stable live queries
  6. Add batch operations (non-breaking performance improvement)
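Step 2's feature flag could be wired as a simple chooser over two implementations that share the same watcher signature. The ENABLE_LIVE_QUERIES flag name comes from this ADR; the function shape below is an illustrative sketch, not existing code:

```typescript
// Hypothetical toggle between the polling and live-query implementations.
// Both sides share the same watcher signature, so call sites don't change.
type Watcher = (
  agentName: string,
  callback: (messages: unknown[]) => void,
) => Promise<() => void>;

export function selectWatchInbox(
  pollingImpl: Watcher,
  liveImpl: Watcher,
  env: Record<string, string | undefined> = process.env,
): Watcher {
  // Default to polling unless the flag is explicitly enabled.
  return env.ENABLE_LIVE_QUERIES === "true" ? liveImpl : pollingImpl;
}
```

Because the chooser is resolved once at startup, the parallel-run week in step 3 only needs to flip the environment variable per deployment to compare metrics.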

Fallback Strategy

If live queries prove unstable:

  • Keep polling as fallback (ENABLE_LIVE_QUERIES=false)
  • Add exponential backoff to polling (reduce CPU)
  • Consider hybrid: live queries for critical paths, polling for non-critical
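The exponential-backoff fallback can be isolated in a pure delay schedule, which keeps the polling loop itself trivial to test. Parameter names and the 500ms/10s defaults below are illustrative assumptions, not values from the codebase:

```typescript
// Exponential backoff for the polling fallback: the delay doubles after
// each empty poll, resets on activity, and is capped at maxMs.
export function nextPollDelay(
  currentMs: number,
  sawActivity: boolean,
  baseMs = 500,
  maxMs = 10_000,
): number {
  if (sawActivity) return baseMs; // something changed: poll eagerly again
  return Math.min(currentMs * 2, maxMs); // quiet inbox: back off
}

// Sketch of a polling loop using the schedule; `poll` is injected so the
// loop is testable without a database.
export async function pollWithBackoff<T>(
  poll: () => Promise<T[]>,
  onRows: (rows: T[]) => void,
  isStopped: () => boolean,
  sleep: (ms: number) => Promise<void> = (ms) =>
    new Promise((r) => setTimeout(r, ms)),
) {
  let delay = 500;
  while (!isStopped()) {
    const rows = await poll();
    if (rows.length > 0) onRows(rows);
    delay = nextPollDelay(delay, rows.length > 0);
    await sleep(delay);
  }
}
```

An idle inbox thus converges to one query every 10 seconds instead of two per second, addressing the CPU concern without giving up the polling safety net.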
