Type Alias: IMakaioBus<NamespaceDomain, Subjects, StrictNamespace>
Makaio Framework / bus-core / IMakaioBus
Type Alias: IMakaioBus<NamespaceDomain, Subjects, StrictNamespace>
Section titled “Type Alias: IMakaioBus<NamespaceDomain, Subjects, StrictNamespace>”IMakaioBus<
NamespaceDomain,Subjects,StrictNamespace> =object
Defined in: ../../../packages/bus-core/src/types/bus.ts:95
Type Parameters
Section titled “Type Parameters”NamespaceDomain
Section titled “NamespaceDomain”NamespaceDomain extends string | unknown = unknown
Subjects
Section titled “Subjects”Subjects extends SubjectDefinition = SubjectDefinition
StrictNamespace
Section titled “StrictNamespace”StrictNamespace = { $meta: { namespace: NamespaceDomain extends string ? NamespaceDomain : string; }; }
Properties
Section titled “Properties”__onAny
Section titled “__onAny”__onAny: (
handler) => () =>void
Defined in: ../../../packages/bus-core/src/types/bus.ts:500
Register a handler that receives ALL messages (events and requests) across all namespaces.
Debugging/Testing Only: Noops in production (process.env.NODE_ENV === ‘production’). Useful for logging, debugging, and test assertions that need visibility into all bus activity.
Handler receives complete metadata: type, subject, namespace, payload, messageId, correlationId.
Parameters
Section titled “Parameters”handler
Section titled “handler”AnyHandler
Function to invoke for every message
Returns
Section titled “Returns”Unsubscribe function (noop in production)
() => void
Example
Section titled “Example”const unsubscribe = bus.__onAny((context) => { console.debug(`[${context.type}] ${context.namespace}:${context.subject}`, context.payload);});__resetHandlers?
Section titled “__resetHandlers?”
optional__resetHandlers?: () =>void
Defined in: ../../../packages/bus-core/src/types/bus.ts:501
Returns
Section titled “Returns”void
namespace
Section titled “namespace”namespace:
NamespaceDomain
Defined in: ../../../packages/bus-core/src/types/bus.ts:104
readonlyready:Promise<void>
Defined in: ../../../packages/bus-core/src/types/bus.ts:550
Resolves when all transports registered at connect-time have completed
subscribe-sync. If connect was called with the default awaitReady: true,
this is already resolved when connect() returns. If awaitReady: false was used,
await this to ensure readiness before dispatching requests.
Resolves immediately if no transports are registered or connect() has not been called.
scoped
Section titled “scoped”scoped: <
Domain,Subjects,F,Sc>(input,context?) =>ScopedBus<Domain>
Defined in: ../../../packages/bus-core/src/types/bus.ts:457
Type Parameters
Section titled “Type Parameters”Domain
Section titled “Domain”Domain extends string
Subjects
Section titled “Subjects”Subjects extends SubjectRecord
F
Sc extends Record<string, SubjectSchema>
Parameters
Section titled “Parameters”BusNamespace<Domain, Subjects, F, Sc>
context?
Section titled “context?”Returns
Section titled “Returns”ScopedBus<Domain>
withFilter
Section titled “withFilter”withFilter: <
Payload>(filter) =>IFilteredBus<NamespaceDomainextendsstring?NamespaceDomain:string>
Defined in: ../../../packages/bus-core/src/types/bus.ts:480
Create a filtered bus with a base payload filter.
The filter is automatically applied to all on() and once() calls.
Optionally provide a type parameter for type-safe filter keys.
Type Parameters
Section titled “Type Parameters”Payload
Section titled “Payload”Payload = unknown
Parameters
Section titled “Parameters”filter
Section titled “filter”[unknown] extends [Payload] ? PayloadFilter : TypedPayloadFilter<Payload>
Base filter to apply to all subscriptions
Returns
Section titled “Returns”IFilteredBus<NamespaceDomain extends string ? NamespaceDomain : string>
FilteredBus with the specified filter
Example
Section titled “Example”// Untyped (loose) - any keys allowedconst agentBus = MakaioBus.withFilter({ agentId: this.agentId });
// Type-safe filter keysinterface AgentPayload { agentId: string; sessionId: string }const strictBus = MakaioBus.withFilter<AgentPayload>({ agentId: 'x' });Methods
Section titled “Methods”broadcast()
Section titled “broadcast()”broadcast<
Subject,IsRequest,IsChannel>(subject,payload,options?):Promise<BroadcastResult<IsRequestextendstrue?Subject["$meta"]["payload"]["response"] :never>[]>
Defined in: ../../../packages/bus-core/src/types/bus.ts:447
Execute a broadcast request and collect responses from ALL handlers.
Unlike request() which uses a middleware chain and returns the first result,
broadcast() executes all handlers in parallel and aggregates their responses.
Use for discovery patterns where multiple nodes may respond (e.g., fs.listSources).
Note: You cannot broadcast via wildcard patterns. Use concrete subject keys only. Handlers registered with wildcards will still match if the subject matches their pattern.
Handler Usage:
Handlers should call ctx.identify(nodeId) before ctx.setResult() to tag their response.
If identify() is not called, the response is tagged as ‘anonymous’.
Type Parameters
Section titled “Type Parameters”Subject
Section titled “Subject”Subject extends SubjectDefinition
IsRequest
Section titled “IsRequest”IsRequest = Subject["$meta"]["isRequest"]
IsChannel
Section titled “IsChannel”IsChannel = Subject["$meta"]["channel"]
Parameters
Section titled “Parameters”subject
Section titled “subject”IsChannel extends true ? never : IsRequest extends true ? Subject : never
Concrete request subject (wildcards not allowed)
payload
Section titled “payload”Subject["$meta"]["payload"]["request"]
Request payload
options?
Section titled “options?”RequestOptions
Request options (timeout, correlationId)
Returns
Section titled “Returns”Promise<BroadcastResult<IsRequest extends true ? Subject["$meta"]["payload"]["response"] : never>[]>
Array of { nodeId, payload } responses from all handlers
Example
Section titled “Example”// Discover all filesystem sources from all nodesconst results = await MakaioBus.broadcast(FileSystemSubjects.listSources, \{\});// results: [// \{ nodeId: 'local', payload: \{ sources: [...] \} \},// \{ nodeId: 'container-1', payload: \{ sources: [...] \} \},// ]
// Aggregate sourcesconst allSources = results.flatMap(r => r.payload.sources);connect()
Section titled “connect()”connect(
options?):Promise<void>
Defined in: ../../../packages/bus-core/src/types/bus.ts:566
Connect all registered transports and optionally await subscribe-sync readiness.
Calls transport.connect() on every transport registered by this bus instance.
If any transport fails to connect, all transports are disconnected and unregistered
(rollback) before the error is re-thrown. Pass { awaitReady: false } to resolve
as soon as sockets are open without waiting for the subscribe-sync handshake.
Concurrent calls are safe — a second in-flight call awaits the same promise. Once
sockets are open, subsequent calls are no-ops unless a prior
connect({ awaitReady: false }) left the readiness handshake pending; in that case,
default connect() still awaits bus.ready.
Parameters
Section titled “Parameters”options?
Section titled “options?”Connection options (see ConnectOptions)
Returns
Section titled “Returns”Promise<void>
Throws
Section titled “Throws”If any transport’s connect() or ready promise rejects (after rollback)
disconnect()
Section titled “disconnect()”disconnect():
void
Defined in: ../../../packages/bus-core/src/types/bus.ts:528
Disconnect all registered transports and clear the transport map.
Convenience method for tearing down a bus instance. The inverse of
passing transports to createBusInstance() or calling
registerTransport() individually.
Returns
Section titled “Returns”void
emit()
Section titled “emit()”emit<
Subject,IsRequest,IsWildcard,IsChannel>(subject,payload,options?):Promise<void>
Defined in: ../../../packages/bus-core/src/types/bus.ts:332
Emit an event to all registered handlers.
Events are fire-and-forget - all handlers execute in parallel. Handler errors are logged but don’t stop other handlers from executing.
Note: You cannot emit to wildcard patterns. Use concrete subject keys only. Handlers registered with wildcards will still receive the event if it matches.
Type Parameters
Section titled “Type Parameters”Subject
Section titled “Subject”Subject extends SubjectDefinition
IsRequest
Section titled “IsRequest”IsRequest = Subject["$meta"]["isRequest"]
IsWildcard
Section titled “IsWildcard”IsWildcard = Subject["subject"] extends "*" ? true : false
IsChannel
Section titled “IsChannel”IsChannel = Subject["$meta"]["channel"]
Parameters
Section titled “Parameters”subject
Section titled “subject”IsChannel extends true ? never : IsRequest extends false ? IsWildcard extends false ? Subject : never : never
Concrete event subject (wildcards not allowed)
payload
Section titled “payload”Subject["$meta"]["payload"]
Event payload
options?
Section titled “options?”EmitOptions
Emit options (messageId, correlationId, transports)
Transport Routing
Section titled “Transport Routing”transports: undefined- Send to ALL registered transports (default)transports: []- Local only, don’t send to any transportstransports: ['ws', 'nats']- Send only to specified transports
Returns
Section titled “Returns”Promise<void>
Example
Section titled “Example”const { subjects: AgentSubjects } = MakaioBus.registerNamespace('agent', { started: z.object({ agentId: z.string() }),});
// Send to all transports (default)await emit(AgentSubjects.started, { agentId: 'agent-123' });
// Local only, no transportsawait emit( AgentSubjects.started, { agentId: 'agent-123' }, { transports: [] });
// Send to specific transportsawait emit( AgentSubjects.started, { agentId: 'agent-123' }, { transports: ['websocket'] });
// With tracking IDs:await emit( AgentSubjects.started, { agentId: 'agent-123' }, { correlationId: 'user-action-123' });extendSubject()
Section titled “extendSubject()”extendSubject<
SD,Ext>(subject,extensions):ExtendedSubjectDefinition<SD,Ext>
Defined in: ../../../packages/bus-core/src/types/bus.ts:598
Extend a registered subject’s schema with additional fields.
Adds new root-level fields to the Zod schema used for dev-mode validation and widens the TypeScript payload type. Successive calls accumulate — two packages can independently extend the same subject without overwriting each other.
The returned value is the same runtime SubjectDefinition object — only the TypeScript type is widened. Bus routing is unaffected.
Type Parameters
Section titled “Type Parameters”SD extends SubjectDefinition
Ext extends RequestSubjectExtension | EventSubjectExtension
Parameters
Section titled “Parameters”subject
Section titled “subject”SD
SubjectDefinition from a registered namespace
extensions
Section titled “extensions”Ext
For request subjects: { request?: { field: z.string() }, response?: {...} }.
For event subjects: { field: z.string() } (flat record of additional Zod fields).
Returns
Section titled “Returns”ExtendedSubjectDefinition<SD, Ext>
The same SubjectDefinition with wider TypeScript types
getContext()
Section titled “getContext()”getContext():
MakaioBusContext
Defined in: ../../../packages/bus-core/src/types/bus.ts:568
Returns
Section titled “Returns”getSchema()
Section titled “getSchema()”getSchema<
T>(subject):SubjectSchema|undefined
Defined in: ../../../packages/bus-core/src/types/bus.ts:582
Get the schema for a registered subject, or undefined if not found.
Type Parameters
Section titled “Type Parameters”T extends SubjectDefinition
Parameters
Section titled “Parameters”subject
Section titled “subject”string | T
Returns
Section titled “Returns”SubjectSchema | undefined
intercept()
Section titled “intercept()”intercept<
Subject>(subject,handler,options?): () =>void
Defined in: ../../../packages/bus-core/src/types/bus.ts:167
Register an interceptor that runs BEFORE handlers (payload transform, blocking, priority).
Type Parameters
Section titled “Type Parameters”Subject
Section titled “Subject”Subject extends SubjectDefinition
Parameters
Section titled “Parameters”subject
Section titled “subject”Subject
handler
Section titled “handler”InterceptorHandler<Subject["$meta"]["payload"]>
options?
Section titled “options?”InterceptOptions
Returns
Section titled “Returns”() => void
on<
Subject,IsChannel>(subject,handler,options?): () =>void
Defined in: ../../../packages/bus-core/src/types/bus.ts:160
Register an event or request handler.
Events: Fire-and-forget - multiple handlers can listen and execute in parallel. Requests: Request-response with middleware support - handlers form a chain.
Wildcard Support: Use .$all property on subjects to match all subjects in a namespace.
Wildcard handlers receive unknown payload and must use type guards.
Type Parameters
Section titled “Type Parameters”Subject
Section titled “Subject”Subject extends SubjectDefinition
IsChannel
Section titled “IsChannel”IsChannel = Subject["$meta"]["channel"]
Parameters
Section titled “Parameters”subject
Section titled “subject”IsChannel extends true ? never : Subject
SubjectDefinition object (exact or wildcard)
handler
Section titled “handler”Subject extends SubjectDefinition ? HandlerForSubjectDefinition<Subject> : never
Handler function (EventHandler for events, RequestHandler for requests)
options?
Section titled “options?”Returns
Section titled “Returns”Unsubscribe function
() => void
Examples
Section titled “Examples”const { subjects: AgentSubjects } = MakaioBus.registerNamespace('agent', { started: z.object({ agentId: z.string() }),});
const unsubscribe = on(AgentSubjects.started, (context) => { context.payload.agentId; // ✅ string - fully typed console.debug('Agent started:', context.payload.agentId);});// Matches all subjects in namespaceon(AgentSubjects.$all, (context) => { context.payload; // unknown - must use type guards if ('agentId' in context.payload) { console.debug('Any agent event:', context.payload.agentId); }});const { subjects: AgentSubjects } = MakaioBus.registerNamespace('agent', { toolApprove: { request: z.object({ toolName: z.string() }), response: z.object({ approved: z.boolean() }), },});
on(AgentSubjects.toolApprove, (context) => { const { toolName } = context.payload; // ✅ fully typed context.setResult({ approved: true });});on(AgentSubjects.toolApprove, async (context) => { console.debug('Before approval'); await context.next(); console.debug('After approval');});once()
Section titled “once()”Call Signature
Section titled “Call Signature”once<
Subject,IsChannel>(subject,handler): () =>void
Defined in: ../../../packages/bus-core/src/types/bus.ts:235
Register a one-time event or request handler that auto-unsubscribes after first invocation.
Wraps the on() method to automatically unsubscribe after the handler fires once.
The handler is removed BEFORE being invoked to prevent re-entrance issues if the
handler triggers the same event.
Events: Fire-and-forget - handler executes once then auto-unsubscribes. Requests: Request-response - handler executes once then auto-unsubscribes.
Wildcard Support: Like on(), supports .$all property for namespace-level patterns.
Type Parameters
Section titled “Type Parameters”Subject
Section titled “Subject”Subject extends SubjectDefinition
IsChannel
Section titled “IsChannel”IsChannel = Subject["$meta"]["channel"]
Parameters
Section titled “Parameters”subject
Section titled “subject”IsChannel extends true ? never : Subject
SubjectDefinition object (exact or wildcard)
handler
Section titled “handler”Subject extends SubjectDefinition ? HandlerForSubjectDefinition<Subject> : never
Handler function (EventHandler for events, RequestHandler for requests)
Returns
Section titled “Returns”Unsubscribe function for manual cleanup if needed
() => void
Examples
Section titled “Examples”const { subjects: AgentSubjects } = MakaioBus.registerNamespace('agent', { started: z.object({ agentId: z.string() }),});
once(AgentSubjects.started, (context) => { context.payload.agentId; // ✅ string - fully typed console.debug('Agent started once:', context.payload.agentId);});
await emit(AgentSubjects.started, { agentId: 'agent-123' }); // Handler firesawait emit(AgentSubjects.started, { agentId: 'agent-456' }); // Handler does NOT fireconst unsubscribe = once(AgentSubjects.started, (context) => { console.debug('This will never run');});
unsubscribe(); // Manually unsubscribe before event firesawait emit(AgentSubjects.started, { agentId: 'agent-123' }); // Handler does NOT fireonce(AgentSubjects.$all, (context) => { context.payload; // unknown - must use type guards console.debug('First agent event:', context.payload);});const { subjects: AgentSubjects } = MakaioBus.registerNamespace('agent', { toolApprove: { request: z.object({ toolName: z.string() }), response: z.object({ approved: z.boolean() }), },});
once(AgentSubjects.toolApprove, (context) => { const { toolName } = context.payload; // ✅ fully typed context.setResult({ approved: true });});
await request(AgentSubjects.toolApprove, { toolName: 'deleteFile' }); // Handler firesawait request(AgentSubjects.toolApprove, { toolName: 'createFile' }); // Handler does NOT fireCall Signature
Section titled “Call Signature”once<
Subject,IsRequest,IsChannel>(subject,options?):Promise<SubjectextendsSubjectDefinition?Parameters<HandlerForSubjectDefinition<Subject>>[0] :never>
Defined in: ../../../packages/bus-core/src/types/bus.ts:277
Wait for an event to occur, returning a Promise.
Note: Request subjects are not supported with the promise version of once().
Use the callback version for request handlers: once(subject, handler)
Type Parameters
Section titled “Type Parameters”Subject
Section titled “Subject”Subject extends SubjectDefinition
IsRequest
Section titled “IsRequest”IsRequest = Subject["$meta"]["isRequest"]
IsChannel
Section titled “IsChannel”IsChannel = Subject["$meta"]["channel"]
Parameters
Section titled “Parameters”subject
Section titled “subject”IsChannel extends true ? never : IsRequest extends false ? Subject : never
Event SubjectDefinition object (exact or wildcard)
options?
Section titled “options?”Subject extends SubjectDefinition ? OnceOptions<Subject> : never
Options object with: timeoutMs (reject after timeout), filter (only resolve when filter returns true), signal (AbortSignal to cancel waiting)
Returns
Section titled “Returns”Promise<Subject extends SubjectDefinition ? Parameters<HandlerForSubjectDefinition<Subject>>[0] : never>
Promise that resolves with the event context
Examples
Section titled “Examples”const ctx = await bus.once(Subjects.init);console.debug('Event received:', ctx.payload);try { const ctx = await bus.once(Subjects.init, { timeoutMs: 5000 });} catch (err) { if (err instanceof OnceTimeoutError) { console.debug('Timed out waiting for event'); }}const ctx = await bus.once(Subjects.message, { filter: (payload) => payload.sessionId === expectedId});// Resolves only when a message with matching sessionId is receivedconst controller = new AbortController();const promise = bus.once(Subjects.init, { signal: controller.signal });controller.abort(); // Cancels the waitreconnect()
Section titled “reconnect()”reconnect():
Promise<void>
Defined in: ../../../packages/bus-core/src/types/bus.ts:540
Trigger an immediate reconnection attempt on all disconnected transports.
Delegates to each transport’s reconnect() method if available. For
transports with exponential-backoff reconnection (e.g. WebSocket), this
wakes the backoff sleep and returns once the attempt is initiated — not
once the connection is established. For one-shot transports it resolves
after the connect attempt completes. Failures are logged but do not reject
this promise. No-op when all transports are already connected.
Returns
Section titled “Returns”Promise<void>
registerNamespace()
Section titled “registerNamespace()”registerNamespace<
Domain,Schemas>(domain,schemas,options?):BusNamespace<Domain,SubjectRecordFromSchemaRecord<Schemas>,{ [KeyType in PropertyKey]: AllPropertiesOfUnion<FilterablePayload<SubjectRecordFromSchemaRecord<Schemas>[keyof Schemas & string]>>[KeyType] },Schemas>
Defined in: ../../../packages/bus-core/src/types/bus.ts:570
Type Parameters
Section titled “Type Parameters”Domain
Section titled “Domain”Domain extends string
Schemas
Section titled “Schemas”Schemas extends Record<string, SubjectSchema>
Parameters
Section titled “Parameters”domain
Section titled “domain”Domain
schemas
Section titled “schemas”Schemas
options?
Section titled “options?”Returns
Section titled “Returns”BusNamespace<Domain, SubjectRecordFromSchemaRecord<Schemas>, { [KeyType in PropertyKey]: AllPropertiesOfUnion<FilterablePayload<SubjectRecordFromSchemaRecord<Schemas>[keyof Schemas & string]>>[KeyType] }, Schemas>
registerTransport()
Section titled “registerTransport()”registerTransport(
transport):TransportRegistration
Defined in: ../../../packages/bus-core/src/types/bus.ts:511
Register a transport by its name property.
Convenience method that delegates to getContext().transportRegistry.registerTransport().
The transport’s name property is used as the registry key.
Parameters
Section titled “Parameters”transport
Section titled “transport”Transport to register
Returns
Section titled “Returns”Registration object with unregister and ready promise
request()
Section titled “request()”request<
Subject,IsRequest,IsChannel>(subject,payload,options?):Promise<IsRequestextendstrue?Subject["$meta"]["payload"]["response"] :never>
Defined in: ../../../packages/bus-core/src/types/bus.ts:390
Execute a request and wait for a response.
Requests follow a middleware chain pattern - handlers are called in order until one calls setResult() or all handlers complete.
Note: You cannot request via wildcard patterns. Use concrete subject keys only. Handlers registered with wildcards will still match if the subject matches their pattern.
Type Parameters
Section titled “Type Parameters”Subject
Section titled “Subject”Subject extends SubjectDefinition
IsRequest
Section titled “IsRequest”IsRequest = Subject["$meta"]["isRequest"]
IsChannel
Section titled “IsChannel”IsChannel = Subject["$meta"]["channel"]
Parameters
Section titled “Parameters”subject
Section titled “subject”IsChannel extends true ? never : IsRequest extends true ? Subject : never
Concrete request subject (wildcards not allowed)
payload
Section titled “payload”Subject["$meta"]["payload"]["request"]
Request payload
options?
Section titled “options?”RequestOptions
Request options (timeout, correlationId, transports)
Returns
Section titled “Returns”Promise<IsRequest extends true ? Subject["$meta"]["payload"]["response"] : never>
Response value
Throws
Section titled “Throws”{NoHandlerError} If no handler is registered
Throws
Section titled “Throws”{TimeoutError} If request times out
Throws
Section titled “Throws”{ValidationError} If payload validation fails
Throws
Section titled “Throws”{RequestError} If handler throws an error
Transport Routing
Section titled “Transport Routing”transports: undefined- Send to ALL registered transports (default)transports: []- Local only, don’t send to any transportstransports: ['ws', 'nats']- Send only to specified transports
Example
Section titled “Example”// ✅ Concrete subjectconst result = await request( AgentSubjects.toolApprove, { toolName: 'deleteFile', args: {}, toolCallId: 'call_123' }, { timeout: 10000 });console.debug(result.approved);
// ❌ Cannot use wildcards// await request('agent.*', { ... }); // Type error
// With specific transportsconst result = await request( AgentSubjects.toolApprove, { toolName: 'deleteFile', args: {}, toolCallId: 'call_123' }, { transports: ['websocket'], timeout: 10000 });requestOptional()
Section titled “requestOptional()”requestOptional<
Subject,IsRequest,IsChannel>(subject,payload,options?):Promise<OptionalResult<IsRequestextendstrue?Subject["$meta"]["payload"]["response"] :never>>
Defined in: ../../../packages/bus-core/src/types/bus.ts:406
Execute a request, returning a discriminated union instead of throwing for missing handlers.
Use for optional services. Only NoHandlerError is caught - other errors propagate.
Type Parameters
Section titled “Type Parameters”Subject
Section titled “Subject”Subject extends SubjectDefinition
IsRequest
Section titled “IsRequest”IsRequest = Subject["$meta"]["isRequest"]
IsChannel
Section titled “IsChannel”IsChannel = Subject["$meta"]["channel"]
Parameters
Section titled “Parameters”subject
Section titled “subject”IsChannel extends true ? never : IsRequest extends true ? Subject : never
payload
Section titled “payload”Subject["$meta"]["payload"]["request"]
options?
Section titled “options?”RequestOptions
Returns
Section titled “Returns”Promise<OptionalResult<IsRequest extends true ? Subject["$meta"]["payload"]["response"] : never>>
OptionalResult for return type details
unregisterTransport()
Section titled “unregisterTransport()”unregisterTransport(
name):void
Defined in: ../../../packages/bus-core/src/types/bus.ts:519
Unregister a transport by name.
No-op if no transport is registered under that name.
Parameters
Section titled “Parameters”string
Transport name to unregister
Returns
Section titled “Returns”void
Type Composition
Section titled “Type Composition”-
- `{ namespace: NamespaceDomain; /**
- Register an event or request handler.
- Events: Fire-and-forget - multiple handlers can listen and execute in parallel.
- Requests: Request-response with middleware support - handlers form a chain.
- Wildcard Support: Use
.$allproperty on subjects to match all subjects in a namespace. - Wildcard handlers receive
unknownpayload and must use type guards. - @param subject - SubjectDefinition object (exact or wildcard)
- @param handler - Handler function (EventHandler for events, RequestHandler for requests)
- @returns Unsubscribe function
- @example Basic event handler with typed payload
-
- const { subjects: AgentSubjects } = MakaioBus.registerNamespace(‘agent’, {
- started: z.object({ agentId: z.string() }),
- });
- const unsubscribe = on(AgentSubjects.started, (context) => {
- context.payload.agentId; // ✅ string - fully typed
- console.debug(‘Agent started:’, context.payload.agentId);
- });
-
- @example Wildcard handlers for namespace-level events
-
- // Matches all subjects in namespace
- on(AgentSubjects.$all, (context) => {
- context.payload; // unknown - must use type guards
- if (‘agentId’ in context.payload) {
-
console.debug('Any agent event:', context.payload.agentId);
- }
- });
-
- @example Request handler with typed request/response
-
- const { subjects: AgentSubjects } = MakaioBus.registerNamespace(‘agent’, {
- toolApprove: {
-
request: z.object({ toolName: z.string() }),
-
response: z.object({ approved: z.boolean() }),
- },
- });
- on(AgentSubjects.toolApprove, (context) => {
- const { toolName } = context.payload; // ✅ fully typed
- context.setResult({ approved: true });
- });
-
- @example Middleware pattern for request chain
-
- on(AgentSubjects.toolApprove, async (context) => {
- console.debug(‘Before approval’);
- await context.next();
- console.debug(‘After approval’);
- });
-
*/ on<Subject extends Subjects & StrictNamespace, IsChannel = Subject[‘$meta’][‘channel’]>( subject: IsChannel extends true ? never : Subject, handler: Subject extends SubjectDefinition ? HandlerForSubjectDefinition
: never, options?: OnOptions, ): () => void; /** Register an interceptor that runs BEFORE handlers (payload transform, blocking, priority). */ intercept<Subject extends Subjects & StrictNamespace>( subject: Subject, handler: InterceptorHandler<Subject[‘$meta’][‘payload’]>, options?: InterceptOptions, ): () => void;
/**
- Register a one-time event or request handler that auto-unsubscribes after first invocation.
- Wraps the
on()method to automatically unsubscribe after the handler fires once. - The handler is removed BEFORE being invoked to prevent re-entrance issues if the
- handler triggers the same event.
- Events: Fire-and-forget - handler executes once then auto-unsubscribes.
- Requests: Request-response - handler executes once then auto-unsubscribes.
- Wildcard Support: Like
on(), supports.$allproperty for namespace-level patterns. - @param subject - SubjectDefinition object (exact or wildcard)
- @param handler - Handler function (EventHandler for events, RequestHandler for requests)
- @returns Unsubscribe function for manual cleanup if needed
- @example Basic one-time event handler
-
- const { subjects: AgentSubjects } = MakaioBus.registerNamespace(‘agent’, {
- started: z.object({ agentId: z.string() }),
- });
- once(AgentSubjects.started, (context) => {
- context.payload.agentId; // ✅ string - fully typed
- console.debug(‘Agent started once:’, context.payload.agentId);
- });
- await emit(AgentSubjects.started, { agentId: ‘agent-123’ }); // Handler fires
- await emit(AgentSubjects.started, { agentId: ‘agent-456’ }); // Handler does NOT fire
-
- @example Manual unsubscribe before first fire
-
- const unsubscribe = once(AgentSubjects.started, (context) => {
- console.debug(‘This will never run’);
- });
- unsubscribe(); // Manually unsubscribe before event fires
- await emit(AgentSubjects.started, { agentId: ‘agent-123’ }); // Handler does NOT fire
-
- @example Wildcard handler for one-time namespace monitoring
-
- once(AgentSubjects.$all, (context) => {
- context.payload; // unknown - must use type guards
- console.debug(‘First agent event:’, context.payload);
- });
-
- @example One-time request handler
-
- const { subjects: AgentSubjects } = MakaioBus.registerNamespace(‘agent’, {
- toolApprove: {
-
request: z.object({ toolName: z.string() }),
-
response: z.object({ approved: z.boolean() }),
- },
- });
- once(AgentSubjects.toolApprove, (context) => {
- const { toolName } = context.payload; // ✅ fully typed
- context.setResult({ approved: true });
- });
- await request(AgentSubjects.toolApprove, { toolName: ‘deleteFile’ }); // Handler fires
- await request(AgentSubjects.toolApprove, { toolName: ‘createFile’ }); // Handler does NOT fire
-
*/ once<Subject extends Subjects & StrictNamespace, IsChannel = Subject[‘$meta’][‘channel’]>( subject: IsChannel extends true ? never : Subject, handler: Subject extends SubjectDefinition ? HandlerForSubjectDefinition
: never, ): () => void; /**
- Wait for an event to occur, returning a Promise.
- Note: Request subjects are not supported with the promise version of once().
- Use the callback version for request handlers:
once(subject, handler) - @param subject - Event SubjectDefinition object (exact or wildcard)
- @param options - Options object with: timeoutMs (reject after timeout), filter (only resolve when filter returns true), signal (AbortSignal to cancel waiting)
- @returns Promise that resolves with the event context
- @example Simple await
-
- const ctx = await bus.once(Subjects.init);
- console.debug(‘Event received:’, ctx.payload);
-
- @example With timeout
-
- try {
- const ctx = await bus.once(Subjects.init, { timeoutMs: 5000 });
- } catch (err) {
- if (err instanceof OnceTimeoutError) {
-
console.debug('Timed out waiting for event');
- }
- }
-
- @example With filter (waits for matching event)
-
- const ctx = await bus.once(Subjects.message, {
- filter: (payload) => payload.sessionId === expectedId
- });
- // Resolves only when a message with matching sessionId is received
-
- @example With AbortSignal
-
- const controller = new AbortController();
- const promise = bus.once(Subjects.init, { signal: controller.signal });
- controller.abort(); // Cancels the wait
-
*/ once< Subject extends Subjects & StrictNamespace, IsRequest = Subject[‘$meta’][‘isRequest’], IsChannel = Subject[‘$meta’][‘channel’],
( subject: IsChannel extends true ? never : IsRequest extends false ? Subject : never, options?: Subject extends SubjectDefinition ? OnceOptions
: never, ): Promise<Subject extends SubjectDefinition ? Parameters<HandlerForSubjectDefinition >[0] : never>; /** - Emit an event to all registered handlers.
- Events are fire-and-forget - all handlers execute in parallel.
- Handler errors are logged but don’t stop other handlers from executing.
- Note: You cannot emit to wildcard patterns. Use concrete subject keys only.
- Handlers registered with wildcards will still receive the event if it matches.
- @param subject - Concrete event subject (wildcards not allowed)
- @param payload - Event payload
- @param options - Emit options (messageId, correlationId, transports)
-
Transport Routing
Section titled “Transport Routing” -
transports: undefined- Send to ALL registered transports (default)
-
transports: []- Local only, don’t send to any transports
-
transports: ['ws', 'nats']- Send only to specified transports
- @example
-
- const { subjects: AgentSubjects } = MakaioBus.registerNamespace(‘agent’, {
- started: z.object({ agentId: z.string() }),
- });
- // Send to all transports (default)
- await emit(AgentSubjects.started, { agentId: ‘agent-123’ });
- // Local only, no transports
- await emit(
- AgentSubjects.started,
- { agentId: ‘agent-123’ },
- { transports: [] }
- );
- // Send to specific transports
- await emit(
- AgentSubjects.started,
- { agentId: ‘agent-123’ },
- { transports: [‘websocket’] }
- );
- // With tracking IDs:
- await emit(
- AgentSubjects.started,
- { agentId: ‘agent-123’ },
- { correlationId: ‘user-action-123’ }
- );
-
*/ emit< Subject extends Subjects & StrictNamespace, IsRequest = Subject[‘$meta’][‘isRequest’], IsWildcard = Subject[‘subject’] extends WildcardSubject ? true : false, IsChannel = Subject[‘$meta’][‘channel’],
( subject: IsChannel extends true ? never : IsRequest extends false ? IsWildcard extends false ? Subject : never : never, payload: Subject[‘$meta’][‘payload’], options?: EmitOptions, ): Promise
; /** - Execute a request and wait for a response.
- Requests follow a middleware chain pattern - handlers are called in order
- until one calls setResult() or all handlers complete.
- Note: You cannot request via wildcard patterns. Use concrete subject keys only.
- Handlers registered with wildcards will still match if the subject matches their pattern.
- @param subject - Concrete request subject (wildcards not allowed)
- @param payload - Request payload
- @param options - Request options (timeout, correlationId, transports)
- @returns Response value
- @throws {NoHandlerError} If no handler is registered
- @throws {TimeoutError} If request times out
- @throws {ValidationError} If payload validation fails
- @throws {RequestError} If handler throws an error
-
Transport Routing
Section titled “Transport Routing” -
transports: undefined- Send to ALL registered transports (default)
-
transports: []- Local only, don’t send to any transports
-
transports: ['ws', 'nats']- Send only to specified transports
- @example
-
- // ✅ Concrete subject
- const result = await request(
- AgentSubjects.toolApprove,
- { toolName: ‘deleteFile’, args: {}, toolCallId: ‘call_123’ },
- { timeout: 10000 }
- );
- console.debug(result.approved);
- // ❌ Cannot use wildcards
- // await request(‘agent.*’, { … }); // Type error
- // With specific transports
- const result = await request(
- AgentSubjects.toolApprove,
- { toolName: ‘deleteFile’, args: {}, toolCallId: ‘call_123’ },
- { transports: [‘websocket’], timeout: 10000 }
- );
-
*/ request< Subject extends Subjects & StrictNamespace, IsRequest = Subject[‘$meta’][‘isRequest’], IsChannel = Subject[‘$meta’][‘channel’],
( subject: IsChannel extends true ? never : IsRequest extends true ? Subject : never, payload: Subject[‘$meta’][‘payload’][‘request’], options?: RequestOptions, ): Promise<IsRequest extends true ? Subject[‘$meta’][‘payload’][‘response’] : never>;
/**
- Execute a request, returning a discriminated union instead of throwing for missing handlers.
- Use for optional services. Only NoHandlerError is caught - other errors propagate.
- @see {@link OptionalResult} for return type details */ requestOptional< Subject extends Subjects & StrictNamespace, IsRequest = Subject[‘$meta’][‘isRequest’], IsChannel = Subject[‘$meta’][‘channel’],
( subject: IsChannel extends true ? never : IsRequest extends true ? Subject : never, payload: Subject[‘$meta’][‘payload’][‘request’], options?: RequestOptions, ): Promise<OptionalResult<IsRequest extends true ? Subject[‘$meta’][‘payload’][‘response’] : never>>;
/**
- Execute a broadcast request and collect responses from ALL handlers.
- Unlike
request()which uses a middleware chain and returns the first result, broadcast()executes all handlers in parallel and aggregates their responses.- Use for discovery patterns where multiple nodes may respond (e.g., fs.listSources).
- Note: You cannot broadcast via wildcard patterns. Use concrete subject keys only.
- Handlers registered with wildcards will still match if the subject matches their pattern.
- Handler Usage:
- Handlers should call
ctx.identify(nodeId)beforectx.setResult()to tag their response. - If
identify()is not called, the response is tagged as ‘anonymous’. - @param subject - Concrete request subject (wildcards not allowed)
- @param payload - Request payload
- @param options - Request options (timeout, correlationId)
- @returns Array of { nodeId, payload } responses from all handlers
- @example
-
- // Discover all filesystem sources from all nodes
- const results = await MakaioBus.broadcast(FileSystemSubjects.listSources, {});
- // results: [
- // { nodeId: ‘local’, payload: { sources: […] } },
- // { nodeId: ‘container-1’, payload: { sources: […] } },
- // ]
- // Aggregate sources
- const allSources = results.flatMap(r => r.payload.sources);
-
*/ broadcast< Subject extends Subjects & StrictNamespace, IsRequest = Subject[‘$meta’][‘isRequest’], IsChannel = Subject[‘$meta’][‘channel’],
( subject: IsChannel extends true ? never : IsRequest extends true ? Subject : never, payload: Subject[‘$meta’][‘payload’][‘request’], options?: RequestOptions, ): Promise<BroadcastResult<IsRequest extends true ? Subject[‘$meta’][‘payload’][‘response’] : never>[]>;
scoped: <Domain extends string, Subjects extends SubjectRecord, F, Sc extends Record<string, SubjectSchema>>( input: BusNamespace<Domain, Subjects, F, Sc>, context?: MakaioBusContext, ) => ScopedBus
; /**
- Create a filtered bus with a base payload filter.
- The filter is automatically applied to all
on()andonce()calls. - Optionally provide a type parameter for type-safe filter keys.
- @param filter - Base filter to apply to all subscriptions
- @returns FilteredBus with the specified filter
- @example
-
- // Untyped (loose) - any keys allowed
- const agentBus = MakaioBus.withFilter({ agentId: this.agentId });
- // Type-safe filter keys
- interface AgentPayload { agentId: string; sessionId: string }
- const strictBus = MakaioBus.withFilter
({ agentId: ‘x’ }); -
*/ withFilter: <Payload = unknown>( filter: [unknown] extends [Payload] ? PayloadFilter : TypedPayloadFilter
, ) => IFilteredBus<NamespaceDomain extends string ? NamespaceDomain : string>; /**
- Register a handler that receives ALL messages (events and requests) across all namespaces.
- Debugging/Testing Only: Noops in production (process.env.NODE_ENV === ‘production’).
- Useful for logging, debugging, and test assertions that need visibility into all bus activity.
- Handler receives complete metadata: type, subject, namespace, payload, messageId, correlationId.
- @param handler - Function to invoke for every message
- @returns Unsubscribe function (noop in production)
- @example
-
- const unsubscribe = bus.__onAny((context) => {
- console.debug(
[${context.type}] ${context.namespace}:${context.subject}, context.payload); - });
-
*/ __onAny: (handler: AnyHandler) => () => void; __resetHandlers?: () => void;
/**
- Register a transport by its name property.
- Convenience method that delegates to
getContext().transportRegistry.registerTransport(). - The transport’s
nameproperty is used as the registry key. - @param transport - Transport to register
- @returns Registration object with
unregisterandreadypromise */ registerTransport(transport: BusTransport): TransportRegistration;
/**
- Unregister a transport by name.
- No-op if no transport is registered under that name.
- @param name - Transport name to unregister */ unregisterTransport(name: string): void;
/**
- Disconnect all registered transports and clear the transport map.
- Convenience method for tearing down a bus instance. The inverse of
- passing
transportstocreateBusInstance()or calling registerTransport()individually. */ disconnect(): void;
/**
- Trigger an immediate reconnection attempt on all disconnected transports.
- Delegates to each transport’s
reconnect()method if available. For - transports with exponential-backoff reconnection (e.g. WebSocket), this
- wakes the backoff sleep and returns once the attempt is initiated — not
- once the connection is established. For one-shot transports it resolves
- after the connect attempt completes. Failures are logged but do not reject
- this promise. No-op when all transports are already connected.
*/
reconnect(): Promise
;
/**
- Resolves when all transports registered at connect-time have completed
- subscribe-sync. If {@link connect} was called with the default
awaitReady: true, - this is already resolved when
connect()returns. IfawaitReady: falsewas used, - await this to ensure readiness before dispatching requests.
- Resolves immediately if no transports are registered or
connect()has not been called. */ readonly ready: Promise;
/**
- Connect all registered transports and optionally await subscribe-sync readiness.
- Calls
transport.connect()on every transport registered by this bus instance. - If any transport fails to connect, all transports are disconnected and unregistered
- (rollback) before the error is re-thrown. Pass
{ awaitReady: false }to resolve - as soon as sockets are open without waiting for the subscribe-sync handshake.
- Concurrent calls are safe — a second in-flight call awaits the same promise. Once
- sockets are open, subsequent calls are no-ops unless a prior
connect({ awaitReady: false })left the readiness handshake pending; in that case,- default
connect()still awaitsbus.ready. - @param options - Connection options (see {@link ConnectOptions})
- @throws If any transport’s
connect()orreadypromise rejects (after rollback) */ connect(options?: ConnectOptions): Promise;
getContext(): MakaioBusContext;
registerNamespace<Domain extends string, Schemas extends Record<string, SubjectSchema>>( domain: Domain, schemas: Schemas, options?: NamespaceRegistrationOptions, ): BusNamespace< Domain, SubjectRecordFromSchemaRecord
, FilterablePayloadIntersection<SubjectRecordFromSchemaRecord >, Schemas ;
/** Get the schema for a registered subject, or undefined if not found. */ getSchema
(subject: T | string): SubjectSchema | undefined; /**
- Extend a registered subject’s schema with additional fields.
- Adds new root-level fields to the Zod schema used for dev-mode validation and
- widens the TypeScript payload type. Successive calls accumulate — two packages
- can independently extend the same subject without overwriting each other.
- The returned value is the same runtime SubjectDefinition object — only the
- TypeScript type is widened. Bus routing is unaffected.
- @param subject - SubjectDefinition from a registered namespace
- @param extensions - For request subjects:
{ request?: { field: z.string() }, response?: {...} }. - For event subjects:
{ field: z.string() }(flat record of additional Zod fields). - @returns The same SubjectDefinition with wider TypeScript types
*/
extendSubject<SD extends Subjects & StrictNamespace, Ext extends import(’../extend-subject.js’).SubjectExtension
>( subject: SD, extensions: Ext, ): import(’../extend-subject.js’).ExtendedSubjectDefinition<SD, Ext>; }`
Resolved Shape
Section titled “Resolved Shape”type IMakaioBus = { namespace: NamespaceDomain; on: <Subject extends Subjects & StrictNamespace, IsChannel = Subject['$meta']['channel']>(subject: IsChannel extends true ? never : Subject, handler: Subject extends SubjectDefinition ? HandlerForSubjectDefinition<Subject> : never, options?: OnOptions) => () => void; intercept: <Subject extends Subjects & StrictNamespace>(subject: Subject, handler: InterceptorHandler<Subject['$meta']['payload']>, options?: InterceptOptions) => () => void; once: { <Subject extends Subjects & StrictNamespace, IsChannel = Subject['$meta']['channel']>(subject: IsChannel extends true ? never : Subject, handler: Subject extends SubjectDefinition ? HandlerForSubjectDefinition<Subject> : never): () => void; <Subject extends Subjects & StrictNamespace, IsRequest = Subject['$meta']['isRequest'], IsChannel = Subject['$meta']['channel']>(subject: IsChannel extends true ? never : IsRequest extends false ? Subject : never, options?: Subject extends SubjectDefinition ? OnceOptions<Subject> : never): Promise<Subject extends SubjectDefinition ? Parameters<HandlerForSubjectDefinition<Subject>>[0] : never>; }; emit: <Subject extends Subjects & StrictNamespace, IsRequest = Subject['$meta']['isRequest'], IsWildcard = Subject['subject'] extends '*' ? true : false, IsChannel = Subject['$meta']['channel']>(subject: IsChannel extends true ? never : IsRequest extends false ? IsWildcard extends false ? Subject : never : never, payload: Subject['$meta']['payload'], options?: EmitOptions) => Promise<void>; request: <Subject extends Subjects & StrictNamespace, IsRequest = Subject['$meta']['isRequest'], IsChannel = Subject['$meta']['channel']>(subject: IsChannel extends true ? never : IsRequest extends true ? Subject : never, payload: Subject['$meta']['payload']['request'], options?: RequestOptions) => Promise<IsRequest extends true ? Subject['$meta']['payload']['response'] : never>; requestOptional: <Subject extends Subjects & StrictNamespace, IsRequest = Subject['$meta']['isRequest'], IsChannel = Subject['$meta']['channel']>(subject: IsChannel extends true ? never : IsRequest extends true ? Subject : never, payload: Subject['$meta']['payload']['request'], options?: RequestOptions) => Promise<OptionalResult<IsRequest extends true ? Subject['$meta']['payload']['response'] : never>>; broadcast: <Subject extends Subjects & StrictNamespace, IsRequest = Subject['$meta']['isRequest'], IsChannel = Subject['$meta']['channel']>(subject: IsChannel extends true ? never : IsRequest extends true ? Subject : never, payload: Subject['$meta']['payload']['request'], options?: RequestOptions) => Promise<BroadcastResult<IsRequest extends true ? Subject['$meta']['payload']['response'] : never>[]>; scoped: <Domain extends string, Subjects extends SubjectRecord, F, Sc extends Record<string, SubjectSchema>>(input: BusNamespace<Domain, Subjects, F, Sc>, context?: MakaioBusContext) => ScopedBus<Domain>; withFilter: <Payload = unknown>(filter: [unknown] extends [Payload] ? PayloadFilter : TypedPayloadFilter<Payload>) => IFilteredBus<NamespaceDomain extends string ? NamespaceDomain : string>; __onAny: (handler: AnyHandler) => () => void; __resetHandlers?: (() => void) | undefined; registerTransport: (transport: BusTransport) => TransportRegistration; unregisterTransport: (name: string) => void; disconnect: () => void; reconnect: () => Promise<void>; ready: Promise<void>; connect: (options?: ConnectOptions) => Promise<void>; getContext: () => MakaioBusContext; registerNamespace: <Domain extends string, Schemas extends Record<string, SubjectSchema>>(domain: Domain, schemas: Schemas, options?: NamespaceRegistrationOptions) => BusNamespace<Domain, SubjectRecordFromSchemaRecord<Schemas>, FilterablePayloadIntersection<SubjectRecordFromSchemaRecord<Schemas>>, Schemas>; getSchema: <T extends SubjectDefinition>(subject: T | string) => SubjectSchema | undefined; extendSubject: <SD extends Subjects & StrictNamespace, Ext extends import("../extend-subject.js").SubjectExtension<SD>>(subject: SD, extensions: Ext) => import("../extend-subject.js").ExtendedSubjectDefinition<SD, Ext>;};