import { AbstractPowerSyncDatabase } from '../../../client/AbstractPowerSyncDatabase.js'; import { MetaBaseObserver } from '../../../utils/MetaBaseObserver.js'; import { WatchedQuery, WatchedQueryListener, WatchedQueryListenerEvent, WatchedQueryOptions, WatchedQueryState } from '../WatchedQuery.js'; /** * @internal */ export interface AbstractQueryProcessorOptions { db: AbstractPowerSyncDatabase; watchOptions: Settings; placeholderData: Data; } /** * @internal */ export interface LinkQueryOptions { abortSignal: AbortSignal; settings: Settings; } type MutableDeep = T extends ReadonlyArray ? U[] // convert readonly arrays to mutable arrays : T; /** * @internal Mutable version of {@link WatchedQueryState}. * This is used internally to allow updates to the state. */ export type MutableWatchedQueryState = { -readonly [P in keyof WatchedQueryState]: MutableDeep[P]>; }; type WatchedQueryProcessorListener = WatchedQueryListener; /** * Performs underlying watching and yields a stream of results. * @internal */ export abstract class AbstractQueryProcessor< Data = unknown[], Settings extends WatchedQueryOptions = WatchedQueryOptions > extends MetaBaseObserver> implements WatchedQuery { readonly state: WatchedQueryState; protected abortController: AbortController; protected initialized: Promise; protected _closed: boolean; protected disposeListeners: (() => void) | null; get closed() { return this._closed; } constructor(protected options: AbstractQueryProcessorOptions) { super(); this.abortController = new AbortController(); this._closed = false; this.state = this.constructInitialState(); this.disposeListeners = null; this.initialized = this.init(this.abortController.signal); } protected constructInitialState(): WatchedQueryState { return { isLoading: true, isFetching: this.reportFetching, // Only set to true if we will report updates in future error: null, lastUpdated: null, data: this.options.placeholderData }; } protected get reportFetching() { return this.options.watchOptions.reportFetching ?? true; } protected async updateSettingsInternal(settings: Settings, signal: AbortSignal) { // This may have been aborted while awaiting or if multiple calls to `updateSettings` were made if (this._closed || signal.aborted) { return; } this.options.watchOptions = settings; this.iterateListeners((l) => l[WatchedQueryListenerEvent.SETTINGS_WILL_UPDATE]?.()); if (!this.state.isFetching && this.reportFetching) { await this.updateState({ isFetching: true }); } await this.runWithReporting(() => this.linkQuery({ abortSignal: signal, settings }) ); } /** * Updates the underlying query. */ async updateSettings(settings: Settings) { // Abort the previous request this.abortController.abort(); // Keep track of this controller's abort status const abortController = new AbortController(); // Allow this to be aborted externally this.abortController = abortController; await this.initialized; return this.updateSettingsInternal(settings, abortController.signal); } /** * This method is used to link a query to the subscribers of this listener class. * This method should perform actual query watching and report results via {@link updateState} method. */ protected abstract linkQuery(options: LinkQueryOptions): Promise; protected async updateState(update: Partial>) { if (this._closed) { return; } if (typeof update.error !== 'undefined') { await this.iterateAsyncListenersWithError(async (l) => l.onError?.(update.error!)); // An error always stops for the current fetching state update.isFetching = false; update.isLoading = false; } Object.assign(this.state, { lastUpdated: new Date() } satisfies Partial>, update); if (typeof update.data !== 'undefined') { await this.iterateAsyncListenersWithError(async (l) => l.onData?.(this.state.data)); } await this.iterateAsyncListenersWithError(async (l) => l.onStateChange?.(this.state)); } /** * Configures base DB listeners and links the query to listeners. */ protected async init(signal: AbortSignal) { const { db } = this.options; const disposeCloseListener = db.registerListener({ closing: async () => { await this.close(); } }); // Wait for the schema to be set before listening to changes await db.waitForReady(); const disposeSchemaListener = db.registerListener({ schemaChanged: async () => { await this.runWithReporting(async () => { await this.updateSettings(this.options.watchOptions); }); } }); this.disposeListeners = () => { disposeCloseListener(); disposeSchemaListener(); }; // Initial setup await this.runWithReporting(async () => { await this.updateSettingsInternal(this.options.watchOptions, signal); }); } async close() { this._closed = true; this.abortController.abort(); this.disposeListeners?.(); this.disposeListeners = null; this.iterateListeners((l) => l.closed?.()); this.listeners.clear(); } /** * Runs a callback and reports errors to the error listeners. */ protected async runWithReporting(callback: () => Promise): Promise { try { await callback(); } catch (error) { // This will update the error on the state and iterate error listeners await this.updateState({ error }); } } /** * Iterate listeners and reports errors to onError handlers. */ protected async iterateAsyncListenersWithError( callback: (listener: Partial>) => Promise | void ) { try { await this.iterateAsyncListeners(async (l) => callback(l)); } catch (error) { try { await this.iterateAsyncListeners(async (l) => l.onError?.(error)); } catch (error) { // Errors here are ignored // since we are already in an error state this.options.db.logger.error('Watched query error handler threw an Error', error); } } } }