import { WatchCompatibleQuery, WatchedQuery, WatchedQueryOptions } from '../WatchedQuery.js';
import {
  AbstractQueryProcessor,
  AbstractQueryProcessorOptions,
  LinkQueryOptions,
  MutableWatchedQueryState
} from './AbstractQueryProcessor.js';
import { WatchedQueryComparator } from './comparators.js';

/**
 * Settings for {@link WatchedQuery} instances created via {@link Query#watch}.
 */
export interface WatchedQuerySettings<DataType> extends WatchedQueryOptions {
  /** Query that is compiled when the processor links and executed on each change notification. */
  query: WatchCompatibleQuery<DataType>;
}

/**
 * {@link WatchedQuery} returned from {@link Query#watch}.
 */
export type StandardWatchedQuery<DataType> = WatchedQuery<DataType, WatchedQuerySettings<DataType>>;

/**
 * @internal
 */
export interface OnChangeQueryProcessorOptions<Data>
  extends AbstractQueryProcessorOptions<Data, WatchedQuerySettings<Data>> {
  /**
   * Optional comparator used to decide whether a fresh result differs from the
   * current state. When omitted, every result is treated as a change.
   */
  comparator?: WatchedQueryComparator<Data>;
}

/**
 * Uses the PowerSync onChange event to trigger watched queries.
 * Results are emitted on every change of the relevant tables.
 * @internal
 */
export class OnChangeQueryProcessor<Data> extends AbstractQueryProcessor<Data, WatchedQuerySettings<Data>> {
  constructor(protected options: OnChangeQueryProcessorOptions<Data>) {
    super(options);
  }

  /**
   * Compares a freshly fetched result with the previously stored one.
   *
   * @param current The result that was just fetched.
   * @param previous The result currently held in state.
   * @returns If the sets are equal. Returns `false` when no comparator (or no
   * `checkEquality` on it) was provided, so the new result is always emitted.
   */
  protected checkEquality(current: Data, previous: Data): boolean {
    // Use the provided comparator if available. Assume values are unique if not available.
    return this.options.comparator?.checkEquality?.(current, previous) ?? false;
  }

  /**
   * Compiles the configured query, resolves the tables it depends on and
   * registers an onChange subscription which re-runs the query and updates
   * the state on each notification.
   *
   * @param options Supplies the abort signal that bounds the subscription and
   * the settings whose `triggerOnTables` is forwarded to table resolution.
   */
  protected async linkQuery(options: LinkQueryOptions<Data>): Promise<void> {
    const { db, watchOptions } = this.options;
    const { abortSignal } = options;

    const compiledQuery = watchOptions.query.compile();
    // Spread copies the read-only parameter list into a plain array, so no `any` cast is required.
    const tables = await db.resolveTables(compiledQuery.sql, [...compiledQuery.parameters], {
      tables: options.settings.triggerOnTables
    });

    db.onChangeWithCallback(
      {
        onChange: async () => {
          if (this.closed || abortSignal.aborted) {
            return;
          }
          // This fires for each change of the relevant tables
          try {
            if (this.reportFetching && !this.state.isFetching) {
              await this.updateState({ isFetching: true });
            }

            // Collect all state changes so that at most one update is emitted per run.
            const partialStateUpdate: Partial<MutableWatchedQueryState<Data>> & { data?: Data } = {};

            // Always run the query if an underlying table has changed
            const result = await watchOptions.query.execute({
              sql: compiledQuery.sql,
              // Allows casting from ReadOnlyArray[unknown] to Array
              // This allows simpler compatibility with PowerSync queries
              parameters: [...compiledQuery.parameters],
              db
            });

            // The query may have been aborted while it was executing; drop the result if so.
            if (abortSignal.aborted) {
              return;
            }

            if (this.reportFetching) {
              partialStateUpdate.isFetching = false;
            }

            if (this.state.isLoading) {
              partialStateUpdate.isLoading = false;
            }

            // Check if the result has changed
            if (!this.checkEquality(result, this.state.data)) {
              Object.assign(partialStateUpdate, {
                data: result
              });
            }

            // A successful run clears any previously reported error.
            if (this.state.error) {
              partialStateUpdate.error = null;
            }

            if (Object.keys(partialStateUpdate).length > 0) {
              await this.updateState(partialStateUpdate);
            }
          } catch (error) {
            await this.updateState({ error });
          }
        },
        onError: async (error) => {
          await this.updateState({ error });
        }
      },
      {
        signal: abortSignal,
        tables,
        throttleMs: watchOptions.throttleMs,
        triggerImmediate: true // used to emit the initial state
      }
    );
  }
}