Skip to content

Abstract Class: DiscoveryOrchestrator<TRecord, TState>

Makaio Framework


Makaio Framework / ai-adapters-core/node / DiscoveryOrchestrator

Abstract Class: DiscoveryOrchestrator<TRecord, TState>

Section titled “Abstract Class: DiscoveryOrchestrator<TRecord, TState>”

Defined in: ../../../adapters/core/src/log-importer/discovery-orchestrator.ts:46

Abstract discovery orchestrator for shallow session discovery.

Overrides handleFileChange to call importer.extractDiscoveryMetadata(filePath) directly, bypassing the parseFile → validateRecords → extractSessionContext pipeline. Emits only adapter.session.discovered and saves a stub cursor (bytesRead: 0) so the full import starts from byte 0 when the user triggers lazy-load pick-up.

For files that have already been imported, the orchestrator delegates 'modified' events to the base class for incremental processing and manages the 'imported' ↔ 'tracking' status lifecycle.

BaseLogOrchestrator - Parent class with full import behavior

TRecord

The adapter’s native log record type

TState = unknown

The adapter’s resumable state type (default: unknown)

protected new DiscoveryOrchestrator<TRecord, TState>(config, importer): DiscoveryOrchestrator<TRecord, TState>

Defined in: ../../../adapters/core/src/log-importer/discovery-orchestrator.ts:69

LogOrchestratorConfig

LogImporter<TRecord, TState>

DiscoveryOrchestrator<TRecord, TState>

BaseLogOrchestrator.constructor

protected readonly config: Required<Omit<LogOrchestratorConfig, "directory" | "checkMakaioManaged">> & object

Defined in: ../../../adapters/core/src/log-importer/base-orchestrator.ts:42

optional checkMakaioManaged?: (sessionId) => Promise<boolean>

string

Promise<boolean>

optional directory?: string

BaseLogOrchestrator.config


protected readonly eventQueue: LogImportEventQueue

Defined in: ../../../adapters/core/src/log-importer/base-orchestrator.ts:48

BaseLogOrchestrator.eventQueue


protected readonly importer: LogImporter<TRecord, TState>

Defined in: ../../../adapters/core/src/log-importer/base-orchestrator.ts:54

Importer instance - subclasses create and manage their own typed instance.

BaseLogOrchestrator.importer


protected readonly lastSeenMtimeMs: Map<string, number>

Defined in: ../../../adapters/core/src/log-importer/discovery-orchestrator.ts:64

Last observed mtime (ms) per tracked file path. Used by the polled handler to detect inactivity without re-reading disk.


abstract protected readonly logPrefix: string

Defined in: ../../../adapters/core/src/log-importer/base-orchestrator.ts:51

Log prefix for console output - set by subclass

BaseLogOrchestrator.logPrefix


protected readonly trackingFilePaths: Set<string>

Defined in: ../../../adapters/core/src/log-importer/discovery-orchestrator.ts:51

Paths of files whose adapter sessions are currently in 'tracking' status. Maintained locally to avoid per-poll DB lookups.


protected readonly trackingInactiveCount: Map<string, number>

Defined in: ../../../adapters/core/src/log-importer/discovery-orchestrator.ts:58

Per-file count of consecutive poll cycles with no mtime change while in 'tracking' status. Reset on each 'modified' event; incremented by the polled handler when mtime is unchanged.


protected readonly watcher: LogImportWatcher

Defined in: ../../../adapters/core/src/log-importer/base-orchestrator.ts:47

BaseLogOrchestrator.watcher

protected buildCursorSessionContext(context): object & object

Defined in: ../../../adapters/core/src/log-importer/base-orchestrator.ts:147

Builds the serialized session context for cursor persistence.

LogImportSessionContext<TState>

The import session context to serialize.

Serialized cursor session context.

BaseLogOrchestrator.buildCursorSessionContext


protected clearTrackingState(filePath): void

Defined in: ../../../adapters/core/src/log-importer/discovery-orchestrator.ts:77

Clear all in-memory tracking collections for a single file path.

string

File path to remove from local tracking state

void


dispose(): Promise<void>

Defined in: ../../../adapters/core/src/log-importer/discovery-orchestrator.ts:136

Promise<void>

BaseLogOrchestrator.dispose


abstract protected getLogFilePattern(): string

Defined in: ../../../adapters/core/src/log-importer/base-orchestrator.ts:98

Get the glob pattern for log files to watch.

string

BaseLogOrchestrator.getLogFilePattern


protected getMaxRecords(): number | undefined

Defined in: ../../../adapters/core/src/log-importer/base-orchestrator.ts:138

Returns the maximum number of records to parse per file. Override in subclasses to limit parsing (e.g., discovery mode).

number | undefined

Maximum record count, or undefined for no limit.

BaseLogOrchestrator.getMaxRecords


protected handleFileChange(event): Promise<void>

Defined in: ../../../adapters/core/src/log-importer/discovery-orchestrator.ts:161

Simplified file change handler for discovery mode.

Dispatches to two paths:

  1. No cursor exists → lightweight discovery (emit adapter.session.discovered, write stub cursor).
  2. Cursor exists + changeType === 'modified' + session status is 'imported' or 'tracking' → delegate to base class for incremental import, then transition status to 'tracking' if it was 'imported'.

All other cases (e.g., cursor exists but status is 'discovered', or changeType is not 'modified') are silently skipped.

LogFileChangeEvent

File change event from the watcher

Promise<void>

BaseLogOrchestrator.handleFileChange


protected handleFirstRead(filePath, records, bytesRead, mtime, isJsonFormat, startOffset, emitLifecycleEvents?): Promise<void>

Defined in: ../../../adapters/core/src/log-importer/base-orchestrator.ts:351

Handle the first read of a log file.

Extracts session context, emits session lifecycle events, processes records, and saves the cursor. Subclasses may override to change behavior (e.g., DiscoveryOrchestrator skips message processing).

string

Path to the log file

TRecord[]

Records parsed from the file

number

Bytes read during this parse

Date

File modification time

boolean

Whether the file uses JSON (mtime-based) format

number

Byte offset this read started from

boolean = true

Whether to emit session and started lifecycle events for this pass

Promise<void>

BaseLogOrchestrator.handleFirstRead


protected handleIncrementalRead(filePath, records, cursorContext, bytesRead, mtime, isJsonFormat, startOffset): Promise<void>

Defined in: ../../../adapters/core/src/log-importer/base-orchestrator.ts:418

Handle an incremental read of a log file.

Processes new records since the last cursor position. Subclasses may override to skip incremental processing (e.g., DiscoveryOrchestrator).

string

Path to the log file

TRecord[]

New records since last read

object & object

Existing cursor session context

number

Bytes read during this parse

Date

File modification time

boolean

Whether the file uses JSON (mtime-based) format

number

Byte offset this read started from

Promise<void>

BaseLogOrchestrator.handleIncrementalRead


isEnabled(): boolean

Defined in: ../../../adapters/core/src/log-importer/base-orchestrator.ts:173

boolean

BaseLogOrchestrator.isEnabled


isRunning(): boolean

Defined in: ../../../adapters/core/src/log-importer/base-orchestrator.ts:177

boolean

BaseLogOrchestrator.isRunning


protected isSessionSkipped(adapterSessionId): Promise<boolean>

Defined in: ../../../adapters/core/src/log-importer/base-orchestrator.ts:514

Determine whether an adapter session should be skipped from import.

Delegates to the managed-session cache so concurrent checks for the same adapterSessionId share one storage lookup and tracked skips update stats.

string

External adapter session ID being evaluated

Promise<boolean>

Promise resolving to true when the session is Makaio-managed and should be skipped

BaseLogOrchestrator.isSessionSkipped


protected maybeUpdateCursor(filePath, bytesRead, startOffset, mtime, isJsonFormat, sessionContext?): Promise<void>

Defined in: ../../../adapters/core/src/log-importer/base-orchestrator.ts:493

string

number

number

Date

boolean

object & object

Promise<void>

BaseLogOrchestrator.maybeUpdateCursor


protected onPollCycle(trackedFilePaths): Promise<void>

Defined in: ../../../adapters/core/src/log-importer/discovery-orchestrator.ts:338

React to a completed poll cycle for inactivity-based tracking → imported transitions.

For each file in trackingFilePaths, checks if the watcher’s last-known mtime matches our stored lastSeenMtimeMs. If unchanged, increments the inactive counter. When the counter reaches the tracking inactivity threshold, the adapter session reverts to 'imported' and the file is removed from tracking.

ReadonlySet<string>

All file paths observed by the watcher in this cycle

Promise<void>


abstract protected parseFile(filePath, startOffset, maxRecords?): Promise<ParseFileResult<TRecord>>

Defined in: ../../../adapters/core/src/log-importer/base-orchestrator.ts:118

Parse a log file from the given byte offset.

string

Path to the log file

number

Byte offset to start reading from

number

Optional maximum number of records to return (for shallow discovery)

Promise<ParseFileResult<TRecord>>

BaseLogOrchestrator.parseFile


protected persistImportedStatus(filePath): Promise<boolean>

Defined in: ../../../adapters/core/src/log-importer/discovery-orchestrator.ts:88

Persist the steady-state imported status for a tracked file.

string

Absolute path to the tracked log file

Promise<boolean>

true when the file can be removed from local tracking state


protected queueCursorUpdate(filePath, bytesRead, startOffset, mtime, isJsonFormat, sessionContext?, precedingEventPromises?): Promise<void>

Defined in: ../../../adapters/core/src/log-importer/base-orchestrator.ts:549

Enqueue a cursor write as a PQueue task so it executes after all previously queued events for the same batch have been emitted.

PQueue runs tasks with concurrency=1 in FIFO order, so placing the cursor write at the back of the queue after all queueEvent calls guarantees that the cursor advances only once every preceding event in the batch has been delivered. This prevents the race where a process exit after queueEvent but before queue drain would leave events lost yet the cursor advanced.

string

Path to the log file

number

Total bytes read at the new cursor position

number

Byte offset this read started from

Date

File modification time

boolean

Whether the file uses JSON (mtime-based) format

object & object

Serialized session context for the cursor

Promise<void>[] = []

Emission promises queued before this cursor write

Promise<void>

Promise that resolves when the cursor write has completed

BaseLogOrchestrator.queueCursorUpdate


protected queueEvent(event): Promise<void>

Defined in: ../../../adapters/core/src/log-importer/base-orchestrator.ts:527

Queue a normalized event and return its delivery promise.

NormalizedEvent

Normalized event to emit

Promise<void>

Promise that resolves after delivery or rejects on emit failure

BaseLogOrchestrator.queueEvent


protected restorePersistedTrackingState(): Promise<void>

Defined in: ../../../adapters/core/src/log-importer/discovery-orchestrator.ts:193

Rehydrate tracking file state from persisted adapter-session rows after startup.

The watcher state is process-local, but adapter-session status is persisted. Without this rehydration, a restart can strand sessions in 'tracking' forever because no in-memory file state exists to drive the inactivity timeout.

Promise<void>


protected shouldSkipFile(_filePath): boolean

Defined in: ../../../adapters/core/src/log-importer/base-orchestrator.ts:108

Determine whether a discovered file should be skipped before processing.

Default implementation never skips. Subclasses may override to exclude files by name (e.g., ephemeral compaction summary files).

string

Absolute path to the candidate file

boolean

true to skip the file entirely, false to process normally

BaseLogOrchestrator.shouldSkipFile


start(): Promise<void>

Defined in: ../../../adapters/core/src/log-importer/discovery-orchestrator.ts:113

Promise<void>

BaseLogOrchestrator.start


stop(): Promise<void>

Defined in: ../../../adapters/core/src/log-importer/discovery-orchestrator.ts:130

Promise<void>

BaseLogOrchestrator.stop


protected trackFileChange(event): void

Defined in: ../../../adapters/core/src/log-importer/base-orchestrator.ts:327

Track a watcher-triggered file import so shutdown can wait for any cursor work that the handler enqueues before draining the shared FIFO queue.

LogFileChangeEvent

File change event to process

void

BaseLogOrchestrator.trackFileChange


protected trackImportedSession(adapterSessionId): void

Defined in: ../../../adapters/core/src/log-importer/base-orchestrator.ts:401

Record that a session has been imported, updating import statistics.

Protected to allow subclasses to update stats when processing additional sessions (e.g., compress child sessions from compacted files).

string

The session ID that was imported

void

BaseLogOrchestrator.trackImportedSession


protected updateCursor(filePath, bytesRead, mtime, sessionContext?): Promise<void>

Defined in: ../../../adapters/core/src/log-importer/base-orchestrator.ts:251

string

number

Date

object & object

Promise<void>

BaseLogOrchestrator.updateCursor


protected usesJsonFormat(): boolean

Defined in: ../../../adapters/core/src/log-importer/base-orchestrator.ts:164

Check if this orchestrator uses JSON format (mtime-based cursor).

boolean

True if JSON format, false for JSONL

BaseLogOrchestrator.usesJsonFormat


protected validateRecords(records): TRecord[]

Defined in: ../../../adapters/core/src/log-importer/base-orchestrator.ts:129

Validate and filter parsed records. Default: returns records as-is.

TRecord[]

Raw parsed records

TRecord[]

Validated/filtered records

BaseLogOrchestrator.validateRecords


static createDefaultCheckMakaioManaged(): (sessionId) => Promise<boolean>

Defined in: ../../../adapters/core/src/log-importer/base-orchestrator.ts:243

Function that checks if a session is Makaio-managed.

(sessionId) => Promise<boolean>

BaseLogOrchestrator.createDefaultCheckMakaioManaged