import './thread-polyfill.ts' import inspector from 'node:inspector' if (process.execArgv.includes('--inspect') === true) { inspector.open() inspector.waitForDebugger() } import type * as otel from '@opentelemetry/api' import type { SyncOptions } from '@livestore/common' import { LogConfig, UnknownError } from '@livestore/common' import type { StreamEventsOptions } from '@livestore/common/leader-thread' import { Eventlog, LeaderThreadCtx, streamEventsWithSyncState } from '@livestore/common/leader-thread' import type { LiveStoreSchema } from '@livestore/common/schema' import { LiveStoreEvent } from '@livestore/common/schema' import { loadSqlite3Wasm } from '@livestore/sqlite-wasm/load-wasm' import { sqliteDbFactory } from '@livestore/sqlite-wasm/node' import { Effect, FetchHttpClient, Layer, OtelTracer, Schema, Stream, WorkerRunner } from '@livestore/utils/effect' import { PlatformNode } from '@livestore/utils/node' import type { TestingOverrides } from './leader-thread-shared.ts' import { makeLeaderThread } from './leader-thread-shared.ts' import * as WorkerSchema from './worker-schema.ts' export type WorkerOptions = { schema: LiveStoreSchema sync?: SyncOptions syncPayloadSchema?: Schema.Schema otelOptions?: { tracer?: otel.Tracer /** @default 'livestore-node-leader-thread' */ serviceName?: string } testing?: TestingOverrides } & LogConfig.WithLoggerOptions export const getWorkerArgs = () => Schema.decodeSync(WorkerSchema.WorkerArgv)(process.argv[2]!) export const makeWorker = (options: WorkerOptions) => { makeWorkerEffect(options).pipe(PlatformNode.NodeRuntime.runMain) } export const makeWorkerEffect = (options: WorkerOptions) => { const TracingLive = options.otelOptions?.tracer !== undefined ? Layer.unwrapEffect(Effect.map(OtelTracer.make, Layer.setTracer)).pipe( Layer.provideMerge(Layer.succeed(OtelTracer.OtelTracer, options.otelOptions.tracer)), ) : undefined // Merge the runtime dependencies once so we can provide them together without chaining Effect.provide. const runtimeLayer = Layer.mergeAll( FetchHttpClient.layer, PlatformNode.NodeFileSystem.layer, TracingLive ?? Layer.empty, ) return WorkerRunner.layerSerialized(WorkerSchema.LeaderWorkerInnerRequest, { InitialMessage: (args) => Effect.gen(function* () { const sqlite3 = yield* Effect.promise(() => loadSqlite3Wasm()).pipe( Effect.withSpan('@livestore/adapter-node:leader-thread:loadSqlite3Wasm'), ) const makeSqliteDb = yield* sqliteDbFactory({ sqlite3 }) return yield* makeLeaderThread({ ...args, syncOptions: options.sync, schema: options.schema, testing: options.testing, makeSqliteDb, syncPayloadEncoded: args.syncPayloadEncoded, syncPayloadSchema: options.syncPayloadSchema, }) }).pipe(Layer.unwrapScoped), PushToLeader: ({ batch }) => Effect.andThen(LeaderThreadCtx, (_) => _.syncProcessor.push( batch.map((item) => new LiveStoreEvent.Client.EncodedWithMeta(item)), // We'll wait in order to keep back pressure on the client session { waitForProcessing: true }, ), ).pipe(Effect.uninterruptible, Effect.withSpan('@livestore/adapter-node:worker:PushToLeader')), BootStatusStream: () => Effect.andThen(LeaderThreadCtx, (_) => Stream.fromQueue(_.bootStatusQueue)).pipe(Stream.unwrap), PullStream: ({ cursor }) => Effect.gen(function* () { const { syncProcessor } = yield* LeaderThreadCtx return syncProcessor.pull({ cursor }) }).pipe(Stream.unwrapScoped), StreamEvents: (options: WorkerSchema.LeaderWorkerInnerStreamEvents) => LeaderThreadCtx.pipe( Effect.map(({ dbEventlog, syncProcessor }) => { const { _tag: _ignored, ...payload } = options const streamOptions = payload as StreamEventsOptions return streamEventsWithSyncState({ dbEventlog, syncState: syncProcessor.syncState, options: streamOptions, }) }), Stream.unwrapScoped, Stream.withSpan('@livestore/adapter-node:worker:StreamEvents'), ), Export: () => Effect.andThen(LeaderThreadCtx, (_) => _.dbState.export()).pipe( Effect.withSpan('@livestore/adapter-node:worker:Export'), ), ExportEventlog: () => Effect.andThen(LeaderThreadCtx, (_) => _.dbEventlog.export()).pipe( Effect.withSpan('@livestore/adapter-node:worker:ExportEventlog'), ), GetLeaderHead: Effect.fn('@livestore/adapter-node:worker:GetLeaderHead')(function* () { const workerCtx = yield* LeaderThreadCtx return Eventlog.getClientHeadFromDb(workerCtx.dbEventlog) }), GetLeaderSyncState: Effect.fn('@livestore/adapter-node:worker:GetLeaderSyncState')(function* () { const workerCtx = yield* LeaderThreadCtx return yield* workerCtx.syncProcessor.syncState }), SyncStateStream: () => Effect.gen(function* () { const workerCtx = yield* LeaderThreadCtx return workerCtx.syncProcessor.syncState.changes }).pipe(Stream.unwrapScoped), GetNetworkStatus: Effect.fn('@livestore/adapter-node:worker:GetNetworkStatus')(function* () { const workerCtx = yield* LeaderThreadCtx return yield* workerCtx.networkStatus }), NetworkStatusStream: () => Effect.gen(function* () { const workerCtx = yield* LeaderThreadCtx return workerCtx.networkStatus.changes }).pipe(Stream.unwrapScoped), GetRecreateSnapshot: Effect.fn('@livestore/adapter-node:worker:GetRecreateSnapshot')(function* () { const workerCtx = yield* LeaderThreadCtx // const result = yield* Deferred.await(workerCtx.initialSetupDeferred) // NOTE we can only return the cached snapshot once as it's transferred (i.e. disposed), so we need to set it to undefined // const cachedSnapshot = // result._tag === 'Recreate' ? yield* Ref.getAndSet(result.snapshotRef, undefined) : undefined // return cachedSnapshot ?? workerCtx.db.export() const snapshot = workerCtx.dbState.export() return { snapshot, migrationsReport: workerCtx.initialState.migrationsReport } }), Shutdown: Effect.fn('@livestore/adapter-node:worker:Shutdown')(function* () { // const { db, dbEventlog } = yield* LeaderThreadCtx yield* Effect.logDebug('[@livestore/adapter-node:worker] Shutdown') // if (devtools.enabled) { // yield* FiberSet.clear(devtools.connections) // } // db.close() // dbEventlog.close() // Buy some time for Otel to flush // TODO find a cleaner way to do this // yield* Effect.sleep(1000) }), ExtraDevtoolsMessage: ({ message }) => Effect.andThen(LeaderThreadCtx, (_) => _.extraIncomingMessagesQueue.offer(message)).pipe( Effect.withSpan('@livestore/adapter-node:worker:ExtraDevtoolsMessage'), ), }).pipe( Layer.provide(PlatformNode.NodeWorkerRunner.layer), WorkerRunner.launch, Effect.scoped, Effect.tapCauseLogPretty, Effect.annotateLogs({ thread: options.otelOptions?.serviceName ?? 'livestore-node-leader-thread', processId: process.pid, }), LogConfig.withLoggerConfig( { logger: options.logger, logLevel: options.logLevel }, { threadName: options.otelOptions?.serviceName ?? 'livestore-node-leader-thread' }, ), // TODO bring back with Effect 4 once it's easier to work with replacing loggers. // We basically only want to provide this logger if it's replacing the default logger, not if there's a custom logger already provided. // Effect.provide(Logger.prettyWithThread(options.otelOptions?.serviceName ?? 'livestore-node-leader-thread')), Effect.provide(runtimeLayer), ) }