import { Queue } from './queue.js';

/**
 * Callback handed out with every lease. Invoking it returns the leased items to the semaphore.
 */
export type UnlockFn = () => void;

/**
 * An asynchronous semaphore implementation with associated items per lease.
 *
 * Items are handed out in FIFO order: a released item is always offered to the oldest pending waiter first and only
 * goes back into the pool of free items when nobody is waiting.
 *
 * @internal This class is meant to be used in PowerSync SDKs only, and is not part of the public API.
 */
export class Semaphore<T> {
  // Available items that are not currently assigned to a waiter.
  private readonly available: Queue<T>;

  /** Total number of items managed by this semaphore, fixed at construction time. */
  readonly size: number;

  // Linked list of waiters. We don't expect the wait list to become particularly large, and this allows removing
  // aborted waiters from the middle of the list efficiently.
  private firstWaiter?: SemaphoreWaitNode<T>;
  private lastWaiter?: SemaphoreWaitNode<T>;

  /**
   * @param elements The items that can be leased. Their count determines {@link size}.
   */
  constructor(elements: Iterable<T>) {
    this.available = new Queue(elements);
    this.size = this.available.length;
  }

  /**
   * Appends a new active waiter to the end of the wait list.
   *
   * @param requestedItems Number of items the waiter needs before it is fulfilled.
   * @param onAcquire Invoked once the waiter has collected all requested items.
   */
  private addWaiter(requestedItems: number, onAcquire: () => void): SemaphoreWaitNode<T> {
    const node: SemaphoreWaitNode<T> = {
      isActive: true,
      acquiredItems: [],
      remainingItems: requestedItems,
      onAcquire,
      prev: this.lastWaiter
    };

    if (this.lastWaiter) {
      this.lastWaiter.next = node;
      this.lastWaiter = node;
    } else {
      // First waiter
      this.lastWaiter = this.firstWaiter = node;
    }

    return node;
  }

  /**
   * Marks a waiter as inactive and unlinks it from the wait list.
   */
  private deactivateWaiter(waiter: SemaphoreWaitNode<T>): void {
    const { prev, next } = waiter;
    waiter.isActive = false;

    if (prev) prev.next = next;
    if (next) next.prev = prev;
    if (waiter === this.firstWaiter) this.firstWaiter = next;
    if (waiter === this.lastWaiter) this.lastWaiter = prev;
  }

  /**
   * Hands items that are no longer leased to the oldest pending waiter, or puts them back into the pool of available
   * items when nobody is waiting.
   */
  private returnItems(items: T[]): void {
    for (const element of items) {
      // Re-read the head on every iteration: fulfilling a waiter removes it from the list.
      const nextWaiter = this.firstWaiter;
      if (nextWaiter) {
        nextWaiter.acquiredItems.push(element);
        nextWaiter.remainingItems--;
        if (nextWaiter.remainingItems === 0) {
          nextWaiter.onAcquire();
        }
      } else {
        // No pending waiter, return lease into pool.
        this.available.addLast(element);
      }
    }
  }

  /**
   * Leases `amount` items, waiting for other leases to be released if necessary.
   *
   * @param amount Number of items to lease, between 1 and {@link size}.
   * @param abort Optional signal to stop waiting. When it fires before the lease is granted, the returned promise
   * rejects with the signal's `reason` (or a generic error) and any items collected so far are given back.
   * @returns The leased items and a `release` callback returning them. Calling `release` more than once is a no-op.
   * @throws Synchronously, if `amount` is outside the valid range.
   */
  private requestPermits(amount: number, abort?: AbortSignal): Promise<{ items: T[]; release: UnlockFn }> {
    if (amount <= 0 || amount > this.size) {
      throw new Error(`Invalid amount of items requested (${amount}), must be between 1 and ${this.size}`);
    }

    return new Promise((resolve, reject) => {
      function rejectAborted() {
        reject(abort?.reason ?? new Error('Semaphore acquire aborted'));
      }
      if (abort?.aborted) {
        return rejectAborted();
      }

      let waiter: SemaphoreWaitNode<T>;

      // Gives back everything this waiter currently holds. Used both as the public `release` callback and when an
      // abort interrupts a partially fulfilled request.
      const releaseHeldItems = () => {
        const items = waiter.acquiredItems;
        waiter.acquiredItems = []; // Avoid releasing items twice.
        this.returnItems(items);
      };

      const onAbort = () => {
        abort?.removeEventListener('abort', onAbort);

        if (waiter.isActive) {
          // Unlink first so the returned items go to the next waiter (or the pool) instead of back to this one.
          this.deactivateWaiter(waiter);
          // A waiter requesting more than one item may already hold some of them. Without handing them back, they
          // would be lost and the semaphore could never be fully acquired again.
          releaseHeldItems();
          rejectAborted();
        }
      };

      const resolvePromise = () => {
        this.deactivateWaiter(waiter);
        abort?.removeEventListener('abort', onAbort);

        const items = waiter.acquiredItems;
        resolve({ items, release: releaseHeldItems });
      };

      waiter = this.addWaiter(amount, resolvePromise);

      // If there are items in the pool that haven't been assigned, we can pull them into this waiter. Note that this is
      // only the case if we're the first waiter (otherwise, items would have been assigned to an earlier waiter).
      while (!this.available.isEmpty && waiter.remainingItems > 0) {
        waiter.acquiredItems.push(this.available.removeFirst());
        waiter.remainingItems--;
      }

      if (waiter.remainingItems === 0) {
        return resolvePromise();
      }

      abort?.addEventListener('abort', onAbort);
    });
  }

  /**
   * Requests a single item from the pool.
   *
   * The returned `release` callback must be invoked to return the item into the pool.
   */
  async requestOne(abort?: AbortSignal): Promise<{ item: T; release: UnlockFn }> {
    const { items, release } = await this.requestPermits(1, abort);
    return { release, item: items[0] };
  }

  /**
   * Requests access to all items from the pool.
   *
   * The returned `release` callback must be invoked to return items into the pool.
   */
  requestAll(abort?: AbortSignal): Promise<{ items: T[]; release: UnlockFn }> {
    return this.requestPermits(this.size, abort);
  }
}

/**
 * Entry in the semaphore's doubly-linked wait list.
 */
interface SemaphoreWaitNode<T> {
  /**
   * Whether the waiter is currently active (not aborted and not fulfilled).
   */
  isActive: boolean;
  /** Items collected for this waiter so far. */
  acquiredItems: T[];
  /** Number of items still missing before `onAcquire` is invoked. */
  remainingItems: number;
  /** Invoked once `remainingItems` reaches zero. */
  onAcquire: () => void;
  prev?: SemaphoreWaitNode<T>;
  next?: SemaphoreWaitNode<T>;
}

/**
 * An asynchronous mutex implementation.
 *
 * @internal This class is meant to be used in PowerSync SDKs only, and is not part of the public API.
 */
export class Mutex {
  // A mutex is a semaphore managing a single (meaningless) item.
  private inner = new Semaphore([null]);

  /**
   * Acquires the mutex.
   *
   * @param abort Optional signal to stop waiting for the mutex.
   * @returns A callback that must be invoked to release the mutex again.
   */
  async acquire(abort?: AbortSignal): Promise<UnlockFn> {
    const { release } = await this.inner.requestOne(abort);
    return release;
  }

  /**
   * Runs `fn` while holding the mutex, releasing it afterwards regardless of whether `fn` succeeds or throws.
   *
   * @param fn The callback to run exclusively.
   * @param abort Optional signal to stop waiting for the mutex. It is not passed to `fn`.
   * @returns The value `fn` resolved to.
   */
  async runExclusive<T>(fn: () => PromiseLike<T> | T, abort?: AbortSignal): Promise<T> {
    const returnMutex = await this.acquire(abort);

    try {
      return await fn();
    } finally {
      returnMutex();
    }
  }
}

/**
 * Creates a signal aborting after the set timeout.
 *
 * @param timeout Timeout in milliseconds. When omitted, no signal is created and `undefined` is returned.
 */
export function timeoutSignal(timeout: number): AbortSignal;
export function timeoutSignal(timeout?: number): AbortSignal | undefined;

export function timeoutSignal(timeout?: number): AbortSignal | undefined {
  if (timeout == null) return;
  if ('timeout' in AbortSignal) return AbortSignal.timeout(timeout);

  // Fallback for runtimes without `AbortSignal.timeout`.
  const controller = new AbortController();
  setTimeout(() => controller.abort(new Error('Timeout waiting for lock')), timeout);
  return controller.signal;
}