import { ValueShape, DocShape, TypedDoc, Mutable, PathBuilder, PathSelector, Infer } from '@loro-extended/change'; export * from '@loro-extended/change'; import { Logger } from '@logtape/logtape'; import { PeerID, LoroDoc, VersionVector, Container, EphemeralStore, Listener } from 'loro-crdt'; export { PeerID } from 'loro-crdt'; import * as Emittery from 'emittery'; import Emittery__default from 'emittery'; import { Patch } from 'mutative'; type DocId = string; type ChannelId = number; type AdapterType = string; type DocContent = Record; type LoroDocMutator = (doc: LoroDoc) => void; type PeerIdentityDetails = { peerId: PeerID; name?: string; type: "user" | "bot" | "service"; }; type ReadyStateChannelMeta = ChannelMeta & { state: "established" | "connected"; }; type ReadyState = { docId: DocId; identity: PeerIdentityDetails; channels: ReadyStateChannelMeta[]; status: "pending" | "synced" | "absent"; }; /** * Discriminated union for peer document awareness. * - "unknown": We don't know if the peer has this document * - "absent": Peer explicitly doesn't have this document * - "pending": Peer has this document but we don't know their version yet * (e.g., they announced via new-doc but we haven't synced yet) * - "synced": Peer has this document with a known version */ type PeerDocSyncState = { status: "unknown"; lastUpdated: Date; } | { status: "absent"; lastUpdated: Date; } | { status: "pending"; lastUpdated: Date; } | { status: "synced"; lastKnownVersion: VersionVector; lastUpdated: Date; }; type PeerState = { identity: PeerIdentityDetails; docSyncStates: Map; subscriptions: Set; channels: Set; }; /** * Pending network request waiting for storage to be consulted. */ type PendingNetworkRequest = { channelId: ChannelId; requesterDocVersion: VersionVector; }; type DocState = { doc: LoroDoc; docId: DocId; /** * Storage channels we're waiting to hear from before responding to network requests. * When this set becomes empty, we process pendingNetworkRequests. 
* * - undefined or empty: No pending storage check * - non-empty: Waiting for these storage channels to respond */ pendingStorageChannels?: Set; /** * Network sync-requests waiting for storage to be consulted. * When all storage channels have responded (pendingStorageChannels is empty), * we send sync-responses to all of these. */ pendingNetworkRequests?: PendingNetworkRequest[]; }; /** * Creates a new DocState with a LoroDoc configured with the given peerId. * * The peerId is required to ensure proper UndoManager behavior and change attribution. * Each LoroDoc must have its peerId set to match the Repo's identity.peerId so that: * 1. UndoManager correctly identifies which changes belong to the local peer * 2. Changes are properly attributed in the oplog * 3. External tools that rely on PeerID matching work correctly * * @param docId - The document ID * @param peerId - The peer ID to set on the LoroDoc (must be a valid numeric string) */ declare function createDocState({ docId, peerId, }: { docId: DocId; peerId: PeerID; }): DocState; type SyncTransmission = { type: "up-to-date"; version: VersionVector; } | { type: "snapshot"; data: Uint8Array; version: VersionVector; } | { type: "update"; data: Uint8Array; version: VersionVector; } | { type: "unavailable"; }; /** * BARE network message types * * These contain individual message type data, but no senderId nor targetIds. */ type ChannelMsgEstablishRequest = { type: "channel/establish-request"; identity: PeerIdentityDetails; }; type ChannelMsgEstablishResponse = { type: "channel/establish-response"; identity: PeerIdentityDetails; }; /** * Per-peer ephemeral store data; always associated with a docId (room). * Used in all ephemeral-related messages (sync-request, sync-response, channel/ephemeral). */ type EphemeralStoreData = { peerId: PeerID; data: Uint8Array; /** * Namespace for the store (e.g., 'presence', 'cursors', 'mouse'). * Required for the unified ephemeral store model. 
*/ namespace: string; }; /** * Request to sync a single document with a peer. * * When multiple documents need to be synced, wrap multiple sync-request * messages in a channel/batch message. */ type ChannelMsgSyncRequest = { type: "channel/sync-request"; docId: DocId; requesterDocVersion: VersionVector; /** Requester's ephemeral state for this doc (my presence data) */ ephemeral?: EphemeralStoreData[]; /** * Whether the receiver should send a reciprocal sync-request back. * - initiating sync-request should set bidirectional to `true` * - reciprocal sync-request should set bidirectional to `false` * * Set to false to prevent infinite loops when sending reciprocal requests. */ bidirectional: boolean; }; type ChannelMsgSyncResponse = { type: "channel/sync-response"; docId: DocId; transmission: SyncTransmission; /** Responder's ephemeral snapshot (all known peers' presence data) */ ephemeral?: EphemeralStoreData[]; }; type ChannelMsgUpdate = { type: "channel/update"; docId: DocId; transmission: SyncTransmission; }; type ChannelMsgDirectoryRequest = { type: "channel/directory-request"; docIds?: DocId[]; }; type ChannelMsgDirectoryResponse = { type: "channel/directory-response"; docIds: DocId[]; }; /** * Announce new documents to peers. * * This is an unsolicited message sent when a new document is created locally. * Peers can then decide whether to request the document data via sync-request. * * Note: This is different from directory-response, which is a response to * directory-request (for glob-based document discovery). */ type ChannelMsgNewDoc = { type: "channel/new-doc"; docIds: DocId[]; }; type ChannelMsgDeleteRequest = { type: "channel/delete-request"; docId: DocId; }; type ChannelMsgDeleteResponse = { type: "channel/delete-response"; docId: DocId; status: "deleted" | "ignored"; }; type ChannelMsgEphemeral = { type: "channel/ephemeral"; docId: DocId; hopsRemaining: number; /** Per-peer store data. Each entry is one peer's presence data for the document. 
*/ stores: EphemeralStoreData[]; }; /** * Batch multiple established messages into a single network transmission. * * This is a transport optimization that allows sending multiple messages * to the same peer in a single network payload. The receiver will dispatch * each inner message individually. * * Use cases: * - Batching multiple sync-requests after connection establishment * - Batching ephemeral messages for heartbeat (one batch per peer) * - Any scenario where multiple messages go to the same peer * * Note: Nested batches are not allowed (messages cannot contain ChannelMsgBatch). */ type ChannelMsgBatch = { type: "channel/batch"; messages: BatchableMsg[]; }; /** * Messages that can be included in a batch. * Excludes ChannelMsgBatch to prevent nested batches. */ type BatchableMsg = ChannelMsgSyncRequest | ChannelMsgSyncResponse | ChannelMsgUpdate | ChannelMsgDirectoryRequest | ChannelMsgDirectoryResponse | ChannelMsgNewDoc | ChannelMsgDeleteRequest | ChannelMsgDeleteResponse | ChannelMsgEphemeral; type AddressedEstablishmentEnvelope = { toChannelIds: ChannelId[]; message: EstablishmentMsg; }; /** * A channel message wrapped in target channelIds to send the message to * * These augment bare network messages with targetIds, giving the message addressable recipients. */ type AddressedEstablishedEnvelope = { toChannelIds: ChannelId[]; message: EstablishedMsg; }; type AddressedEnvelope = { toChannelIds: ChannelId[]; message: ChannelMsg; }; type ReturnEnvelope = { fromChannelId: ChannelId; message: ChannelMsg; }; /** * BARE, ADDRESSED, and REGULAR network message union types * * These are probably what you're looking for--a way to annotate the type of message. 
*/ /** * Message type unions based on valid channel states */ /** Messages valid during the establishment phase (ConnectedChannel) */ type EstablishmentMsg = ChannelMsgEstablishRequest | ChannelMsgEstablishResponse; /** Messages valid after establishment is complete (Channel with peerId) */ type EstablishedMsg = ChannelMsgSyncRequest | ChannelMsgSyncResponse | ChannelMsgUpdate | ChannelMsgDirectoryRequest | ChannelMsgDirectoryResponse | ChannelMsgNewDoc | ChannelMsgDeleteRequest | ChannelMsgDeleteResponse | ChannelMsgEphemeral | ChannelMsgBatch; /** All channel messages */ type ChannelMsg = EstablishmentMsg | EstablishedMsg; /** * Type predicate to check if a message is an establishment message. */ declare function isEstablishmentMsg(msg: ChannelMsg): msg is EstablishmentMsg; /** * Type predicate to check if a message requires an established channel. */ declare function isEstablishedMsg(msg: ChannelMsg): msg is EstablishedMsg; /** * A `GeneratedChannel` is created by an adapter's generate() method. * It has metadata and actions but no connection to the synchronizer yet. * * biome-ignore format: left-align */ type GeneratedChannel = ChannelMeta & ChannelActions; /** * A `ConnectedChannel` is registered with the synchronizer and can send/receive messages. * It has a channelId and an onReceive handler. * * biome-ignore format: left-align */ type ConnectedChannel = GeneratedChannel & ChannelIdentity & { type: 'connected'; /** * Receive handler for incoming messages. * Set by the Synchronizer when the channel is added. */ onReceive: (msg: ChannelMsg) => void; /** * Type-safe send for establishment phase messages. * Only establishment messages can be sent before the channel is established. */ send: (msg: EstablishmentMsg) => void; }; /** * A `Channel` is a ConnectedChannel that has completed the establish handshake * and knows which peer it's connected to. 
* * Examples of different kinds of channels: * - storage: we need to send a request to a database to get a document out of storage * - network: we need to send a request for a sync from a peer * * biome-ignore format: left-align */ type EstablishedChannel = GeneratedChannel & ChannelIdentity & { type: 'established'; peerId: PeerID; /** * Receive handler for incoming messages. * Set by the Synchronizer when the channel is added. */ onReceive: (msg: ChannelMsg) => void; /** * Type-safe send for established channel messages. * Only sync/directory/delete messages can be sent after establishment. */ send: (msg: EstablishedMsg) => void; }; type Channel = ConnectedChannel | EstablishedChannel; /** * Type guard to check if a Channel has been established with a peer. */ declare function isEstablished(channel: Channel): channel is EstablishedChannel; type ChannelMeta = { kind: ChannelKind; adapterType: AdapterType; }; type ChannelIdentity = { channelId: ChannelId; }; type ChannelActions = { /** * Generic send method for channel messages. * * ⚠️ WARNING: This method does not enforce type safety at compile time. * Prefer using `sendEstablishment()` or `sendEstablished()` for type-safe sends. * * This method is kept for internal use where the caller is responsible for * ensuring messages are sent to channels in the correct state. */ send: (msg: ChannelMsg) => void; stop: () => void; }; /** * The minimal return type for adapter's generateActions() method. * Adapters only need to provide send/stop - the adapter base class * adds kind and adapterType automatically. */ type GeneratedChannelActions = ChannelActions; type ChannelKind = "storage" | "network" | "other"; type ReceiveFn = (msg: ChannelMsg) => void; /** * @deprecated Channels are now ready-on-creation, no lifecycle callbacks needed * A set of callbacks for the channel to report its lifecycle events * to the Synchronizer. This allows the Synchronizer to manage connection state. 
*/ interface ChannelLifecycle { /** The channel is now connected and ready to send messages. */ onReady: () => void; /** An error occurred in the channel. */ onError: (error: Error) => void; /** The channel has disconnected. */ onDisconnect: () => void; } type GenerateFn = (context: G) => GeneratedChannel; type ChannelDirectoryHooks = { onChannelAdded: (channel: Channel) => void; onChannelRemoved: (channel: Channel) => void; }; declare class ChannelDirectory { readonly generate: GenerateFn; private readonly channels; private onChannelAdded?; private onChannelRemoved?; constructor(generate: GenerateFn); [Symbol.iterator](): IterableIterator; has(channelId: ChannelId): boolean; get(channelId: ChannelId): Channel | undefined; get size(): number; setHooks(hooks: ChannelDirectoryHooks): void; /** * Using an adapter's `generate` function, create a GeneratedChannel and then fill in * details needed to convert it to a ConnectedChannel. * * @param context The context specific to the Adapter type * @param onReceive A callback to be used to forward messages to the synchronizer * @returns a ConnectedChannel capable of sending EstablishmentMsgs */ create(context: G, onReceive: ReceiveFn): ConnectedChannel; remove(channelId: ChannelId): Channel | undefined; reset(): void; } /** * Context provided to send interceptors. */ type SendInterceptorContext = { /** The envelope being sent */ envelope: AddressedEnvelope; /** The adapter type (e.g., "websocket-client") */ adapterType: string; /** The adapter instance ID */ adapterId: string; }; /** * A send interceptor can delay, drop, log, or modify outgoing messages. * * Call `next()` to continue the chain. If `next()` is not called, the message is dropped. 
* * @example Delay all messages by 3 seconds * ```typescript * adapter.addSendInterceptor((ctx, next) => { * setTimeout(next, 3000) * }) * ``` * * @example Drop 10% of messages (simulate packet loss) * ```typescript * adapter.addSendInterceptor((ctx, next) => { * if (Math.random() > 0.1) next() * }) * ``` * * @example Log all messages * ```typescript * adapter.addSendInterceptor((ctx, next) => { * console.log('Sending:', ctx.envelope.message.type) * next() * }) * ``` */ type SendInterceptor = (context: SendInterceptorContext, next: () => void) => void; type HandleSendFn = (adapterType: AdapterType, toChannelId: ChannelId, message: ChannelMsg) => void; type AnyAdapter = Adapter; type AdapterParams = { adapterType: AdapterType; /** * Unique identifier for this adapter instance. * If not provided, auto-generated as `{adapterType}-{uuid}`. * Used for idempotent add/remove operations. */ adapterId?: string; }; /** * Context provided to adapters during initialization. * Contains identity, logger, and callbacks for channel lifecycle events. */ type AdapterContext = { identity: PeerIdentityDetails; logger: Logger; /** * Called when a message is received on a channel. * Note: channelId is passed instead of channel object because the channel * object may be stale (due to immutable state updates in the synchronizer). * The synchronizer should look up the current channel from its model. */ onChannelReceive: (channelId: ChannelId, message: ChannelMsg) => void; onChannelAdded: (channel: ConnectedChannel) => void; onChannelRemoved: (channel: Channel) => void; onChannelEstablish: (channel: ConnectedChannel) => void; }; /** * @deprecated Use AdapterContext instead */ type AdapterHooks = AdapterContext; declare abstract class Adapter { #private; /** * The kind of channels this adapter creates. * Default is "network". StorageAdapter overrides this to "storage". 
* * This is a first-class property of the adapter, not just the channels, * allowing code to check adapter capabilities before channels are created. */ readonly kind: ChannelKind; readonly adapterType: AdapterType; /** * Unique identifier for this adapter instance. * Used for idempotent add/remove operations. */ readonly adapterId: string; logger: Logger; readonly channels: ChannelDirectory; onSend: HandleSendFn | undefined; protected identity?: PeerIdentityDetails; constructor({ adapterType, adapterId }: AdapterParams); /** * Create a channel. Only callable during "started" state. * The channel must be ready to send/receive immediately. */ protected addChannel(context: G): ConnectedChannel; /** * Remove a channel. Only callable during "started" state. */ protected removeChannel(channelId: ChannelId): Channel | undefined; /** * Establish a channel by triggering the establishment handshake. * This should be called after addChannel() to initiate communication. * Only callable during "started" state. */ protected establishChannel(channelId: ChannelId): void; /** * Generate a GeneratedChannel for the given context. * The returned channel must be ready to use immediately. * * Note: Subclasses should return only `send` and `stop` functions. * The `kind` and `adapterType` will be overwritten by the adapter's * properties in _generate(), so specifying them is optional. */ protected abstract generate(context: G): GeneratedChannel; /** * Internal method that ensures channel metadata comes from the adapter. * This is what ChannelDirectory calls to create channels. */ private _generate; /** * Start the adapter. Create initial channels here. * For dynamic adapters (servers), set up listeners that will * call addChannel() when new connections arrive. */ abstract onStart(): Promise; /** * Stop the adapter. Clean up resources and remove channels. 
*/ abstract onStop(): Promise; _initialize(context: AdapterContext): void; _start(): Promise; _stop(): Promise; /** * Given an envelope with zero or more toChannelIds, attempts to send the * message (in the envelope) through this adapter's channels. Note that this * does NOT guarantee delivery, only sending will be attempted through any * matching channels. * * @param envelope an AddressedEnvelope with message inside * @returns the number of channels to which the message was sent (optimistic count when interceptors are present) */ _send(envelope: AddressedEnvelope): number; /** * Add a send interceptor to the chain. * Interceptors are called in order of addition. * * @param interceptor - The interceptor function * @returns A function to remove the interceptor * * @example Delay all messages by 3 seconds * ```typescript * const unsubscribe = adapter.addSendInterceptor((ctx, next) => { * setTimeout(next, 3000) * }) * ``` * * @example Drop 10% of messages (simulate packet loss) * ```typescript * adapter.addSendInterceptor((ctx, next) => { * if (Math.random() > 0.1) next() * }) * ``` * * @example Log all messages * ```typescript * adapter.addSendInterceptor((ctx, next) => { * console.log('Sending:', ctx.envelope.message.type) * next() * }) * ``` */ addSendInterceptor(interceptor: SendInterceptor): () => void; /** * Clear all send interceptors. */ clearSendInterceptors(): void; } type BridgeParams = { logger?: Logger; }; /** * A simple message router that connects multiple BridgeAdapters within the same process. * This enables direct message passing between adapters for testing purposes. 
*/ declare class Bridge { readonly adapters: Map; readonly logger: Logger; constructor({ logger }?: BridgeParams); /** * Register an adapter with this bridge */ addAdapter(adapter: BridgeAdapter): void; /** * Remove an adapter from this bridge */ removeAdapter(adapterType: AdapterType): void; /** * Route a message from one adapter to another */ routeMessage(fromAdapterType: AdapterType, toAdapterType: AdapterType, message: ChannelMsg): void; /** * Get all adapter IDs currently in the bridge */ get adapterTypes(): Set; } type BridgeAdapterContext = { targetAdapterType: AdapterType; }; type BridgeAdapterParams = { adapterType: AdapterType; /** * Unique identifier for this adapter instance. * If not provided, defaults to adapterType for backwards compatibility. */ adapterId?: string; bridge: Bridge; logger?: Logger; }; /** * An in-memory adapter for testing that connects multiple peers within the same process. * * BridgeAdapter simulates real network adapter behavior by delivering messages * asynchronously via `queueMicrotask()`. This ensures tests exercise the same * async codepaths as production adapters (WebSocket, SSE, etc.). * * **Important**: Tests using BridgeAdapter should use `waitForSync()` or * `waitUntilReady()` to await synchronization, just like they would with * real network adapters. 
* * @example * ```typescript * const bridge = new Bridge() * const repoA = new Repo({ * adapters: [new BridgeAdapter({ adapterType: "peer-a", bridge })], * }) * const repoB = new Repo({ * adapters: [new BridgeAdapter({ adapterType: "peer-b", bridge })], * }) * * const handleA = repoA.get("doc", DocSchema) * handleA.change(draft => { draft.text.insert(0, "hello") }) * * const handleB = repoB.get("doc", DocSchema) * await handleB.waitForSync() // Wait for async message delivery * expect(handleB.doc.toJSON().text).toBe("hello") * ``` */ declare class BridgeAdapter extends Adapter { readonly bridge: Bridge; readonly logger: Logger; private channelToAdapter; private adapterToChannel; constructor({ adapterType, adapterId, bridge, logger }: BridgeAdapterParams); generate(context: BridgeAdapterContext): GeneratedChannel; /** * Start participating in the in-process network. * Uses two-phase initialization: * 1. Create all channels (no messages sent) * 2. Establish channels (only the "newer" adapter initiates to avoid double-establishment) */ onStart(): Promise; /** * Stop participating in the in-process network. * Cleans up all channels and removes from bridge. */ onStop(): Promise; /** * Create a channel to a target adapter (Phase 1). * Does NOT trigger establishment - that happens in Phase 2. * Called by our own onStart() or by other adapters when they start. */ createChannelTo(targetAdapterType: AdapterType): void; /** * Establish a channel to a target adapter (Phase 2). * Triggers the establishment handshake. * Called by our own onStart() or by other adapters when they start. */ establishChannelTo(targetAdapterType: AdapterType): void; /** * Remove a channel to a target adapter. * Called by other adapters when they stop. */ removeChannelTo(targetAdapterType: AdapterType): void; /** * Deliver a message from another adapter to the appropriate channel. * Called by Bridge.routeMessage(). 
* * Delivers messages asynchronously via queueMicrotask() to simulate real * network adapter behavior. This ensures tests using BridgeAdapter exercise * the same async codepaths as production adapters (WebSocket, SSE, etc.). * * Tests should use `waitForSync()` or `waitUntilReady()` to await sync completion. */ deliverMessage(fromAdapterType: AdapterType, message: ChannelMsg): void; } /** * A test adapter that simulates network latency between channel establishment * and sync-response delivery. Useful for testing timing-sensitive scenarios. * * @example * ```typescript * const adapter = new DelayedNetworkAdapter({ syncResponseDelay: 100 }) * const repo = new Repo({ * identity: { name: "client", type: "user" }, * adapters: [adapter], * }) * * const handle = repo.get("test-doc", DocSchema) * * // Later, simulate server response * await adapter.deliverSyncResponse("test-doc", serverSnapshot) * // Or simulate server doesn't have the document * await adapter.deliverUnavailable("test-doc") * ``` */ type DelayedNetworkAdapterOptions = { /** * Delay in milliseconds before delivering sync responses. */ syncResponseDelay: number; /** * The peer ID to use for the simulated server. * @default "server" */ serverPeerId?: PeerID; /** * The name to use for the simulated server. * @default "server" */ serverName?: string; }; declare class DelayedNetworkAdapter extends Adapter { private channel?; private syncResponseDelay; private serverPeerId; private serverName; /** * Callback invoked when a sync-request is received. * Useful for tests that need to know when to deliver responses. */ onSyncRequestReceived?: (docId: string) => void; constructor(options: DelayedNetworkAdapterOptions); protected generate(): GeneratedChannel; onStart(): Promise; onStop(): Promise; /** * Simulate the server sending a sync-response with document data. 
* * @param docId - The document ID * @param data - The document snapshot data (from loroDoc.export({ mode: "snapshot" })) */ deliverSyncResponse(docId: string, data: Uint8Array): Promise; /** * Simulate the server responding that it doesn't have the document. * * @param docId - The document ID */ deliverUnavailable(docId: string): Promise; } /** * JSON-serializable version of VersionVector */ type VersionVectorJSON = Record; /** * JSON-serializable version of Uint8Array (base64 encoded) */ type BinaryDataJSON = string; /** * JSON-serializable version of SyncTransmission */ type SyncTransmissionJSON = { type: "up-to-date"; version: VersionVectorJSON; } | { type: "snapshot"; data: BinaryDataJSON; version: VersionVectorJSON; } | { type: "update"; data: BinaryDataJSON; version: VersionVectorJSON; } | { type: "unavailable"; }; /** * JSON-serializable version of EphemeralPeerData */ type EphemeralPeerDataJSON = { peerId: PeerID; data: BinaryDataJSON; namespace: string; }; /** * JSON-serializable version of EphemeralStoreData */ type EphemeralStoreDataJSON = { peerId: PeerID; data: BinaryDataJSON; namespace: string; }; /** * JSON-serializable versions of all channel messages */ type ChannelMsgJSON = { type: "channel/establish-request"; identity: { peerId: PeerID; name: string; }; } | { type: "channel/establish-response"; identity: { peerId: PeerID; name: string; }; } | { type: "channel/sync-request"; docId: string; requesterDocVersion: VersionVectorJSON; ephemeral?: EphemeralPeerDataJSON[]; bidirectional: boolean; } | { type: "channel/sync-response"; docId: string; transmission: SyncTransmissionJSON; ephemeral?: EphemeralPeerDataJSON[]; } | { type: "channel/update"; docId: string; transmission: SyncTransmissionJSON; } | { type: "channel/directory-request"; docIds?: string[]; } | { type: "channel/directory-response"; docIds: string[]; } | { type: "channel/new-doc"; docIds: string[]; } | { type: "channel/delete-request"; docId: string; } | { type: "channel/delete-response"; 
docId: string; status: "deleted" | "ignored"; } | { type: "channel/ephemeral"; docId: string; hopsRemaining: number; stores: EphemeralStoreDataJSON[]; } | { type: "channel/batch"; messages: BatchableMsgJSON[]; }; /** * JSON-serializable version of BatchableMsg (all established messages except batch itself) */ type BatchableMsgJSON = Exclude; /** * Utility functions for serialization */ declare function versionVectorToJSON(vv: VersionVector): VersionVectorJSON; declare function versionVectorFromJSON(json: VersionVectorJSON): VersionVector; declare function uint8ArrayToJSON(data: Uint8Array): BinaryDataJSON; declare function uint8ArrayFromJSON(json: BinaryDataJSON): Uint8Array; /** * Serialize a channel message to JSON-compatible format */ declare function serializeChannelMsg(msg: ChannelMsg): ChannelMsgJSON; /** * Deserialize a JSON-compatible message back to channel message */ declare function deserializeChannelMsg(json: ChannelMsgJSON): ChannelMsg; type AdapterManagerParams = { adapters?: AnyAdapter[]; context: AdapterContext; onReset: (adapter: AnyAdapter) => void; onSend?: HandleSendFn; logger?: Logger; }; /** * The AdapterManager is responsible for managing adapters and sending * AddressedEnvelopes to their addressees via the adapters. * * Supports dynamic add/remove of adapters at runtime. */ declare class AdapterManager { #private; readonly logger: Logger; constructor({ adapters, context, onReset, onSend, logger, }: AdapterManagerParams); /** * Start all adapters that were provided in the constructor. * This should be called after the Synchronizer is fully initialized. */ startAll(): void; /** * Get all adapters as an array. */ get adapters(): AnyAdapter[]; /** * Check if an adapter exists by ID. */ hasAdapter(adapterId: string): boolean; /** * Get an adapter by ID. */ getAdapter(adapterId: string): AnyAdapter | undefined; /** * Add an adapter at runtime. * Idempotent: adding an adapter with the same adapterId is a no-op. 
*/ addAdapter(adapter: AnyAdapter): Promise; /** * Remove an adapter at runtime. * Idempotent: removing a non-existent adapter is a no-op. * * The sync protocol will naturally recover any "lost" state on the next * heartbeat or user-initiated sync. */ removeAdapter(adapterId: string): Promise; /** * Send an establishment message (establish-request or establish-response). * These messages can be sent to channels that are not yet established. */ sendEstablishmentMessage(envelope: AddressedEstablishmentEnvelope): number; /** * Send an established message (sync, directory, delete). * These messages can only be sent to channels that have been established. */ send(envelope: AddressedEstablishedEnvelope): number; /** * Reset all adapters and clear the manager. */ reset(): void; } /** * Context about the document being accessed. */ type DocContext = { id: DocId; doc: LoroDoc; }; /** * Context about the peer making the request. * Flat structure for ergonomic access (e.g., `peer.channelKind` not `peer.channel.kind`). */ type PeerContext = { peerId: PeerID; peerName?: string; peerType: "user" | "bot" | "service"; channelId: ChannelId; channelKind: ChannelKind; }; /** * Permissions control access to documents. * * Permissions are simple, synchronous predicates that determine what peers can do. * They run inside the synchronizer's TEA state machine. * * For advanced use cases (rate limiting, external auth, audit logging), * use middleware instead. * * @example * ```typescript * const repo = new Repo({ * permissions: { * visibility: (doc, peer) => doc.id.startsWith('public/'), * mutability: (doc, peer) => peer.peerType !== 'bot', * deletion: (doc, peer) => peer.peerType === 'service', * } * }) * ``` */ interface Permissions { /** * Who can discover this document exists? * * Called when: * - Responding to directory-request * - Propagating new documents to peers * - Sending sync-request to channels * * BYPASS: Skipped if peer is already subscribed to the document. 
* Rationale: Once a peer knows about a doc, you can't "un-reveal" it. * * @default () => true (reveal all docs) */ visibility(doc: DocContext, peer: PeerContext): boolean; /** * Who can modify this document? * * Called when: * - Receiving sync-response with document data * - Receiving channel/update messages * * NO BYPASS: Always checked, even for subscribed peers. * Rationale: Write permissions can change over time. * * @default () => true (allow all updates) */ mutability(doc: DocContext, peer: PeerContext): boolean; /** * Who can create new documents? * * Called when: * - Peer sends sync-request for a document that doesn't exist locally * * NO BYPASS: Always checked. * Rationale: Creation is a one-time event, no subscription context. * * Note: Receives only docId (not DocContext) since the document doesn't exist yet. * * @default () => true (allow all creation) */ creation(docId: DocId, peer: PeerContext): boolean; /** * Who can delete documents? * * Called when: * - Receiving channel/delete-request * - Local deletion request * * NO BYPASS: Always checked. * Rationale: Deletion is destructive and must always be authorized. * * @default () => false (deny all deletion - safe default) */ deletion(doc: DocContext, peer: PeerContext): boolean; } /** * Create a Permissions object with defaults for any unspecified permissions. * * @param permissions - Partial permissions to override defaults * @returns Complete Permissions object * * @example * ```typescript * // Only override what you need * const permissions = createPermissions({ * visibility: (doc, peer) => doc.id.startsWith('public/'), * }) * ``` */ declare function createPermissions(permissions?: Partial): Permissions; /** * Context available to middleware. 
*
 * The available fields depend on when the middleware runs:
 * - `peer` is always available
 * - `document` is available for document-specific messages
 * - `transmission` is available for sync-response and update messages
 */
type MiddlewareContext = {
    /** The incoming message */
    message: ChannelMsg;
    /** Information about the peer sending the message */
    peer: PeerContext;
    /** Document context (if message is document-specific) */
    document?: DocContext;
    /** Transmission metadata (if message contains document data) */
    transmission?: {
        type: "snapshot" | "update";
        sizeBytes: number;
    };
};
/**
 * Result of a middleware check.
 *
 * Discriminated on `allow`; a rejection may carry a human-readable `reason`.
 */
type MiddlewareResult = {
    allow: true;
} | {
    allow: false;
    reason?: string;
};
/**
 * Middleware for advanced access control and cross-cutting concerns.
 *
 * Middleware runs BEFORE the synchronizer processes messages, at the async boundary.
 * Use middleware for:
 * - Rate limiting
 * - Size limits
 * - External auth service integration
 * - Audit logging
 * - Message transformation
 *
 * For simple permission checks, use `permissions` instead.
 *
 * @example
 * ```typescript
 * // Sync middleware: rate limiting
 * const rateLimiter: Middleware = {
 *   name: 'rate-limiter',
 *   requires: ['peer'],
 *   check: (ctx) => {
 *     const count = getRequestCount(ctx.peer.peerId)
 *     return count < 100 ? { allow: true } : { allow: false, reason: 'rate-limited' }
 *   }
 * }
 *
 * // Async middleware: external auth
 * const externalAuth: Middleware = {
 *   name: 'external-auth',
 *   requires: ['peer'],
 *   check: async (ctx) => {
 *     const allowed = await authService.canAccess(ctx.peer.peerId)
 *     return allowed ? { allow: true } : { allow: false, reason: 'unauthorized' }
 *   }
 * }
 * ```
 */
interface Middleware {
    /** Name for logging and debugging */
    name: string;
    /**
     * Declare what context fields this middleware needs.
     *
     * The system uses this to determine when to run the middleware:
     * - `['peer']` or `['document']` → runs pre-receive (before payload processed)
     * - `['transmission']` → runs post-receive (after payload available)
     *
     * If not specified, middleware runs for all messages.
     */
    requires?: ("peer" | "document" | "transmission")[];
    /**
     * Check whether the message should be allowed.
     *
     * Can be sync or async - the system handles both.
     *
     * @param ctx - Context about the message and peer
     * @returns Whether to allow the message, optionally with a reason for rejection
     */
    check(ctx: MiddlewareContext): MiddlewareResult | Promise<MiddlewareResult>;
}
/**
 * Run all middleware checks for a message.
 *
 * Middleware runs in registration order and short-circuits on first rejection.
 *
 * @param middleware - Array of middleware to run
 * @param ctx - Context for the middleware
 * @param logger - Logger for debugging
 * @returns The result of the middleware chain
 */
declare function runMiddleware(middleware: Middleware[], ctx: MiddlewareContext, logger: Logger): Promise<MiddlewareResult>;
/**
 * Synchronizer Program - Core orchestration for document discovery and synchronization
 *
 * This module implements the main state machine for the loro-extended synchronization protocol.
 * It follows The Elm Architecture (TEA) pattern with immutable updates via the mutative library.
 *
 * ## Architecture Overview
 *
 * The synchronizer uses a **pull-based discovery model** with two main message flows:
 *
 * 1. **Discovery Flow** (what documents exist):
 *    - `directory-request/response` - Peers announce and discover documents
 *    - Controlled by `permissions.visibility`
 *
 * 2.
**Sync Flow** (transferring document data): * - `sync-request/response` - Peers explicitly request and receive document data * - Controlled by `permissions.mutability` * * ## Key Design Principles * * - **Separation of Concerns**: Discovery and sync are separate, explicit steps * - **Privacy by Design**: Permissions checked at every decision point * - **Symmetric Protocol**: Both peers use the same patterns (no client/server roles) * - **Pull-Based**: Peers announce documents, interested peers request them * * ## Message Flow Patterns * * ### Pattern 1: New Document Created * ``` * 1. local-doc-change triggered * 2. Send directory-response (announcement) to channels where visibility=true * 3. Interested peers send sync-request * 4. Send sync-response with document data * ``` * * ### Pattern 2: Existing Document Modified * ``` * 1. local-doc-change triggered * 2. If peer has previously requested (peerWantsUpdates=true): * - Send sync-response directly (real-time update) * 3. Otherwise: Send directory-response announcement * ``` * * ### Pattern 3: Peer Connection Established * ``` * 1. establish-request/response handshake * 2. Both peers send directory-request * 3. Both peers send sync-request for their own documents * 4. Discovery and sync happen in parallel * ``` * * @see docs/discovery-and-sync-architecture.md for detailed architecture documentation */ /** * The synchronizer's state model * * This represents the complete state of the synchronization system at any point in time. * All state updates are immutable (via mutative library). 
*/
type SynchronizerModel = {
    /** Our own peer identity */
    identity: PeerIdentityDetails;
    /** All documents we know about (local and synced from peers), keyed by document ID */
    documents: Map<DocId, DocState>;
    /** All active channels (storage adapters, network peers), keyed by channel ID */
    channels: Map<ChannelId, Channel>;
    /**
     * Peer state tracking for reconnection optimization, keyed by peer ID
     *
     * Tracks what each peer knows about our documents to enable:
     * - Optimized sync on reconnection (only send changed docs)
     * - Awareness-based message routing (announcements vs updates)
     */
    peers: Map<PeerID, PeerState>;
};
/**
 * Messages that drive the synchronizer state machine
 *
 * These are the inputs to the update function. Each message triggers
 * a state transition and may produce commands as side effects.
 */
type SynchronizerMessage = {
    type: "synchronizer/heartbeat";
} | {
    type: "synchronizer/ephemeral-local-change";
    docId: DocId;
    namespace: string;
} | {
    type: "synchronizer/channel-added";
    channel: ConnectedChannel;
} | {
    type: "synchronizer/establish-channel";
    channelId: ChannelId;
} | {
    type: "synchronizer/channel-removed";
    channel: Channel;
} | {
    type: "synchronizer/doc-ensure";
    docId: DocId;
} | {
    type: "synchronizer/local-doc-change";
    docId: DocId;
} | {
    type: "synchronizer/doc-delete";
    docId: DocId;
} | {
    type: "synchronizer/doc-imported";
    docId: DocId;
    fromPeerId: PeerID;
} | {
    type: "synchronizer/channel-receive-message";
    envelope: ReturnEnvelope;
};
/**
 * Commands are side effects produced by the update function
 *
 * The synchronizer is pure - it doesn't perform side effects directly.
 * Instead, it returns commands that the runtime executes.
*/
type Command = {
    type: "cmd/stop-channel";
    channel: Channel;
} | {
    type: "cmd/send-establishment-message";
    envelope: AddressedEstablishmentEnvelope;
} | {
    type: "cmd/send-message";
    envelope: AddressedEstablishedEnvelope;
} | {
    type: "cmd/send-sync-response";
    docId: DocId;
    requesterDocVersion: VersionVector;
    toChannelId: ChannelId;
    /** When set, the response should also carry an ephemeral snapshot */
    includeEphemeral?: boolean;
} | {
    type: "cmd/send-sync-request";
    toChannelId: ChannelId;
    docs: {
        docId: DocId;
        requesterDocVersion: VersionVector;
    }[];
    bidirectional: boolean;
    /** When set, the request should also carry ephemeral data for each listed doc */
    includeEphemeral?: boolean;
} | {
    type: "cmd/subscribe-doc";
    docId: DocId;
} | {
    type: "cmd/import-doc-data";
    docId: DocId;
    data: Uint8Array;
    fromPeerId: PeerID;
} | {
    type: "cmd/apply-ephemeral";
    docId: DocId;
    stores: EphemeralStoreData[];
} | {
    /** Broadcasts the ephemeral data of one namespace of one document */
    type: "cmd/broadcast-ephemeral-namespace";
    docId: DocId;
    namespace: string;
    hopsRemaining: number;
    toChannelIds: ChannelId[];
} | {
    /**
     * Macro command that expands into several cmd/broadcast-ephemeral-namespace commands.
     * The heartbeat uses it to broadcast every namespace of multiple docs to one peer.
     * Each sub-command queues messages; the deferred send layer aggregates them at flush time.
     */
    type: "cmd/broadcast-ephemeral-batch";
    docIds: DocId[];
    hopsRemaining: number;
    toChannelId: ChannelId;
} | {
    type: "cmd/remove-ephemeral-peer";
    peerId: PeerID;
} | {
    type: "cmd/emit-ephemeral-change";
    docId: DocId;
} | {
    type: "cmd/dispatch";
    dispatch: SynchronizerMessage;
} | {
    type: "cmd/batch";
    commands: Command[];
};
/**
 * Events that the Synchronizer can emit.
 * This type is used by command handlers to emit events.
*/
type SynchronizerEvents = {
    "ready-state-changed": {
        docId: string;
        readyStates: ReadyState[];
    };
    "ephemeral-change": {
        docId: string;
        source: "local" | "remote";
        keys?: string[];
        peerId?: string;
    };
};
type HandleUpdateFn = (patches: Patch[]) => void;
type SynchronizerParams = {
    identity: PeerIdentityDetails;
    adapters?: AnyAdapter[];
    permissions?: Permissions;
    middleware?: Middleware[];
    onUpdate?: HandleUpdateFn;
    logger?: Logger;
};
type SynchronizerUpdate = (msg: SynchronizerMessage, model: SynchronizerModel) => [SynchronizerModel, Command?];
declare class Synchronizer {
    #private;
    readonly identity: PeerIdentityDetails;
    readonly adapters: AdapterManager;
    readonly logger: Logger;
    readonly updateFn: SynchronizerUpdate;
    /**
     * Per-doc namespaced ephemeral stores (unified model):
     * document ID → namespace → store.
     * Internal getter used by command execution.
     */
    get docNamespacedStores(): Map<DocId, Map<string, EphemeralStore>>;
    /** Emitter for the events described by SynchronizerEvents. */
    readonly emitter: Emittery__default<SynchronizerEvents>;
    /** Latest ready states, keyed by document ID. */
    readonly readyStates: Map<DocId, ReadyState[]>;
    model: SynchronizerModel;
    constructor({ identity, adapters, permissions, middleware, onUpdate, logger: preferredLogger, }: SynchronizerParams);
    startHeartbeat(): void;
    stopHeartbeat(): void;
    /**
     * Get the number of middleware configured (for debugging).
     */
    get middlewareCount(): number;
    /**
     * Handle incoming channel messages.
     *
     * Uses a unified work queue to prevent recursion when adapters deliver messages
     * synchronously (e.g., BridgeAdapter, StorageAdapter). Messages are queued
     * and processed iteratively rather than recursively.
     *
     * If middleware is configured, it runs BEFORE the synchronizer processes the message.
     * Middleware can reject messages (e.g., rate limiting, auth).
     *
     * @param channelId - The channel ID (we look up the current channel from the model
     *   to ensure we have the latest state, since the model uses immutable updates)
     * @param message - The message received on the channel
     */
    channelReceive(channelId: ChannelId, message: ChannelMsg): void;
    channelAdded(channel: ConnectedChannel): void;
    channelEstablish(channel: ConnectedChannel): void;
    channelRemoved(channel: Channel): void;
    /**
     * Get or create a namespaced ephemeral store for a document.
     * This is used for the new unified ephemeral store model.
     *
     * @param docId The document ID
     * @param namespace The store namespace (e.g., 'presence', 'cursors', 'mouse')
     * @returns The ephemeral store for this namespace
     */
    getOrCreateNamespacedStore(docId: DocId, namespace: string): EphemeralStore;
    /**
     * Register an external ephemeral store for network sync.
     * Use this for libraries that bring their own EphemeralStore (like loro-prosemirror).
     *
     * @param docId The document ID
     * @param namespace The store namespace
     * @param store The external EphemeralStore to register
     */
    registerExternalStore(docId: DocId, namespace: string, store: EphemeralStore): void;
    /**
     * Get a namespaced store by name.
     *
     * @param docId The document ID
     * @param namespace The store namespace
     * @returns The EphemeralStore or undefined if not found
     */
    getNamespacedStore(docId: DocId, namespace: string): EphemeralStore | undefined;
    /**
     * Broadcast a namespaced store to all peers.
     * This is called explicitly by the Handle when local changes are made.
     *
     * Routes through dispatch for TEA compliance and message aggregation.
     *
     * @param docId The document ID
     * @param namespace The store namespace
     */
    broadcastNamespacedStore(docId: DocId, namespace: string): void;
    /**
     * Add an adapter at runtime.
     * Idempotent: adding an adapter with the same adapterId is a no-op.
     */
    addAdapter(adapter: AnyAdapter): Promise<void>;
    /**
     * Remove an adapter at runtime.
     * Idempotent: removing a non-existent adapter is a no-op.
     */
    removeAdapter(adapterId: string): Promise<void>;
    /**
     * Check if an adapter exists by ID.
     */
    hasAdapter(adapterId: string): boolean;
    /**
     * Get an adapter by ID.
     */
    getAdapter(adapterId: string): AnyAdapter | undefined;
    getOrCreateDocumentState(docId: DocId): DocState;
    getDocumentState(docId: DocId): DocState | undefined;
    /**
     * Get docIds that a channel's peer has subscribed to
     */
    getChannelDocIds(channelId: ChannelId): DocId[];
    /**
     * Get all peers
     */
    getPeers(): PeerState[];
    /**
     * Get the current ready states for a document
     */
    getReadyStates(docId: DocId): ReadyState[];
    /**
     * Wait until a docId is "ready", with "ready" meaning any number of flexible things:
     * e.g.
     * - the document has been loaded from a storage channel
     * - the document has been loaded from the server
     * - with regard to the document, all channels (peers) have responded
     *
     * All of this flexibility is achieved by allowing you to pass a "predicate" function
     * that returns true or false depending on your needs.
     *
     * @param docId The document ID under test for the predicate
     * @param predicate A condition to wait for--the predicate is passed a ReadyState[] array
     */
    waitUntilReady(docId: DocId, predicate: (readyStates: ReadyState[]) => boolean): Promise<void>;
    /**
     * Remove a document from the synchronizer and send delete messages to all channels.
     *
     * The dispatch handles both:
     * 1. Removing the document from the model
     * 2. Sending delete-request messages to all subscribed peers (via deferred send)
     */
    removeDocument(docId: DocId): Promise<void>;
    reset(): Promise<void>;
    /**
     * Get the current model state (for debugging purposes).
     * Returns a deep copy to prevent accidental mutations.
     */
    getModelSnapshot(): SynchronizerModel;
}
/**
 * Custom predicate for determining readiness.
 */
type ReadinessCheck = (readyStates: ReadyState[]) => boolean;
/**
 * Options for waitForSync().
 */
type WaitForSyncOptions = {
    /**
     * The kind of channel to wait for.
     * @default "network"
     */
    kind?: "network" | "storage";
    /**
     * Timeout in milliseconds.
Set to 0 to disable timeout.
     * @default 30000
     */
    timeout?: number;
    /**
     * Optional AbortSignal for cancellation.
     * If aborted, the promise rejects with an AbortError.
     */
    signal?: AbortSignal;
};
/**
 * Error thrown when waitForSync() times out.
 */
declare class SyncTimeoutError extends Error {
    readonly kind: "network" | "storage";
    readonly timeoutMs: number;
    readonly docId: string;
    readonly lastSeenStates?: ReadyState[] | undefined;
    constructor(kind: "network" | "storage", timeoutMs: number, docId: string, lastSeenStates?: ReadyState[] | undefined);
}
/**
 * Error thrown when waitForSync() is called but no adapters of the requested kind exist.
 */
declare class NoAdaptersError extends Error {
    readonly kind: "network" | "storage";
    readonly docId: string;
    constructor(kind: "network" | "storage", docId: string);
}
/**
 * Shape for ephemeral store declarations.
 * Each key becomes a TypedEphemeral property on the handle.
 */
type EphemeralDeclarations = Record<string, ValueShape>;
/**
 * TypedEphemeral provides type-safe access to an ephemeral store.
 * All ephemeral stores are shared key-value stores where keys can be anything
 * (often peerIds, but not required).
 *
 * @typeParam T - The value type stored under each key
 */
interface TypedEphemeral<T> {
    /** Set a value for any key */
    set(key: string, value: T): void;
    /** Get a value by key */
    get(key: string): T | undefined;
    /** Get all key-value pairs */
    getAll(): Map<string, T>;
    /** Delete a key */
    delete(key: string): void;
    /** Get my value: equivalent to get(myPeerId) */
    readonly self: T | undefined;
    /** Set my value: equivalent to set(myPeerId, value) */
    setSelf(value: T): void;
    /** Get all peers except me */
    readonly peers: Map<string, T>;
    /** Subscribe to changes */
    subscribe(cb: (event: {
        key: string;
        value: T | undefined;
        source: "local" | "remote" | "initial";
    }) => void): () => void;
    /** Access the underlying loro-crdt EphemeralStore */
    readonly raw: EphemeralStore;
}
/**
 * Creates a TypedEphemeral wrapper around an EphemeralStore.
*
 * Note: Broadcasting is handled automatically by the Synchronizer's subscription
 * to the store. When store.set() is called, the subscription fires with
 * by='local' and triggers the broadcast.
 */
declare function createTypedEphemeral<T>(store: EphemeralStore, myPeerId: string, _shape: ValueShape): TypedEphemeral<T>;
/**
 * Parameters for creating a Handle.
 */
type HandleParams<D extends DocShape, E extends EphemeralDeclarations> = {
    docId: DocId;
    docShape: D;
    ephemeralShapes?: E;
    synchronizer: Synchronizer;
    logger?: Logger;
};
/**
 * A unified handle to a Loro document with typed ephemeral stores.
 *
 * This class provides:
 * - Type-safe document access via `.doc` (always a TypedDoc)
 * - Type-safe ephemeral store access via declared store names
 * - External store integration via `addEphemeral()` / `getEphemeral()`
 * - Sync infrastructure (readyStates, waitUntilReady, etc.)
 *
 * The Handle delegates ephemeral store management to the Synchronizer,
 * which is the single source of truth for all stores.
 *
 * NOTE(review): the default for `E` (`Record<string, never>`) is reconstructed;
 * confirm it against the implementation.
 *
 * @typeParam D - The document shape (use Shape.any() for untyped)
 * @typeParam E - The ephemeral store declarations
 */
declare class Handle<D extends DocShape, E extends EphemeralDeclarations = Record<string, never>> {
    /**
     * The document ID.
     */
    readonly docId: DocId;
    /**
     * The peer ID of the local peer.
     */
    readonly peerId: string;
    /**
     * The Synchronizer for network operations.
     * This is the single source of truth for ephemeral stores.
     */
    private readonly synchronizer;
    /**
     * Logger instance.
     */
    private readonly logger;
    /**
     * The document shape.
     */
    private readonly _docShape;
    /**
     * The typed document.
     */
    private readonly _doc;
    /**
     * Ephemeral shapes for declared stores.
     * Used to create TypedEphemeral wrappers on-demand.
     */
    private readonly _ephemeralShapes;
    /**
     * Cache for TypedEphemeral wrappers.
     * Created on-demand and cached for performance.
     */
    private readonly _typedEphemeralCache;
    constructor({ docId, docShape, ephemeralShapes, synchronizer, logger, }: HandleParams<D, E>);
    /**
     * Get or create a TypedEphemeral wrapper for a store.
     * The wrapper is cached for performance.
     */
    private _getOrCreateTypedEphemeral;
    /**
     * The strongly-typed document.
     * Always returns a TypedDoc - use Shape.any() for untyped access.
     * Access the raw LoroDoc via the `loroDoc` getter for untyped operations.
     */
    get doc(): TypedDoc<D>;
    /**
     * Get the underlying LoroDoc for direct, untyped access.
     * Use this when you need to perform operations not supported by the typed API,
     * or when working with Shape.any() documents.
     *
     * @returns The raw LoroDoc instance
     *
     * @example
     * ```typescript
     * const handle = repo.get('my-doc', Shape.any())
     * handle.loroDoc.getMap('root').set('key', 'value')
     * ```
     */
    get loroDoc(): LoroDoc;
    /**
     * Convenience method: apply a set of mutations in a single commit.
     */
    change(fn: (draft: Mutable<D>) => void): TypedDoc<D>;
    /**
     * Subscribe to all changes on the document.
     *
     * The listener receives a `LoroEventBatch` from loro-crdt containing:
     * - `by`: The origin of the change ("local", "import", or "checkout")
     * - `origin`: Optional string identifying the change source
     * - `currentTarget`: The container ID of the event receiver (undefined for root doc)
     * - `events`: Array of `LoroEvent` objects with container diffs
     * - `from`: The frontiers before the change
     * - `to`: The frontiers after the change
     *
     * @param listener - Callback invoked on each document change
     * @returns Unsubscribe function
     */
    subscribe(listener: Listener): () => void;
    /**
     * Subscribe to changes at a specific path using the type-safe DSL.
     *
     * The callback receives:
     * - `value`: The current value at the path (properly typed)
     * - `prev`: The previous value (undefined on first call)
     *
     * This uses two-stage filtering:
     * 1. WASM-side: subscribeJsonpath for efficient path matching
     * 2. JS-side: Deep equality check to filter false positives
     *
     * @param selector - Path selector function using the DSL
     * @param listener - Callback receiving the typed value and previous value
     * @returns Unsubscribe function
     *
     * @example
     * ```typescript
     * handle.subscribe(
     *   p => p.books.$each.title,
     *   (titles, prev) => {
     *     console.log("Titles changed from", prev, "to", titles)
     *   }
     * )
     * ```
     */
    subscribe<T>(selector: (path: PathBuilder<D>) => PathSelector<T>, listener: (value: T, prev: T | undefined) => void): () => void;
    /**
     * Subscribe to changes that may affect a JSONPath query (escape hatch).
     *
     * Use this for complex queries not expressible in the DSL (filters, etc.).
     * Note: No type safety - callback receives unknown[].
     *
     * @param jsonpath - JSONPath expression (e.g., "$.users[*].name")
     * @param listener - Callback receiving the query result
     * @returns Unsubscribe function
     *
     * @example
     * ```typescript
     * // Subscribe to changes affecting books with price > 10
     * const unsubscribe = handle.subscribe(
     *   "$.books[?@.price>10].title",
     *   (titles) => {
     *     console.log("Expensive book titles:", titles);
     *   }
     * );
     * ```
     */
    subscribe(jsonpath: string, listener: (value: unknown[]) => void): () => void;
    /**
     * Execute a JSONPath query against the document.
     *
     * This is a general-purpose method for querying the document with full
     * JSONPath expressiveness. Use this for ad-hoc queries or within callbacks.
     *
     * @example
     * ```typescript
     * const expensiveBooks = handle.jsonPath("$.books[?@.price>10]")
     * const allTitles = handle.jsonPath("$..title")
     * ```
     */
    jsonPath(path: string): unknown[];
    /**
     * Get a typed ephemeral store by name.
     * Only works for stores declared in ephemeralShapes.
     */
    getTypedEphemeral<K extends keyof E>(name: K): TypedEphemeral<Infer<E[K]>>;
    /**
     * Add an external ephemeral store for network sync.
     * Use this for libraries that bring their own EphemeralStore (like loro-prosemirror).
     *
     * @param name - The store name (namespace)
     * @param store - The EphemeralStore to register
     */
    addEphemeral(name: string, store: EphemeralStore): void;
    /**
     * Get a raw ephemeral store by name.
     * Delegates to Synchronizer which is the single source of truth.
     *
     * @param name - The store name
     * @returns The EphemeralStore or undefined if not found
     */
    getEphemeral(name: string): EphemeralStore | undefined;
    /**
     * Get the current ready states for this document.
     */
    get readyStates(): ReadyState[];
    /**
     * Subscribe to ready state changes.
     * @param cb Callback that receives the new ready states
     * @returns Unsubscribe function
     */
    onReadyStateChange(cb: (readyStates: ReadyState[]) => void): () => void;
    /**
     * Wait until the document meets custom readiness criteria.
     * @param predicate Function that determines if the document is ready
     */
    waitUntilReady(predicate: ReadinessCheck): Promise<Handle<D, E>>;
    /**
     * Wait for sync to complete with a peer of the specified kind.
     *
     * Resolves when we've completed the sync handshake with a peer:
     * - Received document data (peer state = "synced")
     * - Peer confirmed it doesn't have the document (peer state = "absent")
     *
     * This enables the common "initializeIfEmpty" pattern:
     * ```typescript
     * await handle.waitForSync()
     * if (handle.loroDoc.opCount() === 0) {
     *   // Server doesn't have it, safe to initialize
     *   initializeDocument(handle)
     * }
     * ```
     *
     * @param options - Configuration options
     * @param options.kind - The kind of channel to wait for ("network" or "storage"). Default: "network"
     * @param options.timeout - Timeout in milliseconds. Set to 0 to disable. Default: 30000
     * @param options.signal - Optional AbortSignal for cancellation
     * @throws {NoAdaptersError} If no adapters of the requested kind are configured
     * @throws {SyncTimeoutError} If the timeout is reached before sync completes
     * @throws {DOMException} If the signal is aborted (name: "AbortError")
     */
    waitForSync(options?: WaitForSyncOptions): Promise<Handle<D, E>>;
    /**
     * Creates a predicate for checking sync completion with a peer of the specified kind.
     */
    private createSyncPredicate;
}
/**
 * Type helper to extract ephemeral store types from a Handle.
 * This allows accessing declared ephemeral stores as properties.
 */
type HandleWithEphemerals<D extends DocShape, E extends EphemeralDeclarations> = Handle<D, E> & {
    [K in keyof E]: TypedEphemeral<Infer<E[K]>>;
};
/**
 * Creates a Handle with ephemeral stores accessible as properties.
 */
declare function createHandle<D extends DocShape, E extends EphemeralDeclarations = Record<string, never>>(params: HandleParams<D, E>): HandleWithEphemerals<D, E>;
/**
 * Rate Limiter Middleware
 *
 * Provides configurable rate limiting for incoming messages.
 * Uses a sliding window algorithm for smooth rate limiting.
 *
 * ## Features
 *
 * - **Per-peer rate limiting** - Each peer has their own rate limit
 * - **Sliding window** - Smooth rate limiting without burst issues
 * - **Configurable limits** - Set max requests per time window
 * - **Burst allowance** - Optional burst capacity for legitimate spikes
 * - **Auto-cleanup** - Removes stale peer data to prevent memory leaks
 *
 * ## Usage
 *
 * ```typescript
 * import { createRateLimiter } from '@loro-extended/repo'
 *
 * const repo = new Repo({
 *   middleware: [
 *     createRateLimiter({
 *       maxRequests: 100, // Max 100 requests
 *       windowMs: 60000, // Per minute
 *       burstAllowance: 10, // Allow 10 extra in bursts
 *     })
 *   ]
 * })
 * ```
 *
 * @module
 */
/**
 * Configuration options for the rate limiter.
 */
type RateLimiterOptions = {
    /**
     * Maximum number of requests allowed per window.
     * @default 100
     */
    maxRequests?: number;
    /**
     * Time window in milliseconds.
     * @default 60000 (1 minute)
     */
    windowMs?: number;
    /**
     * Additional burst capacity above maxRequests.
* Allows short bursts of traffic without rejection.
     * @default 0
     */
    burstAllowance?: number;
    /**
     * How often to clean up stale peer data (in milliseconds).
     * Set to 0 to disable auto-cleanup.
     * @default 300000 (5 minutes)
     */
    cleanupIntervalMs?: number;
    /**
     * Custom key function to determine rate limit grouping.
     * By default, rate limits are per-peer.
     * @default (ctx) => ctx.peer.peerId
     */
    keyFn?: (ctx: MiddlewareContext) => string;
    /**
     * Custom handler for rate-limited requests.
     * Called when a request is rejected due to rate limiting.
     */
    onRateLimited?: (ctx: MiddlewareContext, info: RateLimitInfo) => void;
};
/**
 * Information about the current rate limit state.
 */
type RateLimitInfo = {
    /** Current request count in the window */
    current: number;
    /** Maximum allowed requests */
    limit: number;
    /** Milliseconds until the window resets */
    resetMs: number;
    /** Whether this request was allowed */
    allowed: boolean;
};
/**
 * Internal state for tracking request counts.
 */
type PeerRateState = {
    /** Timestamps of requests in the current window */
    timestamps: number[];
    /** Last activity time (for cleanup) */
    lastActivity: number;
};
/**
 * Extended middleware interface with internal state access for testing.
 */
interface RateLimiterMiddleware extends Middleware {
    /**
     * Internal state map (for testing).
     * NOTE(review): presumably keyed by the string produced by `keyFn` — confirm.
     */
    getState(): Map<string, PeerRateState>;
    /** Cleanup function to stop intervals and clear state */
    cleanup(): void;
}
/**
 * Creates a rate limiter middleware with the specified options.
*
 * @param options - Configuration options
 * @returns A middleware that enforces rate limits
 *
 * @example
 * ```typescript
 * // Basic usage: 100 requests per minute
 * const rateLimiter = createRateLimiter({
 *   maxRequests: 100,
 *   windowMs: 60000,
 * })
 *
 * // With burst allowance
 * const rateLimiter = createRateLimiter({
 *   maxRequests: 100,
 *   windowMs: 60000,
 *   burstAllowance: 20, // Allow up to 120 in bursts
 * })
 *
 * // Custom key function (rate limit by document instead of peer)
 * const rateLimiter = createRateLimiter({
 *   maxRequests: 50,
 *   windowMs: 60000,
 *   keyFn: (ctx) => ctx.document?.id ?? 'unknown',
 * })
 * ```
 */
declare function createRateLimiter(options?: RateLimiterOptions): RateLimiterMiddleware;
/**
 * Extended middleware interface for message type rate limiter.
 */
interface MessageTypeRateLimiterMiddleware extends Middleware {
    /** Cleanup function to stop intervals and clear state */
    cleanup(): void;
}
/**
 * Creates a rate limiter that limits by message type.
 * Useful for limiting specific operations (e.g., sync-requests).
 *
 * @param messageTypes - Array of message types to rate limit
 * @param options - Rate limiter options
 * @returns A middleware that only rate limits specified message types
 *
 * @example
 * ```typescript
 * // Limit sync-requests to 10 per minute
 * const syncLimiter = createMessageTypeRateLimiter(
 *   ['channel/sync-request'],
 *   { maxRequests: 10, windowMs: 60000 }
 * )
 * ```
 */
declare function createMessageTypeRateLimiter(messageTypes: string[], options?: RateLimiterOptions): MessageTypeRateLimiterMiddleware;
interface RepoParams {
    /** Identity fields for the local peer; all fields are optional here. */
    identity?: Partial<PeerIdentityDetails>;
    adapters?: AnyAdapter[];
    /**
     * Permissions control access to documents.
     *
     * Permissions are simple, synchronous predicates that determine what peers can do.
     * For advanced use cases (rate limiting, external auth, audit logging),
     * use middleware instead.
     *
     * @example
     * ```typescript
     * const repo = new Repo({
     *   permissions: {
     *     visibility: (doc, peer) => doc.id.startsWith('public/'),
     *     mutability: (doc, peer) => peer.peerType !== 'bot',
     *     deletion: (doc, peer) => peer.peerType === 'service',
     *   }
     * })
     * ```
     */
    permissions?: Partial<Permissions>;
    /**
     * Middleware for advanced access control and cross-cutting concerns.
     *
     * Middleware runs BEFORE the synchronizer processes messages, at the async boundary.
     * Use middleware for:
     * - Rate limiting
     * - Size limits
     * - External auth service integration
     * - Audit logging
     *
     * For simple permission checks, use `permissions` instead.
     *
     * @example
     * ```typescript
     * const repo = new Repo({
     *   middleware: [
     *     {
     *       name: 'rate-limiter',
     *       requires: ['peer'],
     *       check: (ctx) => {
     *         const count = getRequestCount(ctx.peer.peerId)
     *         return count < 100 ? { allow: true } : { allow: false, reason: 'rate-limited' }
     *       }
     *     }
     *   ]
     * })
     * ```
     */
    middleware?: Middleware[];
    onUpdate?: HandleUpdateFn;
}
/**
 * The Repo class is the central orchestrator for the Loro state synchronization system.
 * It manages the lifecycle of documents, coordinates subsystems, and provides the main
 * public API for document operations.
 *
 * With the simplified DocHandle architecture, Repo becomes a simpler orchestrator
 * that wires together the various subsystems without complex state management.
 *
 * Adapters are used to indicate how to retrieve doc state (updates, sync, etc.) from
 * storage or network systems.
 */
declare class Repo {
    #private;
    readonly logger: Logger;
    readonly identity: PeerIdentityDetails;
    constructor({ identity, adapters, permissions, middleware, onUpdate, }?: RepoParams);
    /**
     * Gets (or creates) a unified handle with typed document and ephemeral stores.
     *
     * This is the primary API for accessing documents. It supports:
     * - Typed documents (use Shape.any() for untyped)
     * - Multiple typed ephemeral stores
     * - External store integration via handle.addEphemeral()
     *
     * NOTE(review): the default for `E` (`Record<string, never>`) is reconstructed;
     * confirm it against the implementation.
     *
     * @param docId The document ID
     * @param docShape The shape of the document (use Shape.any() for untyped)
     * @param ephemeralShapes Optional ephemeral store declarations
     * @returns A Handle with typed document and ephemeral store access
     *
     * @example
     * ```typescript
     * // Typed document with typed ephemeral stores
     * const handle = repo.get('my-doc', DocSchema, {
     *   presence: PresenceSchema,
     *   cursors: CursorSchema
     * })
     * handle.change(draft => { draft.title = 'Hello' })
     * handle.presence.setSelf({ status: 'online' })
     *
     * // Untyped document with typed ephemeral stores
     * const handle = repo.get('my-doc', Shape.any(), {
     *   cursors: CursorSchema
     * })
     * handle.loroDoc.getMap('root').set('key', 'value')
     * handle.cursors.setSelf({ position: 42 })
     * ```
     */
    get<D extends DocShape, E extends EphemeralDeclarations = Record<string, never>>(docId: DocId, docShape: D, ephemeralShapes?: E): HandleWithEphemerals<D, E>;
    /**
     * Check if a document exists in the repo.
     * @param docId The document ID
     * @returns true if the document exists
     */
    has(docId: DocId): boolean;
    /**
     * Deletes a document from the repo.
     * @param docId The ID of the document to delete
     */
    delete(docId: DocId): Promise<void>;
    /**
     * Disconnects all network adapters and cleans up resources.
     * This should be called when the Repo is no longer needed.
     */
    reset(): void;
    /**
     * Add an adapter at runtime.
     * Idempotent: adding an adapter with the same adapterId is a no-op.
     */
    addAdapter(adapter: AnyAdapter): Promise<void>;
    /**
     * Remove an adapter at runtime.
     * Idempotent: removing a non-existent adapter is a no-op.
     */
    removeAdapter(adapterId: string): Promise<void>;
    /**
     * Check if an adapter exists by ID.
     */
    hasAdapter(adapterId: string): boolean;
    /**
     * Get an adapter by ID.
     */
    getAdapter(adapterId: string): AnyAdapter | undefined;
    /**
     * Get all current adapters.
*/
    get adapters(): AnyAdapter[];
    get synchronizer(): Synchronizer;
}
type StorageKey = string[];
type Chunk = {
    key: StorageKey;
    data: Uint8Array;
};
/**
 * A base class for storage adapters.
 *
 * This base class extends Adapter and handles all channel communication
 * behind the scenes. Subclasses only need to implement the following storage
 * operations, and do not need specialized knowledge of Adapter message protocol:
 * - load, save, remove
 * - loadRange, removeRange
 *
 * The base class automatically:
 * - Creates a single channel for storage operations
 * - Responds to channel establishment requests
 * - Responds to document sync requests
 * - Translates channel messages into storage operations
 *
 * The StorageAdapter essentially mimics what would happen if there were another
 * repo to communicate with, but instead intercepts and responds with appropriate
 * messages itself.
 *
 * NOTE(review): `Adapter` may take a type argument that is missing from this
 * declaration — confirm against the Adapter base class.
 */
declare abstract class StorageAdapter extends Adapter {
    /**
     * Storage adapters always create storage channels.
     * This overrides the default "network" kind from the base Adapter class.
     */
    readonly kind: "storage";
    protected storageChannel?: ConnectedChannel;
    private lastTimestamp;
    private counter;
    private readonly storagePeerId;
    /**
     * Generate channel actions for storage operations.
     * The kind and adapterType are automatically added by the base class.
     */
    protected generate(): GeneratedChannel;
    /**
     * Start the storage adapter by creating its single channel.
     * Storage is always "ready" - no async initialization needed.
     */
    onStart(): Promise<void>;
    /**
     * Stop the storage adapter and clean up resources.
     */
    onStop(): Promise<void>;
    /**
     * Handle incoming channel messages and translate them into storage operations.
     */
    private handleChannelMessage;
    /**
     * Handle batched messages by dispatching each one.
     */
    private handleBatch;
    /**
     * Automatically respond to establishment requests.
     * Storage has no concept of "connection establishment" - it's always ready.
     * We immediately respond with our identity so the channel becomes established.
     *
     * NOTE: We intentionally do NOT call requestStoredDocuments() here.
     * Storage is lazy-loaded - documents are loaded on-demand when network clients
     * request them via the storage-first sync mechanism. This approach:
     * - Scales to millions of documents
     * - Reduces startup time
     * - Avoids race conditions with eager loading
     */
    private handleEstablishRequest;
    /**
     * Handle sync requests by loading documents from storage.
     *
     * This implementation:
     * 1. Loads snapshot + incremental updates using loadRange
     * 2. Reconstructs document in temporary LoroDoc (order doesn't matter - Loro handles it)
     * 3. Uses requesterDocVersion to export only needed changes
     * 4. Enables efficient incremental sync
     *
     * WARNING: This implementation loads all chunks for a document into memory at once.
     * For very large documents or documents with long histories, this could lead to
     * high memory usage. A future improvement would be to stream chunks or use
     * a more memory-efficient reconstruction strategy.
     */
    private handleSyncRequest;
    /**
     * Handle sync responses by saving document updates to storage.
     * This is called once in response to a sync-request.
     */
    private handleSyncResponse;
    /**
     * Handle ongoing updates from subscribed documents.
     * This is called when a document changes after the initial sync.
     */
    private handleUpdate;
    /**
     * Handle directory requests by listing available documents.
     */
    private handleDirectoryRequest;
    /**
     * Handle new-doc announcements by eagerly requesting documents.
     * Storage adapters are "eager" - they automatically request all announced documents.
     */
    private handleNewDoc;
    /**
     * Handle delete requests by removing documents from storage.
     */
    private handleDeleteRequest;
    /**
     * Save document data to storage with a unique timestamped key.
     */
    private saveDocumentData;
    /**
     * Send a reply message through the storage channel.
     * Throws an error if the channel is not properly initialized.
     *
     * Delivers messages synchronously. The Synchronizer's receive queue handles
     * recursion prevention by queuing messages and processing them iteratively.
     */
    private reply;
    /**
     * Check which of the given docIds are available in storage.
     */
    private checkDocIds;
    /**
     * Reply with sync request(s) for the given documents.
     * Uses channel/batch if multiple documents, single message otherwise.
     */
    private replyWithSyncRequest;
    /**
     * Reply with a sync response containing document data.
     */
    private replyWithSyncResponse;
    /**
     * Reply that the requester already has the latest version.
     */
    private replyUpToDate;
    /**
     * Reply that the document is not available.
     */
    private replyUnavailable;
    /**
     * Reply with a directory listing of available docIds.
     */
    private replyWithDirectoryResponse;
    /**
     * Reply with the result of a delete operation.
     */
    private replyWithDeleteResponse;
    /**
     * Load a binary blob for a given key.
     * NOTE(review): the `undefined` (key not found) case is reconstructed — confirm.
     */
    abstract load(key: StorageKey): Promise<Uint8Array | undefined>;
    /** Save a binary blob to a given key. */
    abstract save(key: StorageKey, data: Uint8Array): Promise<void>;
    /** Remove a binary blob from a given key. */
    abstract remove(key: StorageKey): Promise<void>;
    /** Load all chunks whose keys begin with the given prefix. */
    abstract loadRange(keyPrefix: StorageKey): Promise<Chunk[]>;
    /** Remove all chunks whose keys begin with the given prefix. */
    abstract removeRange(keyPrefix: StorageKey): Promise<void>;
}
declare class InMemoryStorageAdapter extends StorageAdapter {
    #private;
    constructor(sharedDataOrOptions?: Map | {
        sharedData?: Map;
        adapterType?: string;
        /**
         * Unique identifier for this adapter instance.
         * If not provided, auto-generated as `{adapterType}-{uuid}`.
*/ adapterId?: string; }); /** * Get the underlying storage map for sharing between instances */ getStorage(): Map; load(key: StorageKey): Promise; save(key: StorageKey, data: Uint8Array): Promise; remove(key: StorageKey): Promise; loadRange(keyPrefix: StorageKey): Promise; removeRange(keyPrefix: StorageKey): Promise; } /** * Generate a cryptographically random PeerID. * * PeerID must be an unsigned 64-bit integer represented as a decimal string. * This function uses crypto.getRandomValues to generate a random 64-bit value * that is globally unique with extremely high probability. * * @returns A random PeerID suitable for use with Loro * * @example * ```typescript * const peerId = generatePeerId() * doc.setPeerId(peerId) * ``` */ declare function generatePeerId(): PeerID; /** * Generate a UUID v4 string. * * Uses `crypto.randomUUID()` when available (secure contexts: HTTPS or localhost). * Falls back to a `crypto.getRandomValues()` based implementation for non-secure * contexts (e.g., HTTP on LAN IP addresses). * * @returns A UUID v4 string (e.g., "550e8400-e29b-41d4-a716-446655440000") */ declare function generateUUID(): string; /** * Validates that a peerId is compatible with Loro's `${number}` PeerID type. * A valid peerId must be a string representing a non-negative integer (unsigned 64-bit). 
* * @param peerId - The peerId string to validate * @throws Error if the peerId is not a valid numeric string */ declare function validatePeerId(peerId: string): asserts peerId is PeerID; export { Adapter, type AdapterContext, type AdapterHooks, type AdapterType, type AddressedEnvelope, type AddressedEstablishedEnvelope, type AddressedEstablishmentEnvelope, type AnyAdapter, type BatchableMsg, type BatchableMsgJSON, type BinaryDataJSON, Bridge, BridgeAdapter, type Channel, type ChannelActions, type ChannelId, type ChannelIdentity, type ChannelKind, type ChannelLifecycle, type ChannelMeta, type ChannelMsg, type ChannelMsgBatch, type ChannelMsgDeleteRequest, type ChannelMsgDeleteResponse, type ChannelMsgDirectoryRequest, type ChannelMsgDirectoryResponse, type ChannelMsgEphemeral, type ChannelMsgEstablishRequest, type ChannelMsgEstablishResponse, type ChannelMsgJSON, type ChannelMsgNewDoc, type ChannelMsgSyncRequest, type ChannelMsgSyncResponse, type ChannelMsgUpdate, type Chunk, type ConnectedChannel, DelayedNetworkAdapter, type DelayedNetworkAdapterOptions, type DocContent, type DocContext, type DocId, type DocState, type EphemeralDeclarations, type EphemeralPeerDataJSON, type EphemeralStoreData, type EphemeralStoreDataJSON, type EstablishedChannel, type EstablishedMsg, type EstablishmentMsg, type GenerateFn, type GeneratedChannel, type GeneratedChannelActions, Handle, type HandleSendFn, type HandleWithEphemerals, InMemoryStorageAdapter, type LoroDocMutator, type MessageTypeRateLimiterMiddleware, type Middleware, type MiddlewareContext, type MiddlewareResult, NoAdaptersError, type PeerContext, type PeerDocSyncState, type PeerIdentityDetails, type PeerState, type PendingNetworkRequest, type Permissions, type RateLimitInfo, type RateLimiterMiddleware, type RateLimiterOptions, type ReadinessCheck, type ReadyState, type ReadyStateChannelMeta, type ReceiveFn, Repo, type RepoParams, type ReturnEnvelope, type SendInterceptor, type SendInterceptorContext, StorageAdapter, 
type StorageKey, SyncTimeoutError, type SyncTransmission, type SyncTransmissionJSON, type TypedEphemeral, type VersionVectorJSON, type WaitForSyncOptions, createDocState, createHandle, createMessageTypeRateLimiter, createPermissions, createRateLimiter, createTypedEphemeral, deserializeChannelMsg, generatePeerId, generateUUID, isEstablished, isEstablishedMsg, isEstablishmentMsg, runMiddleware, serializeChannelMsg, uint8ArrayFromJSON, uint8ArrayToJSON, validatePeerId, versionVectorFromJSON, versionVectorToJSON };