Interface: BusTransport
Defined in: ../../../packages/bus-core/src/types/transports.ts:260
Transport interface for bus communication.
Transports handle serialization, wire protocol, and connection management for cross-process communication.
Example

    class WebSocketTransport implements BusTransport {
      readonly name = 'websocket';

      async send(message: BusMessage): Promise<void> {
        this.ws.send(JSON.stringify(message));
      }

      onReceive(handler: BusReceiveHandler): () => void {
        const listener = (data) => {
          const message = JSON.parse(data) as BusMessage;
          const context: TransportReceiveContext = { transportName: this.name };
          void handler(message, context);
        };
        this.ws.on('message', listener);
        return () => this.ws.off('message', listener);
      }

      async connect(): Promise<void> {
        await this.ws.connect();
      }

      async disconnect(): Promise<void> {
        await this.ws.close();
      }
    }

Properties
name
name: string
Defined in: ../../../packages/bus-core/src/types/transports.ts:274
Unique transport name used as the registry key.
The name is used by IMakaioBus.registerTransport() to key the transport
in the internal registry. It must be unique within a single bus instance.
Implementations should declare this as a readonly literal or class field.
Example

    class MyTransport implements BusTransport {
      readonly name = 'my-transport';
    }

onConnected?
optional onConnected?: () => void
Defined in: ../../../packages/bus-core/src/types/transports.ts:408
Optional callback set by the transport registry during registration. Called each time the transport establishes a connection (initial or reconnect), after authentication and subscription replay are complete.
Returns
void
onDisconnected?
optional onDisconnected?: () => void
Defined in: ../../../packages/bus-core/src/types/transports.ts:415
Optional callback set by the transport registry during registration.
Called when the transport loses its connection unexpectedly.
Not called on an explicit disconnect().
Returns
void
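The two lifecycle callbacks can be sketched as follows. This is a minimal illustration under stated assumptions, not the framework's implementation: `SketchTransport`, `handleOpen`, `handleClose`, and the `closingExplicitly` flag are hypothetical names standing in for real socket events, and the registry is simulated by assigning the callbacks directly.

```typescript
// Hypothetical subset of BusTransport: only the two lifecycle callbacks.
interface LifecycleCallbacks {
  onConnected?: () => void;
  onDisconnected?: () => void;
}

// Hypothetical transport; handleOpen/handleClose stand in for socket events.
class SketchTransport implements LifecycleCallbacks {
  onConnected?: () => void;
  onDisconnected?: () => void;
  private closingExplicitly = false;

  // In a real transport this would run only after authentication
  // and subscription replay have completed.
  handleOpen(): void {
    this.closingExplicitly = false;
    this.onConnected?.();
  }

  // Runs whenever the underlying connection closes, for any reason.
  handleClose(): void {
    if (!this.closingExplicitly) {
      this.onDisconnected?.(); // only unexpected losses are reported
    }
  }

  // Explicit disconnect: onDisconnected must not fire.
  disconnect(): void {
    this.closingExplicitly = true;
    this.handleClose();
  }
}

const lifecycleEvents: string[] = [];
const sketchTransport = new SketchTransport();
// The registry would set these during registration.
sketchTransport.onConnected = () => { lifecycleEvents.push('connected'); };
sketchTransport.onDisconnected = () => { lifecycleEvents.push('disconnected'); };

sketchTransport.handleOpen();  // initial connection
sketchTransport.handleClose(); // unexpected loss
sketchTransport.handleOpen();  // reconnect
sketchTransport.disconnect();  // explicit: no callback
```

In this sketch the explicit `disconnect()` leaves no trace in `lifecycleEvents`, which mirrors the documented rule that `onDisconnected` is reserved for unexpected connection loss.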
onNewReadySession?
optional onNewReadySession?: (promise) => void
Defined in: ../../../packages/bus-core/src/types/transports.ts:401
Optional callback set by the transport registry during registration. Transports that support reconnection call this at the start of each new session so the registry can track the new ready promise for dispatch gating.
Parameters
promise
Promise<void>
The new ready promise for the current session
Returns
void
ready?
optional ready?: Promise<void>
Defined in: ../../../packages/bus-core/src/types/transports.ts:392
Optional: Promise that resolves when the transport is fully operational (end-to-end, including remote handler availability).
Resolved after the bus has sent the initial subscribe sync to the transport
and the transport has received the BusSubscribeSyncCompleteMessage
handshake. Before this resolves, remoteRequestHandlers for this transport
may be incomplete and requests should not be routed through it.
Transports that do not implement ready are considered immediately ready.
Remarks
Transports that do not implement onNewReadySession are single-use:
after disconnect() completes, a later connect() call has undefined
behavior. Reconnectable transports must call onNewReadySession each time
they create a new ready promise so the registry can update its gating.
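A reconnectable transport's handling of `ready` and `onNewReadySession` might look roughly like the sketch below. `ReconnectableSketch`, `beginSession`, and `completeHandshake` are hypothetical names, and the registry is reduced to an array that records each promise it is given; the real gating logic is not shown in the source.

```typescript
// Hypothetical subset of BusTransport covering the readiness members.
interface ReadinessMembers {
  ready?: Promise<void>;
  onNewReadySession?: (promise: Promise<void>) => void;
}

class ReconnectableSketch implements ReadinessMembers {
  ready?: Promise<void>;
  onNewReadySession?: (promise: Promise<void>) => void;
  private resolveReady: (() => void) | undefined;

  // Start a new session: create a fresh ready promise and announce it
  // so the registry can gate dispatch on the new promise.
  beginSession(): void {
    const promise = new Promise<void>((resolve) => {
      this.resolveReady = resolve;
    });
    this.ready = promise;
    this.onNewReadySession?.(promise);
  }

  // Hypothetical hook: call once the subscribe-sync handshake has completed.
  completeHandshake(): void {
    this.resolveReady?.();
    this.resolveReady = undefined;
  }
}

// Registry side (assumption): remember every promise it is told about.
const trackedPromises: Promise<void>[] = [];
const reconnectable = new ReconnectableSketch();
reconnectable.onNewReadySession = (promise) => { trackedPromises.push(promise); };

reconnectable.beginSession();      // session 1
reconnectable.completeHandshake(); // session 1 becomes ready
reconnectable.beginSession();      // session 2 after a reconnect, pending again
```

Each session gets its own promise, so a registry that kept awaiting the first (already resolved) promise would treat the transport as ready during the second handshake. Announcing the new promise is what avoids that.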
Methods
cancelRequest()?
optional cancelRequest(correlationId, error?): void
Defined in: ../../../packages/bus-core/src/types/transports.ts:346
Optional: cancel/cleanup pending correlation state for an in-flight request.
Used when the caller aborts while the transport-level correlation promise is
still pending (e.g. timeout 0 + AbortSignal). Implementations should reject
and remove the correlation entry if present.
Parameters
correlationId
string
Correlation ID to cancel
error?
Error
Optional cancellation error to propagate
Returns
void
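One way to satisfy the "reject and remove the correlation entry if present" requirement is sketched below. `CorrelationTrackerSketch`, `PendingRequest`, `track`, `complete`, and `pendingCount` are hypothetical names; the framework's actual correlation tracker is not shown in the source.

```typescript
// Hypothetical pending-request record.
interface PendingRequest {
  resolve: (payload: unknown) => void;
  reject: (reason: Error) => void;
  timer?: ReturnType<typeof setTimeout>;
}

class CorrelationTrackerSketch {
  private readonly pending = new Map<string, PendingRequest>();

  // Register an in-flight request. A timeout of 0 means no automatic timeout.
  track(correlationId: string, timeoutMs: number): Promise<unknown> {
    return new Promise<unknown>((resolve, reject) => {
      const entry: PendingRequest = { resolve, reject };
      if (timeoutMs > 0) {
        entry.timer = setTimeout(
          () => this.cancelRequest(correlationId, new Error('Request timed out')),
          timeoutMs,
        );
      }
      this.pending.set(correlationId, entry);
    });
  }

  // Settle a request when its response arrives.
  complete(correlationId: string, payload: unknown): void {
    this.take(correlationId)?.resolve(payload);
  }

  // Reject and remove the correlation entry if present; otherwise do nothing.
  cancelRequest(correlationId: string, error?: Error): void {
    this.take(correlationId)?.reject(
      error ?? new Error(`Request ${correlationId} was cancelled`),
    );
  }

  get pendingCount(): number {
    return this.pending.size;
  }

  // Remove an entry and clear its timer so nothing fires later.
  private take(correlationId: string): PendingRequest | undefined {
    const entry = this.pending.get(correlationId);
    if (entry === undefined) return undefined;
    if (entry.timer !== undefined) clearTimeout(entry.timer);
    this.pending.delete(correlationId);
    return entry;
  }
}

const tracker = new CorrelationTrackerSketch();
const cancellationMessages: string[] = [];
tracker
  .track('req-1', 0) // timeout 0: stays open until settled externally
  .catch((err: Error) => { cancellationMessages.push(err.message); });

tracker.cancelRequest('req-1', new Error('Aborted by caller'));
tracker.cancelRequest('req-1'); // entry already removed, so this is a no-op
```

Removing the entry before rejecting keeps a second `cancelRequest` call for the same ID harmless, which matters when a timeout and a caller abort race each other.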
connect()
connect(): Promise<void>
Defined in: ../../../packages/bus-core/src/types/transports.ts:358
Connect the transport.
Returns
Promise<void>
disconnect()
disconnect(): Promise<void>
Defined in: ../../../packages/bus-core/src/types/transports.ts:363
Disconnect the transport.
Returns
Promise<void>
getSubscriptions()?
optional getSubscriptions(): Set<string>
Defined in: ../../../packages/bus-core/src/types/transports.ts:440
Optional: Report which subjects this transport is interested in. Enables transport-level filtering for efficiency.
If not implemented, the transport will receive all messages (broadcast mode). If implemented, only messages matching the subscription patterns will be sent.
Patterns can include wildcards (e.g., 'adapter.*' matches 'adapter.log', 'adapter.init', etc.).
Returns
Set<string>
Set of subscription patterns (can include wildcards)
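The filtering described here can be illustrated with a small matcher. The exact wildcard semantics are not specified in the source, so this sketch assumes that `*` matches exactly one dot-separated segment; `matchesPattern` and `shouldDeliver` are hypothetical helpers, not framework functions.

```typescript
// Assumption: '*' matches exactly one dot-separated segment.
function matchesPattern(pattern: string, subject: string): boolean {
  const patternParts = pattern.split('.');
  const subjectParts = subject.split('.');
  if (patternParts.length !== subjectParts.length) return false;
  return patternParts.every(
    (part, index) => part === '*' || part === subjectParts[index],
  );
}

// Hypothetical delivery check. An undefined set models a transport that
// does not implement getSubscriptions(), i.e. broadcast mode.
function shouldDeliver(
  subscriptions: Set<string> | undefined,
  subject: string,
): boolean {
  if (subscriptions === undefined) return true;
  return Array.from(subscriptions).some((pattern) =>
    matchesPattern(pattern, subject),
  );
}

const subscriptionPatterns = new Set(['adapter.*', 'user.created']);

const deliversAdapterLog = shouldDeliver(subscriptionPatterns, 'adapter.log');
const deliversSessionStart = shouldDeliver(subscriptionPatterns, 'session.start');
const deliversInBroadcastMode = shouldDeliver(undefined, 'session.start');
```

Under these assumptions `adapter.log` is delivered, `session.start` is filtered out, and a transport without `getSubscriptions()` receives everything.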
isReady()?
optional isReady(): boolean
Defined in: ../../../packages/bus-core/src/types/transports.ts:428
Optional: Synchronous readiness check for transport-level gating.
When implemented and returning false, the bus skips this transport
for outbound sends (requests, broadcasts, events). The transport
continues to receive inbound messages via onReceive.
Complements the async ready promise: isReady() enables non-blocking
skip decisions in hot paths, while ready supports await-based flows.
Returns
boolean
true if the transport can send messages, false otherwise
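The skip decision on the outbound path could be as simple as the filter below. `GatedTransport` and `selectSendable` are hypothetical names for illustration; the bus's real dispatch code is not shown in the source.

```typescript
// Hypothetical subset of BusTransport used for outbound gating.
interface GatedTransport {
  readonly name: string;
  isReady?(): boolean;
}

// Keep transports that either do not implement isReady() or report true.
// The check is synchronous, so it adds no await to the hot path.
function selectSendable(
  transports: readonly GatedTransport[],
): GatedTransport[] {
  return transports.filter((transport) => transport.isReady?.() ?? true);
}

const candidates: GatedTransport[] = [
  { name: 'loopback' },                        // no isReady(): always eligible
  { name: 'websocket', isReady: () => false }, // skipped for outbound sends
  { name: 'ipc', isReady: () => true },
];

const sendableNames = selectSendable(candidates).map((transport) => transport.name);
```

Here `sendableNames` contains `loopback` and `ipc`; the `websocket` entry is skipped for sends but, per the description above, would still deliver inbound messages through `onReceive`.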
onBroadcastResults()?
optional onBroadcastResults(correlationId, results, error?): void
Defined in: ../../../packages/bus-core/src/types/transports.ts:491
Receive aggregated broadcast results from the transport registry.
Called by the transport registry after executing local handlers and relaying
to other transports for a broadcast that arrived from this transport via
onReceive. Replaces the legacy send({ type: 'broadcast-response' })
side-channel when implemented.
Transports that manage their own peer-level fan-out (e.g., ServerTransport
with multiple WebSocket clients) implement this to receive registry results
without abusing the send() contract.
Parameters
correlationId
string
Correlation ID of the originating broadcast
results
readonly object[]
Aggregated results from local handlers and relay transports
error?
Optional structured error when broadcast processing failed; results may be empty or partial in this case
Returns
void
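A transport that fans out to several peers might use this hook roughly as sketched below. Everything here is an assumption made for illustration: `PeerFanOutSketch`, `rememberOrigin`, the reply callback, the JSON wire shape, and the `SketchBroadcastError` type (the source does not show the real error type).

```typescript
// Hypothetical error shape; the real structured error type is not shown.
interface SketchBroadcastError {
  message: string;
}

class PeerFanOutSketch {
  // correlationId -> function that writes a reply to the originating peer
  private readonly origins = new Map<string, (wireText: string) => void>();

  // Record which peer a broadcast came from before handing it to the registry.
  rememberOrigin(
    correlationId: string,
    replyToPeer: (wireText: string) => void,
  ): void {
    this.origins.set(correlationId, replyToPeer);
  }

  // Called by the registry with the aggregated results for that broadcast.
  onBroadcastResults(
    correlationId: string,
    results: readonly object[],
    error?: SketchBroadcastError,
  ): void {
    const replyToPeer = this.origins.get(correlationId);
    if (replyToPeer === undefined) return; // unknown or already answered
    this.origins.delete(correlationId);
    replyToPeer(
      JSON.stringify({ correlationId, results, error: error?.message }),
    );
  }
}

const sentToPeer: string[] = [];
const fanOut = new PeerFanOutSketch();
fanOut.rememberOrigin('bcast-1', (wireText) => { sentToPeer.push(wireText); });

fanOut.onBroadcastResults('bcast-1', [{ ok: true }]);
fanOut.onBroadcastResults('bcast-1', []); // ignored: origin already cleared
```

The results reach the originating peer through a dedicated path, so `send()` keeps its documented meaning and is not reused as a response side-channel.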
onReceive()
onReceive(handler): () => void
Defined in: ../../../packages/bus-core/src/types/transports.ts:353
Register a handler for incoming messages.
Parameters
handler
BusReceiveHandler
Handler function
Returns
() => void
Unsubscribe function
reconnect()?
optional reconnect(): Promise<void>
Defined in: ../../../packages/bus-core/src/types/transports.ts:374
Optional: Trigger an immediate reconnection attempt, bypassing any backoff wait.
If the transport is currently waiting in an exponential-backoff delay, calling this wakes it immediately so a new connection attempt starts without waiting. If reconnection is disabled on the transport, a one-shot connect attempt is made. No-op when the transport is already connected.
Returns
Promise<void>
Promise that resolves when the attempt is initiated or completes
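The "wake from backoff" behavior can be sketched with a delay that is cancellable from outside. `WakeableDelay`, `wake`, and `backoffDelay` are hypothetical names, and the 500 ms base and 30 s cap are illustrative values, not the framework's defaults.

```typescript
// A delay that resolves either when its timer fires or when wake() is called.
class WakeableDelay {
  private timer: ReturnType<typeof setTimeout> | undefined;
  private release: (() => void) | undefined;

  get waiting(): boolean {
    return this.release !== undefined;
  }

  wait(ms: number): Promise<void> {
    return new Promise<void>((resolve) => {
      this.release = () => {
        if (this.timer !== undefined) clearTimeout(this.timer);
        this.timer = undefined;
        this.release = undefined;
        resolve();
      };
      this.timer = setTimeout(() => this.release?.(), ms);
    });
  }

  // What reconnect() could call: end the current wait now, or do nothing
  // if no wait is in progress.
  wake(): void {
    this.release?.();
  }
}

// Exponential backoff: base * 2^attempt, capped at maxMs (illustrative values).
function backoffDelay(attempt: number, baseMs = 500, maxMs = 30000): number {
  return Math.min(maxMs, baseMs * 2 ** attempt);
}

const delay = new WakeableDelay();
void delay.wait(backoffDelay(3)); // would normally wait 4000 ms
const wasWaiting = delay.waiting;
delay.wake();                     // the next attempt can start immediately
```

A reconnect loop built on this would `await delay.wait(backoffDelay(attempt))` between attempts, so calling `wake()` shortens only the current wait and leaves the loop itself unchanged.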
send()
send<TMessage>(message, timeout?): Promise<TMessage extends BusRequestMessage ? unknown : TMessage extends BusBroadcastMessage ? object[] : boolean>
Defined in: ../../../packages/bus-core/src/types/transports.ts:326
Send a message over the transport.
Behavior by message type:
- Requests (BusRequestMessage): Returns the response payload from the handler. Throws an error if no clients are available to handle the request (server mode).
- Events (BusEventMessage): Returns a delivery status boolean.
  - true: Delivered to at least one recipient
  - false: No recipients available (not an error)
- Other messages (heartbeat, subscribe, etc.): Returns true if sent successfully.
Error handling:
- Throws when the transport is disconnected
- Throws when sending requests with no connected clients (server mode only)
- Never throws for events; returns false instead
Timeout contract:
The timeout value flows from the caller’s bus.request({ timeout }) option
through the dispatch layer to the transport’s correlation tracker.
A value of 0 means no automatic timeout — the promise stays open until
resolved or rejected externally (e.g. by the caller’s own AbortSignal).
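From the caller's side, combining a timeout of 0 with an AbortSignal might look like the sketch below. `AbortableTransport` is a hypothetical, reduced view of the transport, and `requestWithAbort` and the in-memory `fakeTransport` are illustrative helpers; only `send`, `cancelRequest`, and the meaning of timeout 0 come from the documentation above.

```typescript
// Hypothetical, reduced view of a transport for this sketch.
interface AbortableTransport {
  send(message: { correlationId: string }, timeout?: number): Promise<unknown>;
  cancelRequest?(correlationId: string, error?: Error): void;
}

// Send with no automatic timeout (0) and let the caller's signal end the wait.
async function requestWithAbort(
  transport: AbortableTransport,
  message: { correlationId: string },
  signal: AbortSignal,
): Promise<unknown> {
  if (signal.aborted) throw new Error('Aborted by caller');
  const onAbort = (): void => {
    // Clean up the transport-level correlation entry for this request.
    transport.cancelRequest?.(message.correlationId, new Error('Aborted by caller'));
  };
  signal.addEventListener('abort', onAbort, { once: true });
  try {
    return await transport.send(message, 0);
  } finally {
    signal.removeEventListener('abort', onAbort);
  }
}

// In-memory stand-in: send() stays pending until cancelRequest() rejects it.
const cancelledIds: string[] = [];
const rejecters = new Map<string, (error: Error) => void>();
const fakeTransport: AbortableTransport = {
  send: (message) =>
    new Promise<unknown>((_resolve, reject) => {
      rejecters.set(message.correlationId, reject);
    }),
  cancelRequest: (correlationId, error) => {
    cancelledIds.push(correlationId);
    rejecters.get(correlationId)?.(error ?? new Error('Cancelled'));
    rejecters.delete(correlationId);
  },
};

const abortMessages: string[] = [];
const controller = new AbortController();
requestWithAbort(fakeTransport, { correlationId: 'req-1' }, controller.signal)
  .catch((err: Error) => { abortMessages.push(err.message); });
controller.abort(); // triggers cancelRequest('req-1', ...)
```

Without the `cancelRequest` call the promise returned by `send(message, 0)` would never settle after an abort, because no timer exists to reject it.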
Type Parameters
TMessage
TMessage extends BusMessage
Parameters
message
TMessage
Message to send
timeout?
number
Correlation timeout in milliseconds; 0 means no automatic timeout
Returns
Promise<TMessage extends BusRequestMessage ? unknown : TMessage extends BusBroadcastMessage ? object[] : boolean>
Promise resolving to the response payload (requests), a results array (broadcasts), or a delivery status (events/other)
Example

    // Request: type-safe response handling
    const request: BusRequestMessage = {
      type: 'request',
      subject: 'user.get',
      namespace: 'api',
      payload: { id: 123 },
      correlationId: 'req-1',
      messageId: 'msg-1',
    };
    const response = await transport.send(request, 5000); // response: unknown

    // Event: delivery status
    const event: BusEventMessage = {
      type: 'event',
      subject: 'user.created',
      namespace: 'api',
      payload: { id: 123 },
      messageId: 'evt-1',
    };
    const delivered = await transport.send(event, 0); // delivered: boolean
    if (!delivered) {
      console.warn('Event not delivered - no subscribers');
    }

subscribe()
subscribe(subject, filter?, priorities?): Promise<void>
Defined in: ../../../packages/bus-core/src/types/transports.ts:461
Subscribe to a subject for smart-routing.
Transports use this to track which subjects have active handlers, enabling targeted message delivery instead of broadcast.
When priorities is provided the transport should include the priority
information in the next outbound subscribe wire message for this subject,
enabling cross-transport priority-based dispatch on the remote side.
Calling this method again with a new priorities array replaces the
previously advertised set for that subject (re-subscribe semantics).
Transports that do not need subscription management (e.g. local-only loopback transports) should provide a no-op implementation.
Parameters
subject
string
Subject pattern to subscribe to (can include wildcards)
filter?
PayloadFilter
Optional payload filter for fine-grained routing
priorities?
number[]
Handler priorities registered for this subject; an empty array signals event-only handlers that do not participate in priority dispatch
Returns
Promise<void>
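The re-subscribe semantics can be illustrated with a small bookkeeping class. `SubscriptionBookSketch` and `prioritiesFor` are hypothetical names, the filter is ignored, and no wire message is actually sent; the sketch only shows how a later call replaces the previously advertised priorities.

```typescript
class SubscriptionBookSketch {
  // subject -> advertised priorities (undefined when none were provided)
  private readonly bySubject = new Map<string, readonly number[] | undefined>();

  // A later call for the same subject replaces the earlier priorities.
  async subscribe(
    subject: string,
    _filter?: unknown, // payload filtering is out of scope for this sketch
    priorities?: number[],
  ): Promise<void> {
    this.bySubject.set(subject, priorities ? priorities.slice() : undefined);
  }

  // Called when the last handler for the subject is removed.
  async unsubscribe(subject: string): Promise<void> {
    this.bySubject.delete(subject);
  }

  getSubscriptions(): Set<string> {
    return new Set(Array.from(this.bySubject.keys()));
  }

  // Priorities that would go into the next outbound subscribe message.
  prioritiesFor(subject: string): readonly number[] | undefined {
    return this.bySubject.get(subject);
  }
}

const book = new SubscriptionBookSketch();
// Async methods run synchronously up to their first await, so the map is
// already updated when each call returns its promise.
void book.subscribe('adapter.*', undefined, [10, 5]);
void book.subscribe('adapter.*', undefined, [1]); // replaces [10, 5]
void book.subscribe('user.created', undefined, []); // event-only handlers

const adapterPriorities = book.prioritiesFor('adapter.*');
const subscribedSubjects = book.getSubscriptions();
```

A loopback transport that needs none of this could implement both `subscribe` and `unsubscribe` as empty async methods, as the description above suggests.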
unsubscribe()
unsubscribe(subject): Promise<void>
Defined in: ../../../packages/bus-core/src/types/transports.ts:473
Unsubscribe from a subject.
Called when the last handler for a subject is removed, allowing the transport to stop routing messages for that subject.
Transports that do not need subscription management should provide a no-op implementation.
Parameters
subject
string
Subject pattern to unsubscribe from
Returns
Promise<void>