# ADR-003: Performance Improvements via Live Queries and Batching

Replace polling with PGLite live queries; add batch operations.
**Status:** Proposed
**Date:** December 2024
## Context
The current Swarm Mail implementation uses polling for real-time updates, which is inefficient:

- `getInbox()` polls every 500ms to detect new messages
- File reservation conflicts are checked via repeated queries
- High CPU usage in background polling loops
- Latency: 250-500ms average between an event and its notification
Research shows PGLite provides two superior alternatives to polling:

- **LISTEN/NOTIFY** - PostgreSQL pub/sub via `pg.listen()` and `pg.unlisten()`
- **Live Queries** - Incremental query results via `live.incrementalQuery()` (RECOMMENDED)
Additionally, batch operations can reduce transaction overhead:

- Current: one transaction per message (N transactions)
- Improved: batched inserts via `.transaction()` or `.exec()` (1 transaction)
## Decision

### Replace Polling with Live Queries
**Why Live Queries over LISTEN/NOTIFY:**
- Simpler API - returns incremental result sets automatically
- No manual query re-execution needed
- Built-in change detection
- Works with existing SQL queries
**Implementation:**

```typescript
// Before (polling):
async function watchInbox(
  agentName: string,
  callback: (messages: Message[]) => void,
) {
  const interval = setInterval(async () => {
    const messages = await getInbox(agentName, { limit: 5 });
    callback(messages);
  }, 500);
  return () => clearInterval(interval);
}
```

```typescript
// After (live query):
// db is created with the live extension:
//   import { live } from "@electric-sql/pglite/live";
//   const db = await PGlite.create({ extensions: { live } });
async function watchInbox(
  agentName: string,
  callback: (messages: Message[]) => void,
) {
  const liveQuery = await db.live.incrementalQuery(
    `SELECT id, sender, subject, importance, received_at
     FROM messages
     WHERE recipient = $1 AND read_at IS NULL
     ORDER BY importance DESC, received_at DESC
     LIMIT 5`,
    [agentName],
    "id", // key column incrementalQuery uses to diff result sets
  );
  liveQuery.subscribe(({ rows }) => {
    callback(rows as Message[]);
  });
  return () => liveQuery.unsubscribe();
}
```

**Projection Updates:**
```typescript
// Watch for new events in real-time
const liveEvents = await db.live.incrementalQuery(
  `SELECT * FROM events WHERE sequence > $1 ORDER BY sequence ASC`,
  [lastProcessedSequence],
  "sequence", // key column used for incremental diffing
);
liveEvents.subscribe(({ rows }) => {
  rows.forEach((event) => updateProjection(event));
});
```

### Batch Operations for Multi-Insert
**Use `.transaction()` for Multiple Related Operations:**
```typescript
// Before: N transactions
for (const message of messages) {
  await sendMessage(message); // 1 transaction each
}

// After: 1 transaction
await db.transaction(async (tx) => {
  for (const message of messages) {
    await tx.query(
      `INSERT INTO messages (sender, recipient, subject, body) VALUES ($1, $2, $3, $4)`,
      [message.sender, message.recipient, message.subject, message.body],
    );
  }
});
```

**Use `.exec()` for Batch SQL:**
```typescript
// Atomic multi-statement execution
await db.exec(`
  BEGIN;
  INSERT INTO messages (sender, recipient, subject) VALUES ('Alice', 'Bob', 'Test 1');
  INSERT INTO messages (sender, recipient, subject) VALUES ('Alice', 'Bob', 'Test 2');
  INSERT INTO messages (sender, recipient, subject) VALUES ('Alice', 'Bob', 'Test 3');
  COMMIT;
`);
```

### Connection Pooling Decision
**NOT NEEDED for PGLite:**
- PGLite runs in-process (WASM embedded database)
- Single-user design (no concurrent connections)
- Connection pooling is for client-server PostgreSQL
- Would add unnecessary complexity
## Consequences

### Easier
- **Real-time updates** - sub-10ms latency for new messages/events
- **Lower CPU usage** - no background polling loops
- **Fewer queries** - live queries push changes instead of pulling
- **Atomic batches** - multi-message sends in 1 transaction
- **Simplified code** - no interval management; cleanup handled by unsubscribe
### More Difficult
- Subscription management - Must track and unsubscribe live queries
- Memory usage - Live queries hold result sets in memory
- Testing complexity - Async subscriptions harder to test than sync polls
- Debugging - Push-based updates less visible in logs
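The testing cost can be contained with a small helper that converts one push-style update into an awaitable promise (a sketch; the `waitForUpdate` name and default timeout are our own):

```typescript
// Resolve on the next pushed update, or reject after a timeout.
export function waitForUpdate<T>(
  subscribe: (cb: (rows: T[]) => void) => void,
  timeoutMs = 1000,
): Promise<T[]> {
  return new Promise((resolve, reject) => {
    const timer = setTimeout(
      () => reject(new Error("no update within timeout")),
      timeoutMs,
    );
    subscribe((rows) => {
      clearTimeout(timer);
      resolve(rows);
    });
  });
}

// Usage in a test:
// const rows = await waitForUpdate((cb) =>
//   liveQuery.subscribe(({ rows }) => cb(rows)),
// );
```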
## Performance Impact (Estimated)
| Metric | Before (Polling) | After (Live Queries) | Improvement |
|---|---|---|---|
| Notification latency | 250-500ms | <10ms | 25-50x faster |
| CPU usage (idle) | 5-10% (polling) | <1% (event-driven) | 5-10x reduction |
| Queries per second | 2-4 (polling) | 0 (push) | Eliminated |
| Transaction overhead | N (individual) | 1 (batched) | N× fewer transactions |
| Memory usage | Low | Medium (result caching) | +10-20% |
## Risks & Mitigations
| Risk | Impact | Mitigation |
|---|---|---|
| Memory leaks from subscriptions | High | Enforce unsubscribe in cleanup, add timeout guards |
| Live queries fail on syntax error | High | Validate SQL in tests, fallback to polling on error |
| Large result sets in memory | Medium | Hard limit on LIMIT clause (max 100 rows) |
| Subscription overhead at scale | Medium | Pool subscriptions, deduplicate queries |
| Debugging push updates | Low | Add subscription logging, event stream tracing |
## Implementation Notes

### Phase 1: Live Query Infrastructure (Week 1)

**1.1 Create Live Query Wrapper**
```typescript
// src/streams/live-query.ts
import type { PGliteWithLive } from "@electric-sql/pglite/live";

interface LiveQueryOptions<T> {
  db: PGliteWithLive; // instance created with the live extension
  query: string;
  params: unknown[];
  key: string; // unique column incrementalQuery uses for diffing
  onUpdate: (rows: T[]) => void;
  onError?: (error: Error) => void;
}

export async function createLiveQuery<T>(options: LiveQueryOptions<T>) {
  const { db, query, params, key, onUpdate, onError } = options;
  const handleError =
    onError ?? ((err: Error) => console.error("Live query error:", err));
  try {
    const liveQuery = await db.live.incrementalQuery<T>(query, params, key);
    liveQuery.subscribe(({ rows }) => {
      try {
        onUpdate(rows as T[]);
      } catch (err) {
        handleError(err as Error);
      }
    });
    return {
      unsubscribe: () => liveQuery.unsubscribe(),
      refresh: () => liveQuery.refresh(),
    };
  } catch (error) {
    handleError(error as Error);
    throw error;
  }
}
```

**1.2 Add Cleanup Tracking**
```typescript
// Track active subscriptions for cleanup
const activeSubscriptions = new Set<() => void>();

export function registerSubscription(unsubscribe: () => void) {
  activeSubscriptions.add(unsubscribe);
}

export function cleanupAllSubscriptions() {
  activeSubscriptions.forEach((unsub) => unsub());
  activeSubscriptions.clear();
}

// Call on shutdown
process.on("SIGTERM", cleanupAllSubscriptions);
```

## Success Criteria
- All polling loops removed from codebase
- Live queries handle 100+ concurrent watchers without leaks
- Batch operations 10x faster than individual inserts (benchmark)
- Notification latency <50ms for new messages (99th percentile)
- CPU usage <1% in idle state (no polling)
- Memory usage increase <20% compared to polling
- Integration tests pass with live queries
- Cleanup functions properly unsubscribe all watchers
## Migration Path

- Add live query infrastructure (non-breaking)
- Feature flag to toggle live queries vs polling (`ENABLE_LIVE_QUERIES=true`)
- Run both in parallel for 1 week, monitor metrics
- Default to live queries if metrics are better
- Remove polling code after 2 weeks of stable live queries
- Add batch operations (non-breaking performance improvement)
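The feature flag can be a plain environment switch (a sketch; `selectWatcher` and the two variant names in the usage comment are illustrative):

```typescript
// Select an implementation at startup via ENABLE_LIVE_QUERIES.
export function selectWatcher<T>(liveImpl: T, pollingImpl: T): T {
  return process.env.ENABLE_LIVE_QUERIES === "true" ? liveImpl : pollingImpl;
}

// Usage:
// const watchInbox = selectWatcher(watchInboxLive, watchInboxPolling);
```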
## Fallback Strategy

If live queries prove unstable:

- Keep polling as fallback (`ENABLE_LIVE_QUERIES=false`)
- Add exponential backoff to polling (reduce CPU)
- Consider a hybrid: live queries for critical paths, polling for non-critical