import Logger, { ILogger } from 'js-logger'; import { InternalProgressInformation } from '../../../db/crud/SyncProgress.js'; import { SyncStatus, SyncStatusOptions } from '../../../db/crud/SyncStatus.js'; import { AbortOperation } from '../../../utils/AbortOperation.js'; import { BaseListener, BaseObserver, BaseObserverInterface, Disposable } from '../../../utils/BaseObserver.js'; import { throttleLeadingTrailing } from '../../../utils/async.js'; import { BucketChecksum, BucketDescription, BucketStorageAdapter, Checkpoint, PowerSyncControlCommand } from '../bucket/BucketStorageAdapter.js'; import { CrudEntry } from '../bucket/CrudEntry.js'; import { SyncDataBucket } from '../bucket/SyncDataBucket.js'; import { AbstractRemote, FetchStrategy, SyncStreamOptions } from './AbstractRemote.js'; import { EstablishSyncStream, Instruction, coreStatusToJs } from './core-instruction.js'; import { BucketRequest, StreamingSyncLine, StreamingSyncLineOrCrudUploadComplete, StreamingSyncRequestParameterType, isStreamingKeepalive, isStreamingSyncCheckpoint, isStreamingSyncCheckpointComplete, isStreamingSyncCheckpointDiff, isStreamingSyncCheckpointPartiallyComplete, isStreamingSyncData } from './streaming-sync-types.js'; import { extractBsonObjects, extractJsonLines, injectable, InjectableIterator, map, SimpleAsyncIterator } from '../../../utils/stream_transform.js'; import type { BSON } from 'bson'; export enum LockType { CRUD = 'crud', SYNC = 'sync' } export enum SyncStreamConnectionMethod { HTTP = 'http', WEB_SOCKET = 'web-socket' } export enum SyncClientImplementation { /** * Decodes and handles sync lines received from the sync service in JavaScript. * * This is the default option. * * @deprecated We recommend the {@link RUST} client implementation for all apps. If you have issues with * the Rust client, please file an issue or reach out to us. The JavaScript client will be removed in a future * version of the PowerSync SDK. */ JAVASCRIPT = 'js', /** * This implementation offloads the sync line decoding and handling into the PowerSync * core extension. * * This option is more performant than the {@link JAVASCRIPT} client, enabled by default and the * recommended client implementation for all apps. * * ## Compatibility warning * * The Rust sync client stores sync data in a format that is slightly different than the one used * by the old {@link JAVASCRIPT} implementation. When adopting the {@link RUST} client on existing * databases, the PowerSync SDK will migrate the format automatically. * Further, the {@link JAVASCRIPT} client in recent versions of the PowerSync JS SDK (starting from * the version introducing {@link RUST} as an option) also supports the new format, so you can switch * back to {@link JAVASCRIPT} later. * * __However__: Upgrading the SDK version, then adopting {@link RUST} as a sync client and later * downgrading the SDK to an older version (necessarily using the JavaScript-based implementation then) * can lead to sync issues. */ RUST = 'rust' } /** * The default {@link SyncClientImplementation} to use, {@link SyncClientImplementation.RUST}. */ export const DEFAULT_SYNC_CLIENT_IMPLEMENTATION = SyncClientImplementation.RUST; /** * Abstract Lock to be implemented by various JS environments */ export interface LockOptions { callback: () => Promise; type: LockType; signal?: AbortSignal; } export interface AbstractStreamingSyncImplementationOptions extends RequiredAdditionalConnectionOptions { adapter: BucketStorageAdapter; subscriptions: SubscribedStream[]; uploadCrud: () => Promise; /** * An identifier for which PowerSync DB this sync implementation is * linked to. Most commonly DB name, but not restricted to DB name. */ identifier?: string; logger?: ILogger; remote: AbstractRemote; } export interface StreamingSyncImplementationListener extends BaseListener { /** * Triggered whenever a status update has been attempted to be made or * refreshed. */ statusUpdated?: ((statusUpdate: SyncStatusOptions) => void) | undefined; /** * Triggers whenever the status' members have changed in value */ statusChanged?: ((status: SyncStatus) => void) | undefined; } /** * Configurable options to be used when connecting to the PowerSync * backend instance. */ export type PowerSyncConnectionOptions = Omit; export interface InternalConnectionOptions extends BaseConnectionOptions, AdditionalConnectionOptions {} /** @internal */ export interface BaseConnectionOptions { /** * A set of metadata to be included in service logs. */ appMetadata?: Record; /** * Whether to use a JavaScript implementation to handle received sync lines from the sync * service, or whether this work should be offloaded to the PowerSync core extension. * * This defaults to the JavaScript implementation ({@link SyncClientImplementation.JAVASCRIPT}) * since the ({@link SyncClientImplementation.RUST}) implementation is experimental at the moment. */ clientImplementation?: SyncClientImplementation; /** * The connection method to use when streaming updates from * the PowerSync backend instance. * Defaults to a HTTP streaming connection. */ connectionMethod?: SyncStreamConnectionMethod; /** * The fetch strategy to use when streaming updates from the PowerSync backend instance. */ fetchStrategy?: FetchStrategy; /** * These parameters are passed to the sync rules, and will be available under the`user_parameters` object. */ params?: Record; /** * Whether to include streams that have `auto_subscribe: true` in their definition. * * This defaults to `true`. */ includeDefaultStreams?: boolean; /** * The serialized schema - mainly used to forward information about raw tables to the sync client. */ serializedSchema?: any; } /** @internal */ export interface AdditionalConnectionOptions { /** * Delay for retrying sync streaming operations * from the PowerSync backend after an error occurs. */ retryDelayMs?: number; /** * Backend Connector CRUD operations are throttled * to occur at most every `crudUploadThrottleMs` * milliseconds. */ crudUploadThrottleMs?: number; } /** @internal */ export interface RequiredAdditionalConnectionOptions extends Required { subscriptions: SubscribedStream[]; } export interface StreamingSyncImplementation extends BaseObserverInterface, Disposable { /** * Connects to the sync service */ connect(options?: InternalConnectionOptions): Promise; /** * Disconnects from the sync services. * @throws if not connected or if abort is not controlled internally */ disconnect(): Promise; getWriteCheckpoint: () => Promise; hasCompletedSync: () => Promise; isConnected: boolean; lastSyncedAt?: Date; syncStatus: SyncStatus; triggerCrudUpload: () => void; waitForReady(): Promise; waitForStatus(status: SyncStatusOptions): Promise; waitUntilStatusMatches(predicate: (status: SyncStatus) => boolean): Promise; updateSubscriptions(subscriptions: SubscribedStream[]): void; } export const DEFAULT_CRUD_UPLOAD_THROTTLE_MS = 1000; export const DEFAULT_RETRY_DELAY_MS = 5000; export const DEFAULT_STREAMING_SYNC_OPTIONS = { retryDelayMs: DEFAULT_RETRY_DELAY_MS, crudUploadThrottleMs: DEFAULT_CRUD_UPLOAD_THROTTLE_MS }; export type RequiredPowerSyncConnectionOptions = Required; export const DEFAULT_STREAM_CONNECTION_OPTIONS: RequiredPowerSyncConnectionOptions = { appMetadata: {}, connectionMethod: SyncStreamConnectionMethod.WEB_SOCKET, clientImplementation: DEFAULT_SYNC_CLIENT_IMPLEMENTATION, fetchStrategy: FetchStrategy.Buffered, params: {}, serializedSchema: undefined, includeDefaultStreams: true }; export type SubscribedStream = { name: string; params: Record | null; }; // The priority we assume when we receive checkpoint lines where no priority is set. // This is the default priority used by the sync service, but can be set to an arbitrary // value since sync services without priorities also won't send partial sync completion // messages. const FALLBACK_PRIORITY = 3; export abstract class AbstractStreamingSyncImplementation extends BaseObserver implements StreamingSyncImplementation { protected _lastSyncedAt: Date | null; protected options: AbstractStreamingSyncImplementationOptions; protected abortController: AbortController | null; // In rare cases, mostly for tests, uploads can be triggered without being properly connected. // This allows ensuring that all upload processes can be aborted. protected uploadAbortController: AbortController | null; protected crudUpdateListener?: () => void; protected streamingSyncPromise?: Promise; protected logger: ILogger; private activeStreams: SubscribedStream[]; private isUploadingCrud: boolean = false; private notifyCompletedUploads?: () => void; private handleActiveStreamsChange?: () => void; syncStatus: SyncStatus; triggerCrudUpload: () => void; constructor(options: AbstractStreamingSyncImplementationOptions) { super(); this.options = options; this.activeStreams = options.subscriptions; this.logger = options.logger ?? Logger.get('PowerSyncStream'); this.syncStatus = new SyncStatus({ connected: false, connecting: false, lastSyncedAt: undefined, dataFlow: { uploading: false, downloading: false } }); this.abortController = null; this.triggerCrudUpload = throttleLeadingTrailing(() => { if (!this.syncStatus.connected || this.isUploadingCrud) { return; } this.isUploadingCrud = true; this._uploadAllCrud().finally(() => { this.notifyCompletedUploads?.(); this.isUploadingCrud = false; }); }, this.options.crudUploadThrottleMs!); } async waitForReady() {} waitForStatus(status: SyncStatusOptions): Promise { return this.waitUntilStatusMatches((currentStatus) => { /** * Match only the partial status options provided in the * matching status */ const matchPartialObject = (compA: object, compB: object) => { return Object.entries(compA).every(([key, value]) => { const comparisonBValue = compB[key]; if (typeof value == 'object' && typeof comparisonBValue == 'object') { return matchPartialObject(value, comparisonBValue); } return value == comparisonBValue; }); }; return matchPartialObject(status, currentStatus); }); } waitUntilStatusMatches(predicate: (status: SyncStatus) => boolean): Promise { return new Promise((resolve) => { if (predicate(this.syncStatus)) { resolve(); return; } const l = this.registerListener({ statusChanged: (updatedStatus) => { if (predicate(updatedStatus)) { resolve(); l?.(); } } }); }); } get lastSyncedAt() { const lastSynced = this.syncStatus.lastSyncedAt; return lastSynced && new Date(lastSynced); } get isConnected() { return this.syncStatus.connected; } async dispose() { super.dispose(); this.crudUpdateListener?.(); this.crudUpdateListener = undefined; this.uploadAbortController?.abort(); } abstract obtainLock(lockOptions: LockOptions): Promise; async hasCompletedSync() { return this.options.adapter.hasCompletedSync(); } async getWriteCheckpoint(): Promise { const clientId = await this.options.adapter.getClientId(); let path = `/write-checkpoint2.json?client_id=${clientId}`; const response = await this.options.remote.get(path); const checkpoint = response['data']['write_checkpoint'] as string; this.logger.debug(`Created write checkpoint: ${checkpoint}`); return checkpoint; } protected async _uploadAllCrud(): Promise { return this.obtainLock({ type: LockType.CRUD, callback: async () => { /** * Keep track of the first item in the CRUD queue for the last `uploadCrud` iteration. */ let checkedCrudItem: CrudEntry | undefined; const controller = new AbortController(); this.uploadAbortController = controller; this.abortController?.signal.addEventListener( 'abort', () => { controller.abort(); }, { once: true } ); while (!controller.signal.aborted) { try { /** * This is the first item in the FIFO CRUD queue. */ const nextCrudItem = await this.options.adapter.nextCrudItem(); if (nextCrudItem) { this.updateSyncStatus({ dataFlow: { uploading: true } }); if (nextCrudItem.clientId == checkedCrudItem?.clientId) { // This will force a higher log level than exceptions which are caught here. this.logger.warn(`Potentially previously uploaded CRUD entries are still present in the upload queue. Make sure to handle uploads and complete CRUD transactions or batches by calling and awaiting their [.complete()] method. The next upload iteration will be delayed.`); throw new Error('Delaying due to previously encountered CRUD item.'); } checkedCrudItem = nextCrudItem; await this.options.uploadCrud(); this.updateSyncStatus({ dataFlow: { uploadError: undefined } }); } else { // Uploading is completed const neededUpdate = await this.options.adapter.updateLocalTarget(() => this.getWriteCheckpoint()); if (neededUpdate == false && checkedCrudItem != null) { // Only log this if there was something to upload this.logger.debug('Upload complete, no write checkpoint needed.'); } break; } } catch (ex) { checkedCrudItem = undefined; this.updateSyncStatus({ dataFlow: { uploading: false, uploadError: ex } }); await this.delayRetry(controller.signal); if (!this.isConnected) { // Exit the upload loop if the sync stream is no longer connected break; } this.logger.debug( `Caught exception when uploading. Upload will retry after a delay. Exception: ${ex.message}` ); } finally { this.updateSyncStatus({ dataFlow: { uploading: false } }); } } this.uploadAbortController = null; } }); } async connect(options?: PowerSyncConnectionOptions) { if (this.abortController) { await this.disconnect(); } const controller = new AbortController(); this.abortController = controller; this.streamingSyncPromise = this.streamingSync(this.abortController.signal, options); // Return a promise that resolves when the connection status is updated to indicate that we're connected. return new Promise((resolve) => { const disposer = this.registerListener({ statusChanged: (status) => { if (status.dataFlowStatus.downloadError != null) { this.logger.warn('Initial connect attempt did not successfully connect to server'); } else if (status.connecting) { // Still connecting. return; } disposer(); resolve(); } }); }); } async disconnect(): Promise { if (!this.abortController) { return; } // This might be called multiple times if (!this.abortController.signal.aborted) { this.abortController.abort(new AbortOperation('Disconnect has been requested')); } // Await any pending operations before completing the disconnect operation try { await this.streamingSyncPromise; } catch (ex) { // The operation might have failed, all we care about is if it has completed this.logger.warn(ex); } this.streamingSyncPromise = undefined; this.abortController = null; this.updateSyncStatus({ connected: false, connecting: false }); } /** * @deprecated use [connect instead] */ async streamingSync(signal?: AbortSignal, options?: PowerSyncConnectionOptions): Promise { if (!signal) { this.abortController = new AbortController(); signal = this.abortController.signal; } /** * Listen for CRUD updates and trigger upstream uploads */ this.crudUpdateListener = this.options.adapter.registerListener({ crudUpdate: () => this.triggerCrudUpload() }); /** * Create a new abort controller which aborts items downstream. * This is needed to close any previous connections on exception. */ let nestedAbortController = new AbortController(); signal.addEventListener('abort', () => { /** * A request for disconnect was received upstream. Relay the request * to the nested abort controller. */ nestedAbortController.abort(signal?.reason ?? new AbortOperation('Received command to disconnect from upstream')); this.crudUpdateListener?.(); this.crudUpdateListener = undefined; this.updateSyncStatus({ connected: false, connecting: false, dataFlow: { downloading: false, downloadProgress: null } }); }); /** * This loops runs until [retry] is false or the abort signal is set to aborted. * Aborting the nestedAbortController will: * - Abort any pending fetch requests * - Close any sync stream ReadableStreams (which will also close any established network requests) */ while (true) { this.updateSyncStatus({ connecting: true }); let shouldDelayRetry = true; let result: RustIterationResult | null = null; try { if (signal?.aborted) { break; } result = await this.streamingSyncIteration(nestedAbortController.signal, options); // Continue immediately, streamingSyncIteration will wait before completing if necessary. } catch (ex) { /** * Either: * - A network request failed with a failed connection or not OKAY response code. * - There was a sync processing error. * - The connection was aborted. * This loop will retry after a delay if the connection was not aborted. * The nested abort controller will cleanup any open network requests and streams. * The WebRemote should only abort pending fetch requests or close active Readable streams. */ if (ex instanceof AbortOperation) { this.logger.warn(ex); shouldDelayRetry = false; // A disconnect was requested, we should not delay since there is no explicit retry } else { this.logger.error(ex); } this.updateSyncStatus({ dataFlow: { downloadError: ex } }); } finally { this.notifyCompletedUploads = undefined; if (!signal.aborted) { nestedAbortController.abort(new AbortOperation('Closing sync stream network requests before retry.')); nestedAbortController = new AbortController(); } if (result?.immediateRestart != true) { this.updateSyncStatus({ connected: false, connecting: true // May be unnecessary }); // On error, wait a little before retrying if (shouldDelayRetry) { await this.delayRetry(nestedAbortController.signal); } } } } // Mark as disconnected if here this.updateSyncStatus({ connected: false, connecting: false }); } private async collectLocalBucketState(): Promise<[BucketRequest[], Map]> { const bucketEntries = await this.options.adapter.getBucketStates(); const req: BucketRequest[] = bucketEntries.map((entry) => ({ name: entry.bucket, after: entry.op_id })); const localDescriptions = new Map(); for (const entry of bucketEntries) { localDescriptions.set(entry.bucket, null); } return [req, localDescriptions]; } /** * Older versions of the JS SDK used to encode subkeys as JSON in {@link OplogEntry.toJSON}. * Because subkeys are always strings, this leads to quotes being added around them in `ps_oplog`. * While this is not a problem as long as it's done consistently, it causes issues when a database * created by the JS SDK is used with other SDKs, or (more likely) when the new Rust sync client * is enabled. * * So, we add a migration from the old key format (with quotes) to the new one (no quotes). The * migration is only triggered when necessary (for now). The function returns whether the new format * should be used, so that the JS SDK is able to write to updated databases. * * @param requireFixedKeyFormat Whether we require the new format or also support the old one. * The Rust client requires the new subkey format. * @returns Whether the database is now using the new, fixed subkey format. */ private async requireKeyFormat(requireFixedKeyFormat: boolean): Promise { const hasMigrated = await this.options.adapter.hasMigratedSubkeys(); if (requireFixedKeyFormat && !hasMigrated) { await this.options.adapter.migrateToFixedSubkeys(); return true; } else { return hasMigrated; } } protected streamingSyncIteration( signal: AbortSignal, options?: PowerSyncConnectionOptions ): Promise { return this.obtainLock({ type: LockType.SYNC, signal, callback: async () => { const resolvedOptions: RequiredPowerSyncConnectionOptions = { ...DEFAULT_STREAM_CONNECTION_OPTIONS, ...(options ?? {}) }; // Validate app metadata const invalidMetadata = Object.entries(resolvedOptions.appMetadata).filter( ([_, value]) => typeof value != 'string' ); if (invalidMetadata.length > 0) { throw new Error( `Invalid appMetadata provided. Only string values are allowed. Invalid values: ${invalidMetadata.map(([key, value]) => `${key}: ${value}`).join(', ')}` ); } const clientImplementation = resolvedOptions.clientImplementation; this.updateSyncStatus({ clientImplementation }); if (clientImplementation == SyncClientImplementation.JAVASCRIPT) { await this.legacyStreamingSyncIteration(signal, resolvedOptions); return null; } else { await this.requireKeyFormat(true); return await this.rustSyncIteration(signal, resolvedOptions); } } }); } private async receiveSyncLines(data: { options: SyncStreamOptions; connection: RequiredPowerSyncConnectionOptions; bson?: typeof BSON; }): Promise> { const { options, connection, bson } = data; const remote = this.options.remote; if (connection.connectionMethod == SyncStreamConnectionMethod.HTTP) { return await remote.fetchStream(options); } else { return await this.options.remote.socketStreamRaw( { ...options, ...{ fetchStrategy: connection.fetchStrategy } }, bson ); } } private async legacyStreamingSyncIteration(signal: AbortSignal, resolvedOptions: RequiredPowerSyncConnectionOptions) { const rawTables = resolvedOptions.serializedSchema?.raw_tables; if (rawTables != null && rawTables.length) { this.logger.warn('Raw tables require the Rust-based sync client. The JS client will ignore them.'); } if (this.activeStreams.length) { this.logger.error('Sync streams require `clientImplementation: SyncClientImplementation.RUST` when connecting.'); } this.logger.debug('Streaming sync iteration started'); this.options.adapter.startSession(); let [req, bucketMap] = await this.collectLocalBucketState(); let targetCheckpoint: Checkpoint | null = null; // A checkpoint that has been validated but not applied (e.g. due to pending local writes) let pendingValidatedCheckpoint: Checkpoint | null = null; const clientId = await this.options.adapter.getClientId(); const usingFixedKeyFormat = await this.requireKeyFormat(false); this.logger.debug('Requesting stream from server'); const syncOptions: SyncStreamOptions = { path: '/sync/stream', abortSignal: signal, data: { buckets: req, include_checksum: true, raw_data: true, parameters: resolvedOptions.params, app_metadata: resolvedOptions.appMetadata, client_id: clientId } }; const bson = await this.options.remote.getBSON(); const source = await this.receiveSyncLines({ options: syncOptions, connection: resolvedOptions, bson }); const stream: InjectableIterator = injectable( map(source, (line) => { if (typeof line == 'string') { return JSON.parse(line) as StreamingSyncLine; } else { return bson.deserialize(line) as StreamingSyncLine; } }) ); this.logger.debug('Stream established. Processing events'); this.notifyCompletedUploads = () => { stream.inject({ crud_upload_completed: null }); }; while (true) { const { value: line, done } = await stream.next(); if (done) { // The stream has closed while waiting return; } if ('crud_upload_completed' in line) { if (pendingValidatedCheckpoint != null) { const { applied, endIteration } = await this.applyCheckpoint(pendingValidatedCheckpoint); if (applied) { pendingValidatedCheckpoint = null; } else if (endIteration) { break; } } continue; } // A connection is active and messages are being received if (!this.syncStatus.connected) { // There is a connection now Promise.resolve().then(() => this.triggerCrudUpload()); this.updateSyncStatus({ connected: true }); } if (isStreamingSyncCheckpoint(line)) { targetCheckpoint = line.checkpoint; // New checkpoint - existing validated checkpoint is no longer valid pendingValidatedCheckpoint = null; const bucketsToDelete = new Set(bucketMap.keys()); const newBuckets = new Map(); for (const checksum of line.checkpoint.buckets) { newBuckets.set(checksum.bucket, { name: checksum.bucket, priority: checksum.priority ?? FALLBACK_PRIORITY }); bucketsToDelete.delete(checksum.bucket); } if (bucketsToDelete.size > 0) { this.logger.debug('Removing buckets', [...bucketsToDelete]); } bucketMap = newBuckets; await this.options.adapter.removeBuckets([...bucketsToDelete]); await this.options.adapter.setTargetCheckpoint(targetCheckpoint); await this.updateSyncStatusForStartingCheckpoint(targetCheckpoint); } else if (isStreamingSyncCheckpointComplete(line)) { const result = await this.applyCheckpoint(targetCheckpoint!); if (result.endIteration) { return; } else if (!result.applied) { // "Could not apply checkpoint due to local data". We need to retry after // finishing uploads. pendingValidatedCheckpoint = targetCheckpoint; } else { // Nothing to retry later. This would likely already be null from the last // checksum or checksum_diff operation, but we make sure. pendingValidatedCheckpoint = null; } } else if (isStreamingSyncCheckpointPartiallyComplete(line)) { const priority = line.partial_checkpoint_complete.priority; this.logger.debug('Partial checkpoint complete', priority); const result = await this.options.adapter.syncLocalDatabase(targetCheckpoint!, priority); if (!result.checkpointValid) { // This means checksums failed. Start again with a new checkpoint. // TODO: better back-off await new Promise((resolve) => setTimeout(resolve, 50)); return; } else if (!result.ready) { // If we have pending uploads, we can't complete new checkpoints outside of priority 0. // We'll resolve this for a complete checkpoint. } else { // We'll keep on downloading, but can report that this priority is synced now. this.logger.debug('partial checkpoint validation succeeded'); // All states with a higher priority can be deleted since this partial sync includes them. const priorityStates = this.syncStatus.priorityStatusEntries.filter((s) => s.priority <= priority); priorityStates.push({ priority, lastSyncedAt: new Date(), hasSynced: true }); this.updateSyncStatus({ connected: true, priorityStatusEntries: priorityStates }); } } else if (isStreamingSyncCheckpointDiff(line)) { // TODO: It may be faster to just keep track of the diff, instead of the entire checkpoint if (targetCheckpoint == null) { throw new Error('Checkpoint diff without previous checkpoint'); } // New checkpoint - existing validated checkpoint is no longer valid pendingValidatedCheckpoint = null; const diff = line.checkpoint_diff; const newBuckets = new Map(); for (const checksum of targetCheckpoint.buckets) { newBuckets.set(checksum.bucket, checksum); } for (const checksum of diff.updated_buckets) { newBuckets.set(checksum.bucket, checksum); } for (const bucket of diff.removed_buckets) { newBuckets.delete(bucket); } const newCheckpoint: Checkpoint = { last_op_id: diff.last_op_id, buckets: [...newBuckets.values()], write_checkpoint: diff.write_checkpoint }; targetCheckpoint = newCheckpoint; await this.updateSyncStatusForStartingCheckpoint(targetCheckpoint); bucketMap = new Map(); newBuckets.forEach((checksum, name) => bucketMap.set(name, { name: checksum.bucket, priority: checksum.priority ?? FALLBACK_PRIORITY }) ); const bucketsToDelete = diff.removed_buckets; if (bucketsToDelete.length > 0) { this.logger.debug('Remove buckets', bucketsToDelete); } await this.options.adapter.removeBuckets(bucketsToDelete); await this.options.adapter.setTargetCheckpoint(targetCheckpoint); } else if (isStreamingSyncData(line)) { const { data } = line; const previousProgress = this.syncStatus.dataFlowStatus.downloadProgress; let updatedProgress: InternalProgressInformation | null = null; if (previousProgress) { updatedProgress = { ...previousProgress }; const progressForBucket = updatedProgress[data.bucket]; if (progressForBucket) { updatedProgress[data.bucket] = { ...progressForBucket, since_last: progressForBucket.since_last + data.data.length }; } } this.updateSyncStatus({ dataFlow: { downloading: true, downloadProgress: updatedProgress } }); await this.options.adapter.saveSyncData({ buckets: [SyncDataBucket.fromRow(data)] }, usingFixedKeyFormat); } else if (isStreamingKeepalive(line)) { const remaining_seconds = line.token_expires_in; if (remaining_seconds == 0) { // Connection would be closed automatically right after this this.logger.debug('Token expiring; reconnect'); /** * For a rare case where the backend connector does not update the token * (uses the same one), this should have some delay. */ await this.delayRetry(); return; } else if (remaining_seconds < 30) { this.logger.debug('Token will expire soon; reconnect'); // Pre-emptively refresh the token this.options.remote.invalidateCredentials(); return; } this.triggerCrudUpload(); } else { this.logger.debug('Received unknown sync line', line); } } this.logger.debug('Stream input empty'); // Connection closed. Likely due to auth issue. return; } private async rustSyncIteration( signal: AbortSignal, resolvedOptions: RequiredPowerSyncConnectionOptions ): Promise { const syncImplementation = this; const adapter = this.options.adapter; const remote = this.options.remote; const controller = new AbortController(); const abort = () => { return controller.abort(signal.reason); }; signal.addEventListener('abort', abort); let receivingLines: Promise | null = null; let hadSyncLine = false; let hideDisconnectOnRestart = false; if (signal.aborted) { throw new AbortOperation('Connection request has been aborted'); } // Pending sync lines received from the service, as well as local events that trigger a powersync_control // invocation (local events include refreshed tokens and completed uploads). // This is a single data stream so that we can handle all control calls from a single place. let controlInvocations: InjectableIterator | null = null; async function connect(instr: EstablishSyncStream) { const syncOptions: SyncStreamOptions = { path: '/sync/stream', abortSignal: controller.signal, data: instr.request }; controlInvocations = injectable( map( await syncImplementation.receiveSyncLines({ options: syncOptions, connection: resolvedOptions }), (line) => { if (typeof line == 'string') { return { command: PowerSyncControlCommand.PROCESS_TEXT_LINE, payload: line }; } else { return { command: PowerSyncControlCommand.PROCESS_BSON_LINE, payload: line }; } } ) ); // The rust client will set connected: true after the first sync line because that's when it gets invoked, but // we're already connected here and can report that. syncImplementation.updateSyncStatus({ connected: true }); try { while (true) { let event = await controlInvocations.next(); if (event.done) { break; } const line = event.value; await control(line.command, line.payload); if (!hadSyncLine) { syncImplementation.triggerCrudUpload(); hadSyncLine = true; } } } finally { abort(); signal.removeEventListener('abort', abort); } } async function stop() { await control(PowerSyncControlCommand.STOP); } async function control(op: PowerSyncControlCommand, payload?: Uint8Array | string) { const rawResponse = await adapter.control(op, payload ?? null); const logger = syncImplementation.logger; logger.trace( 'powersync_control', op, payload == null || typeof payload == 'string' ? payload : '', rawResponse ); await handleInstructions(JSON.parse(rawResponse)); } async function handleInstruction(instruction: Instruction) { if ('LogLine' in instruction) { switch (instruction.LogLine.severity) { case 'DEBUG': syncImplementation.logger.debug(instruction.LogLine.line); break; case 'INFO': syncImplementation.logger.info(instruction.LogLine.line); break; case 'WARNING': syncImplementation.logger.warn(instruction.LogLine.line); break; } } else if ('UpdateSyncStatus' in instruction) { syncImplementation.updateSyncStatus(coreStatusToJs(instruction.UpdateSyncStatus.status)); } else if ('EstablishSyncStream' in instruction) { if (receivingLines != null) { // Already connected, this shouldn't happen during a single iteration. throw 'Unexpected request to establish sync stream, already connected'; } receivingLines = connect(instruction.EstablishSyncStream); } else if ('FetchCredentials' in instruction) { if (instruction.FetchCredentials.did_expire) { remote.invalidateCredentials(); } else { remote.invalidateCredentials(); // Restart iteration after the credentials have been refreshed. remote.fetchCredentials().then( (_) => { controlInvocations?.inject({ command: PowerSyncControlCommand.NOTIFY_TOKEN_REFRESHED }); }, (err) => { syncImplementation.logger.warn('Could not prefetch credentials', err); } ); } } else if ('CloseSyncStream' in instruction) { controller.abort(); hideDisconnectOnRestart = instruction.CloseSyncStream.hide_disconnect; } else if ('FlushFileSystem' in instruction) { // Not necessary on JS platforms. } else if ('DidCompleteSync' in instruction) { syncImplementation.updateSyncStatus({ dataFlow: { downloadError: undefined } }); } } async function handleInstructions(instructions: Instruction[]) { for (const instr of instructions) { await handleInstruction(instr); } } try { const options: any = { parameters: resolvedOptions.params, app_metadata: resolvedOptions.appMetadata, active_streams: this.activeStreams, include_defaults: resolvedOptions.includeDefaultStreams }; if (resolvedOptions.serializedSchema) { options.schema = resolvedOptions.serializedSchema; } await control(PowerSyncControlCommand.START, JSON.stringify(options)); this.notifyCompletedUploads = () => { controlInvocations?.inject({ command: PowerSyncControlCommand.NOTIFY_CRUD_UPLOAD_COMPLETED }); }; this.handleActiveStreamsChange = () => { controlInvocations?.inject({ command: PowerSyncControlCommand.UPDATE_SUBSCRIPTIONS, payload: JSON.stringify(this.activeStreams) }); }; await receivingLines; } finally { this.notifyCompletedUploads = this.handleActiveStreamsChange = undefined; await stop(); } return { immediateRestart: hideDisconnectOnRestart }; } private async updateSyncStatusForStartingCheckpoint(checkpoint: Checkpoint) { const localProgress = await this.options.adapter.getBucketOperationProgress(); const progress: InternalProgressInformation = {}; let invalidated = false; for (const bucket of checkpoint.buckets) { const savedProgress = localProgress[bucket.bucket]; const atLast = savedProgress?.atLast ?? 0; const sinceLast = savedProgress?.sinceLast ?? 0; progress[bucket.bucket] = { // The fallback priority doesn't matter here, but 3 is the one newer versions of the sync service // will use by default. priority: bucket.priority ?? 3, at_last: atLast, since_last: sinceLast, target_count: bucket.count ?? 0 }; if (bucket.count != null && bucket.count < atLast + sinceLast) { // Either due to a defrag / sync rule deploy or a compaction operation, the size // of the bucket shrank so much that the local ops exceed the ops in the updated // bucket. We can't prossibly report progress in this case (it would overshoot 100%). invalidated = true; } } if (invalidated) { for (const bucket in progress) { const bucketProgress = progress[bucket]; bucketProgress.at_last = 0; bucketProgress.since_last = 0; } } this.updateSyncStatus({ dataFlow: { downloading: true, downloadProgress: progress } }); } private async applyCheckpoint(checkpoint: Checkpoint) { let result = await this.options.adapter.syncLocalDatabase(checkpoint); if (!result.checkpointValid) { this.logger.debug(`Checksum mismatch in checkpoint ${checkpoint.last_op_id}, will reconnect`); // This means checksums failed. Start again with a new checkpoint. // TODO: better back-off await new Promise((resolve) => setTimeout(resolve, 50)); return { applied: false, endIteration: true }; } else if (!result.ready) { this.logger.debug( `Could not apply checkpoint ${checkpoint.last_op_id} due to local data. We will retry applying the checkpoint after that upload is completed.` ); return { applied: false, endIteration: false }; } this.logger.debug(`Applied checkpoint ${checkpoint.last_op_id}`, checkpoint); this.updateSyncStatus({ connected: true, lastSyncedAt: new Date(), dataFlow: { downloading: false, downloadProgress: null, downloadError: undefined } }); return { applied: true, endIteration: false }; } protected updateSyncStatus(options: SyncStatusOptions) { const updatedStatus = new SyncStatus({ connected: options.connected ?? this.syncStatus.connected, connecting: !options.connected && (options.connecting ?? this.syncStatus.connecting), lastSyncedAt: options.lastSyncedAt ?? this.syncStatus.lastSyncedAt, dataFlow: { ...this.syncStatus.dataFlowStatus, ...options.dataFlow }, priorityStatusEntries: options.priorityStatusEntries ?? this.syncStatus.priorityStatusEntries, clientImplementation: options.clientImplementation ?? this.syncStatus.clientImplementation }); if (!this.syncStatus.isEqual(updatedStatus)) { this.syncStatus = updatedStatus; // Only trigger this is there was a change this.iterateListeners((cb) => cb.statusChanged?.(updatedStatus)); } // trigger this for all updates this.iterateListeners((cb) => cb.statusUpdated?.(options)); } private async delayRetry(signal?: AbortSignal): Promise { return new Promise((resolve) => { if (signal?.aborted) { // If the signal is already aborted, resolve immediately resolve(); return; } const { retryDelayMs } = this.options; let timeoutId: ReturnType | undefined; const endDelay = () => { resolve(); if (timeoutId) { clearTimeout(timeoutId); timeoutId = undefined; } signal?.removeEventListener('abort', endDelay); }; signal?.addEventListener('abort', endDelay, { once: true }); timeoutId = setTimeout(endDelay, retryDelayMs); }); } updateSubscriptions(subscriptions: SubscribedStream[]): void { this.activeStreams = subscriptions; this.handleActiveStreamsChange?.(); } } interface EnqueuedCommand { command: PowerSyncControlCommand; payload?: Uint8Array | string; } interface RustIterationResult { immediateRestart: boolean; }