Bus Patterns
Advanced bus features that give extensions and services fine-grained control over message routing, event transformation, and handler composition.
Scoped Bus
A scoped bus restricts operations to a single namespace. It validates that
every on(), emit(), and request() call targets a subject owned by that
namespace:
const agentBus = await AgentNamespace.scopedBus();
// These work — AgentSubjects belongs to the agent namespaceagentBus.on(AgentSubjects.started, handler);await agentBus.emit(AgentSubjects.started, payload);
// This would be a type error — AdapterSubjects belongs to a different namespace// agentBus.on(AdapterSubjects.initialized, handler);Get a scoped bus from a namespace:
const scopedBus = await MyNamespace.scopedBus();The scopedBus() method preserves full type safety — including typed
withFilter() keys derived from the namespace’s payload schemas.
When to use: Adapter implementations use scoped buses so they cannot accidentally subscribe to or emit on subjects they do not own. Extensions that register their own namespace should do the same — it is a guardrail, not a limitation.
Filtered Bus
A filtered bus pre-applies a payload filter to every subscription. Without
it, every handler would need to pass a { filter: { agentId } } option
individually — repetitive and easy to forget. A filtered bus applies the filter
once and every handler registered through it inherits it automatically:
const agentBus = bus.withFilter({ agentId: 'agent-1' });
// All three handlers only fire when payload.agentId === 'agent-1'// — no per-handler filter option neededagentBus.on(AgentSubjects.sendMessage, handleMessage);agentBus.on(AgentSubjects.getCapabilities, handleCapabilities);agentBus.on(AgentSubjects.cwd.change, handleCwdChange);Each agent creates a filtered bus scoped to its own agentId, then registers
all its handlers through it. The filter is type-safe —
FilterablePayloadIntersection computes the union of all payload fields across
all subjects in the namespace, so TypeScript knows which fields are valid filter
keys.
Filters compose:
const sessionBus = bus.withFilter({ sessionId: 'session-1' });const agentSessionBus = sessionBus.withFilter({ agentId: 'agent-1' });// Both filters apply: sessionId AND agentId must matchWhen to use: Any time your extension handles events for a specific entity
(agent, session, adapter). Instead of checking if (payload.agentId === myId)
in every handler, create a filtered bus once and register all handlers through it.
Interceptors
Interceptors are event-only middleware that run before handlers. They can transform payloads or block propagation:
const unsub = bus.intercept(AgentSubjects.message_delta, (ctx) => { // Transform the payload for all downstream handlers ctx.replacePayload({ ...ctx.payload, text: sanitize(ctx.payload.text), });});
// Or block the event entirelybus.intercept(AgentSubjects.started, (ctx) => { if (isBlocked(ctx.payload.agentId)) { ctx.stopPropagation(); }}, { priority: 1000 }); // Higher priority = runs firstInterceptors run sequentially in priority order. Continuation is implicit —
each interceptor runs unless a previous one called stopPropagation().
stopPropagation() prevents all subsequent interceptors and all event handlers
from firing.
Extension use cases:
- Content filtering: Sanitize or redact sensitive data in streaming deltas before other subscribers see it
- Rate limiting: Block events that exceed a threshold
- Telemetry enrichment: Add metadata to events before they reach handlers
- Feature gating: Suppress events for disabled features
Interceptors are event-only. For request/response middleware, use the
ctx.next() chaining pattern described below.
Handler Priority and Chaining
Handlers run in priority order (highest first). A handler can call ctx.next()
to delegate to the next handler in the chain:
// Priority 100 — runs first, can wrap lower-priority handlersbus.on(SessionStorageSubjects.get, async (ctx) => { const cached = cache.get(ctx.payload.sessionId); if (cached) { ctx.setResult({ session: cached }); return; } // Delegate to the next handler (e.g., database) await ctx.next();}, { priority: 100 });
// Priority 0 (default) — the database handlerbus.on(SessionStorageSubjects.get, async (ctx) => { const session = await db.query(ctx.payload.sessionId); ctx.setResult({ session });});This is how the framework layers caching, enrichment, and storage without coupling them. The priority chain is also how request handlers interleave across local and remote transports.
Extension use cases:
- Caching layer: Your extension adds a cache in front of an existing storage
handler — register at a higher priority, serve from cache when possible,
ctx.next()on miss - Validation layer: Validate or enrich request payloads before the primary handler sees them
- Audit logging: Observe and log requests at high priority, then always
ctx.next()to let the real handler run - Override behavior: Replace a framework handler’s response entirely for
specific payloads by setting the result without calling
ctx.next()
Combining Patterns
These patterns compose naturally:
// Scoped to your namespace + filtered to your entity + high priorityconst myBus = await MyExtensionNamespace.scopedBus();const entityBus = myBus.withFilter({ entityId: myEntity.id });
entityBus.on(MyExtensionSubjects.process, async (ctx) => { // Only fires for your namespace + your entity // Can still use ctx.next() for priority chaining await ctx.next();}, { priority: 50 });The bus does not force you into one pattern — scope where you want isolation, filter where you want entity targeting, chain where you want layered behavior.