Skip to content

Transport

The bus is a singleton within a single process. Transports extend it across process boundaries — between the server and browser clients, between the main thread and workers, or between machines via a relay.

This guide covers how transports work, the shipped implementations, the subscribe-sync handshake, priority-cursor dispatch, and how to build a custom transport. For the bus itself, see the Bus System guide.

The BusTransport Interface

Every transport implements BusTransport (../packages/bus-core/src/types/transports.ts):

interface BusTransport {
name: string;
// Lifecycle
connect(): Promise<void>;
disconnect(): Promise<void>;
reconnect?(): Promise<void>;
// Messaging
send<TMessage extends BusMessage>(message: TMessage, timeout?: number): Promise<...>;
onReceive(handler: (message: BusMessage, context?: TransportReceiveContext) => Promise<void>): () => void;
onBroadcastResults?(correlationId: string, results: ReadonlyArray<...>, error?: BusTransportError): void;
// Subscription management
subscribe(subject: string, filter?: PayloadFilter, priorities?: number[]): Promise<void>;
unsubscribe(subject: string): Promise<void>;
// Readiness
ready?: Promise<void>;
onNewReadySession?: (promise: Promise<void>) => void;
isReady?(): boolean;
// Registry callbacks and optional filtering
onConnected?: () => void;
onDisconnected?: () => void;
getSubscriptions?(): Set<string>;
cancelRequest?(correlationId: string, error?: Error): void;
}

The contract has three concerns:

Messagingsend() pushes a BusMessage to the remote peer. onReceive() registers the callback that handles incoming messages. The bus calls onReceive() once during registration and routes all incoming messages through it.

Subscriptionssubscribe() and unsubscribe() advertise which subjects this node can handle. Request priorities populate the peer’s remote handler registry for priority-cursor dispatch. Event-only subscriptions are advertised with an empty priority list and can be used by multiplexing transports, such as the WebSocket server transport, for per-client delivery filtering.

Readinessready is a Promise that resolves after the subscribe-sync handshake completes. bus.connect() awaits this by default. If a caller uses connect({ awaitReady: false }), request dispatch performs a bounded lazy wait for pending ready promises before building the remote handler chain. isReady() is the hot-path synchronous check used to skip transports whose connection or auth session is not currently usable.

Shipped Transports

WebSocket Transport

Package: ../transports/ws/

The primary transport for server-to-client communication. Ships in both client and server modes:

import { createWebSocketTransport, HmacAuth } from '@makaio/bus-transport-websocket';
// Server mode — wraps a WebSocketServer
const serverTransport = createWebSocketTransport({
mode: 'server',
websocket: wss,
auth: new HmacAuth({ secret }),
});
// Client mode — wraps a pre-created WebSocket
const clientTransport = createWebSocketTransport({
mode: 'client',
websocket: ws,
auth: new HmacAuth({ secret }),
});

For reconnection and socket lifecycle management, use WebSocketClientTransport directly:

import { HmacAuth, WebSocketClientTransport } from '@makaio/bus-transport-websocket';
const transport = new WebSocketClientTransport({
url: 'ws://localhost:6252/bus',
auth: new HmacAuth({ secret }),
autoReconnect: { baseMs: 1_000, maxMs: 10_000 },
});

The WebSocket transport is duck-typed — it works with the browser’s native WebSocket, the ws library, or any object matching the WebSocketLike / WebSocketServerLike interfaces.

Authentication options:

AuthPurpose
HmacAuthHMAC-SHA256 shared secret (loopback / trusted networks)
E2EAuthEnd-to-end encrypted (machine-to-machine)
E2ERelayAuthE2E via relay server (browser-to-machine through cloud)
DispatchingAuthHot-swaps between auth strategies at runtime

MessagePort Transport

Package: ../transports/message-channel/

For communication between threads, workers, and iframes using the MessageChannel / MessagePort API:

import { createMessagePortTransport } from '@makaio/bus-transport-message-channel';
// In the main thread
const channel = new MessageChannel();
const transport = createMessagePortTransport({
port: channel.port1,
name: 'worker',
});
// Pass channel.port2 to the worker
worker.postMessage({ port: channel.port2 }, [channel.port2]);

The transport wraps any MessagePortLike — browser MessagePort, Node.js worker_threads.MessagePort, or a custom adapter. It supports optional channel enveloping ({ channel: 'bus', message }) for multiplexed ports like SharedWorker connections.

Loopback Transport

Package: ../packages/bus-server/src/loopback-transport.ts

An outbound-only proxy that delegates send() to an existing transport under a different name. Used for same-server relay routing — when a request arrives on the WebSocket transport and needs to be routed back through the same server-side transport under a different identity:

import { LoopbackTransport } from '@makaio/bus-server';
const loopback = new LoopbackTransport({
target: serverTransport,
name: 'electron',
});
bus.registerTransport(loopback);
// Requests arriving on 'websocket' can now relay to 'electron'

Lifecycle methods (connect, disconnect) are no-ops — the target transport owns its own lifecycle.

Registration and Connection

Transport registration follows a strict sequence:

// 1. Register transports (before connect)
const registration = bus.registerTransport(transport);
// 2. Connect all transports
await bus.connect();
// 3. Wait for readiness (already resolved by default connect())
await bus.ready;

During registration, the bus calls transport.onReceive(handler) to wire the inbound message handler. It then calls syncAllSubjectsToTransport() to push all currently registered local handler subjects to the new transport.

During connect, the bus calls transport.connect() on each registered transport, then waits for subscribe-sync readiness unless connect({ awaitReady: false }) is used.

Registration after connect has started is blocked — all transports must be registered before calling bus.connect().

Subscribe-Sync Handshake

When a transport connects, the bus and its remote peer need to synchronize their handler registrations. The handshake works like this:

Server Client
| |
| <-- subscribe(subject, prios) -- | (client tells server what it handles)
| <-- subscribe(subject, prios) -- |
| <-- subscribe(subject, prios) -- |
| |
| -- subscribe(subject, prios) --> | (server tells client what it handles)
| -- subscribe(subject, prios) --> |
| |
| -- subscribe-sync-complete ----> | (server signals sync is done)
| <-- subscribe-sync-complete ---- | (client signals sync is done)
| |
| transport.ready resolves | transport.ready resolves

Both sides push their full advertised handler state, then send subscribe-sync-complete. The transport.ready Promise resolves when the peer’s sync-complete arrives. With the default bus.connect() behavior, callers start dispatch only after this readiness promise resolves. If a caller opts into connect({ awaitReady: false }), request dispatch checks the registry’s pending ready promises, waits within the caller’s timeout/signal budget, then rebuilds the remote handler chain before dispatching.

This prevents a race condition where a request would be evaluated before the remote side has advertised its handlers, resulting in a false NoHandlerError.

Advertised State and Routing

Subscriptions are the wire format for handler advertisement, but they do not mean every bus send is subscription-filtered. Routing depends on message kind.

Requests: Advertised-State Dispatch

When bus.on(subject, handler) is called locally, the bus recomputes the advertised state for each target transport. The advertised state for a (transport, subject) pair is the union of:

  • local request handler priorities for that subject
  • foreign remote handler priorities from other transports, excluding the target to avoid loops
  • event-handler presence, represented as an empty priority array

This computation happens in pushAdvertisedSubject() (../packages/bus-core/src/registries/advertised-state.ts). The function calls transport.subscribe(subject, filter, priorities) with the aggregated data, or transport.unsubscribe(subject) when no handlers remain.

When a subscribe wire message arrives, the bus updates remoteRequestHandlers for request priorities and remoteEventHandlers for event-only presence. Request dispatch then merges local entries with matching remote request entries in priority order.

Events and Broadcasts: Relay Plus Transport Filtering

The bus sends outbound events to all ready transports unless the caller provides an explicit transport list or the subject is local-only. Inbound event relay also forwards to all other ready transports, excluding the source transport.

Broadcasts execute local handlers and send to ready transports. Inbound broadcast relay is intentionally not subscription-gated at the transport registry layer; otherwise a stale subscription snapshot could silently drop a valid responder and produce an incomplete result array.

ServerTransport adds per-client filtering inside a WebSocket server transport: event fan-out and server-initiated broadcast fan-out use each client’s subscriptions and payload filters. Server-initiated requests are not filtered out; clients are tried in subscription-priority order, and a NoHandlerError or timeout advances to the next client.

Priority-Cursor Dispatch

The dispatch chain for request() merges local and remote handlers into a single list sorted by priority (highest first):

Priority 100 Local handler A
Priority 80 Remote handler (transport: 'websocket')
Priority 50 Local handler B
Priority 0 Remote handler (transport: 'worker')

The dispatcher walks this list from highest to lowest priority:

  1. Local entry — execute the handler directly. If it calls setResult(), done. If it calls next(), advance to the next entry.
  2. Remote entry — send a BusRequestMessage to that transport with a priority cursor. The remote node starts its own dispatch chain from that priority downward.
  3. If the remote returns NoHandlerError, the local chain continues to the next entry.
  4. If no handler in the entire chain produces a result, NoHandlerError is thrown (or { handled: false } for requestOptional).

The priority cursor is what makes cross-transport dispatch correct. Without it, the remote node would start from its highest-priority handler and potentially re-execute work that a higher-priority local handler already did.

Relay Behavior

The bus server acts as a relay for multi-client topologies. When a message arrives from one transport, the server:

Events — Relayed to all other ready transports (excluding the source). Local handlers also execute. In ServerTransport, same-transport WebSocket client fan-out is handled per client with subscription and payload filtering.

Requests — Dispatched through the merged priority chain. If a remote entry points to a different transport than the source, the request is forwarded there.

Broadcasts — Local handlers execute and relay sends to all other ready transports. Results are aggregated as { nodeId, payload } entries from local handlers and relay transports. ServerTransport filters only its internal per-client fan-out.

Security: Local Subject Enforcement

Messages targeting localSubject() subjects are dropped at the transport boundary. The transport registry checks isLocalSubject() for inbound events and rejects them with a warning. This prevents remote peers from invoking process-internal subjects.

Module Augmentation

Transport implementations declare their name in the BusTransportRegistry interface via TypeScript declaration merging:

// In the WebSocket transport package
declare module '@makaio/bus-core' {
interface BusTransportRegistry {
websocket: BusTransport;
}
}
// In the MessagePort transport package
declare module '@makaio/bus-core' {
interface BusTransportRegistry {
'message-channel': BusTransport;
}
}

This makes transport names type-safe. When accessing a transport by name, only declared names are accepted:

const registry = bus.getContext().transportRegistry;
const ws = registry.getTransport('websocket'); // typed

Implementing a Custom Transport

To build a new transport (e.g., gRPC, Unix socket, SharedWorker):

1. Implement BusTransport

import type { BusTransport, BusMessage, PayloadFilter } from '@makaio/bus-core';
import {
CorrelationTracker,
DEFAULT_REQUEST_TIMEOUT_MS,
handleCorrelationResponse,
trackMessageCorrelation,
} from '@makaio/bus-core';
export class MyTransport implements BusTransport {
public readonly name = 'my-transport';
public readonly ready: Promise<void>;
private receiveHandler?: (message: BusMessage, context?: TransportReceiveContext) => Promise<void>;
private readonly correlations = new CorrelationTracker();
private readyResolve!: () => void;
public constructor() {
this.ready = new Promise((resolve) => {
this.readyResolve = resolve;
});
}
public onReceive(handler: (message: BusMessage, context?: TransportReceiveContext) => Promise<void>): () => void {
this.receiveHandler = handler;
return () => { this.receiveHandler = undefined; };
}
public async connect(): Promise<void> {
// Open your connection here
// When you receive a message from the remote peer:
// await this.receiveHandler?.(message);
// When you receive subscribe-sync-complete:
// this.readyResolve();
}
public async disconnect(): Promise<void> {
// Close your connection
}
public async send<T extends BusMessage>(message: T, timeout?: number): Promise<...> {
// Serialize and send to remote peer
// For requests, use CorrelationTracker:
return trackMessageCorrelation(message, this.correlations, timeout ?? DEFAULT_REQUEST_TIMEOUT_MS);
}
public async subscribe(subject: string, filter?: PayloadFilter, priorities?: number[]): Promise<void> {
// Send a subscribe message to the remote peer
}
public async unsubscribe(subject: string): Promise<void> {
// Send an unsubscribe message to the remote peer
}
}

2. Declare the Type

declare module '@makaio/bus-core' {
interface BusTransportRegistry {
'my-transport': BusTransport;
}
}

3. Register and Connect

const transport = new MyTransport();
bus.registerTransport(transport);
await bus.connect();

Key Integration Points

  • CorrelationTracker — Matches request messages to their responses via correlationId. Use trackMessageCorrelation() to register a pending request and handleCorrelationResponse() to resolve it when the response arrives.

  • subscribe-sync-complete — After sending your initial handler set via subscribe() calls, send a { type: 'subscribe-sync-complete' } message. Resolve your ready promise when you receive one from the remote peer.

  • transport.ready / isReady()bus.connect() awaits ready by default, and request dispatch performs a bounded lazy wait if readiness is still pending. Implement isReady() when the transport has a cheap connection/auth check that should skip sends on the hot path.

  • Error serialization — Transport errors must be serializable. Use serializeTransportError() from @makaio/bus-core to convert Error objects to wire format.

Message Types

Messages on the wire are discriminated by type:

TypeKey fieldsPurpose
eventsubject, namespace, payload, messageId, correlationId?Fire-and-forget event delivery
requestsubject, namespace, payload, correlationId, messageId, timeout?, priority?, deadline?RPC request with priority cursor and optional hop deadline
responsecorrelationId, result?, error?Reply to a request
broadcastsubject, namespace, payload, correlationId, messageId, timeout?Multi-handler request
broadcast-responsecorrelationId, results?, error?Broadcast results as { nodeId, payload }[], or a structured error
subscribesubjects, filters?Advertise subject priorities with Record<string, number[]>; empty arrays mean event-only handlers
unsubscribesubjectsRemove subject advertisements with Record<string, number[]>
subscribe-sync-completenone beyond typeEnd of initial handler sync
heartbeattimestampConnection keepalive

Only event, request, and broadcast messages carry namespace and messageId. Responses are keyed by correlationId; subscription, handshake, and heartbeat messages use their own compact shapes.