import { TEvent } from "./event" import { abort } from "./fn" import { isNone, Voidable } from "./maybe" /** Cancellation Token */ export interface CancelToken { /** Has it been cancelled or whether cancellation has been requested */ readonly cancelled: boolean /** Communicates a request for cancellation */ cancel(): void /** If cancelled, throw `CancelGuard` */ guard(): void /** Register an event that will be triggered when cancelled */ reg(f: () => void): void /** Waiting for cancellation */ reg(): Promise /** Unregister event */ unReg(f: () => void): void } /** Cancel Guard */ export class CancelGuard { #source: CancelToken private constructor(source: CancelToken) { this.#source = source } is(source: CancelToken) { return source === this.#source } static new(source: CancelToken) { return new CancelGuard(source) } } /** Cancellation provider */ class CancelSource implements CancelToken { #cancelled = false #reg?: TEvent get cancelled() { return this.#cancelled } cancel() { if (!this.#cancelled) { this.#reg?.emit() } this.#cancelled = true } guard() { if (this.#cancelled) throw CancelGuard.new(this) } reg(): Promise reg(f: () => any): void reg(f?: () => any) { if (isNone(f)) return new Promise(res => this.reg(() => res())) if (isNone(this.#reg)) this.#reg = new TEvent this.#reg.once(f) } unReg(f: () => any) { this.#reg?.off(f) } } /** Sync cancelable */ export function syncCancelable(f: (ctx: CancelToken) => R): R | void { const token = new CancelSource try { return f(token) } catch (e) { if (e instanceof CancelGuard && e.is(token)) return throw e } } /** Async cancelable */ export async function cancelable(f: (ctx: CancelToken) => Promise): Promise { const token = new CancelSource try { return await f(token) } catch (e) { if (e instanceof CancelGuard && e.is(token)) return throw e } } /** Task Like */ export interface TaskLike extends CancelToken, PromiseLike { /** Run task */ run(): PromiseLike /** Is running */ running: boolean /** Whether finished */ finished: boolean } const TaskNoInit = Symbol('TaskNoInit') /** Cancelable async task */ export class Task implements TaskLike, Promise { // @ts-ignore #p: Promise #cancelled = false #finished = false #reg?: TEvent /** Creates a new Task */ constructor(f: (self: Task) => PromiseLike) /** Creates a new Task with CancelToken */ constructor(token: CancelToken, f: (self: Task) => PromiseLike) constructor(a: any, b?: any) { if (a === TaskNoInit) return if (typeof b === 'function') [a, b] = [b, a] const token: Voidable = b, f: (self: Task) => PromiseLike = a if (token?.cancelled === true) return Task.abort() const cancel = () => this.cancel() token?.reg(cancel) this.#p = (async () => { try { return await f(this) } catch (e) { if (e instanceof CancelGuard && e.is(this)) return throw e } finally { this.#finished = true token?.unReg(cancel) } })() } /** Communicates a request for cancellation */ cancel() { if (!this.#cancelled) { this.#cancelled = true this.#reg?.emit() } } /** If cancelled, throw CancelGuard */ guard() { if (this.#cancelled) throw CancelGuard.new(this) } /** If cancelled will not continue */ aguard(): Promise { if (this.#cancelled) return abort() else return Promise.resolve() } /** Register an event that will be triggered when cancelled */ reg(f: () => any): void /** Waiting for cancellation */ reg(): Promise reg(f?: () => any) { if (isNone(f)) return new Promise(res => this.reg(() => res())) if (isNone(this.#reg)) this.#reg = new TEvent this.#reg.once(f) } /** Unregister event */ unReg(f: () => any) { this.#reg?.off(f) } /** Has it been cancelled or whether cancellation has been requested */ get cancelled() { return this.#cancelled } /** Whether finished */ get finished() { return this.#finished } /** Is running */ get running() { return !this.#cancelled && !this.#finished } /** Run task * * Even if this function is not called the task will still run */ run(): Promise { return this.#p } /** Attaches callbacks for the resolution and/or rejection of the Promise. */ then(onfulfilled?: ((value: T | void) => TResult1 | PromiseLike) | null, onrejected?: ((reason: any) => TResult2 | PromiseLike) | null): Promise { return this.#p.then(onfulfilled, onrejected) } /** Attaches a callback for only the rejection of the Promise. */ async catch(onrejected?: ((reason: any) => TResult | PromiseLike) | null): Promise { return this.#p.catch(onrejected) } [Symbol.toStringTag]: string /** Attaches a callback that is invoked when the Promise is settled (fulfilled or rejected). The resolved value cannot be modified from the callback. */ finally(onfinally?: (() => void) | null): Promise { return this.#p.finally(onfinally) } /** Run Task */ static run(f: (self: Task) => PromiseLike): Task /** Run Task with CancelToken */ static run(token: CancelToken, f: (self: Task) => PromiseLike): Task static run(a: any, b?: any): Task { return new Task(a, b) } /** Run task with promise parameters * @param executor A callback used to initialize the promise. This callback is passed two arguments: * a resolve callback used to resolve the promise with a value or the result of another promise, * and a reject callback used to reject the promise with a provided reason or error. */ static exec(executor: (self: Task, resolve: (value: T | PromiseLike) => void, reject: (reason?: any) => void) => void): Task /** Run task with promise parameters with CancelToken * @param executor A callback used to initialize the promise. This callback is passed two arguments: * a resolve callback used to resolve the promise with a value or the result of another promise, * and a reject callback used to reject the promise with a provided reason or error. */ static exec(token: CancelToken, executor: (self: Task, resolve: (value: T | PromiseLike) => void, reject: (reason?: any) => void) => void): Task static exec(a: any, b?: any): Task { if (typeof b === 'function') { if (a?.cancelled === true) return Task.abort() return new Task(a, self => new Promise((res, rej) => b(self, res, rej))) } return new Task(self => new Promise((res, rej) => a(self, res, rej))) } /** Cancelable async delay */ static delay(ms?: number): Task /** Cancelable async delay with CancelToken */ static delay(token: CancelToken, ms?: number): Task static delay(a?: any, b?: number): Task { if (typeof b === 'number') { if (a?.cancelled === true) return Task.abort() return Task.exec(a, (self, res) => { const f = () => clearTimeout(id) const id = setTimeout(() => { self.unReg(f) res() }, b); self.reg(f) }) } return Task.exec((self, res) => { const f = () => clearTimeout(id) const id = setTimeout(() => { self.unReg(f) res() }, a); self.reg(f) }) } /** Yield time slice and not continue if cancelled */ static yield(token?: CancelToken): Task { if (isNone(token)) return Task.exec((self, res) => { queueMicrotask(() => { if (!self.#cancelled) res() }) }) if (token.cancelled) return Task.abort() return Task.exec(token, (self, res) => { queueMicrotask(() => { if (!(self.#cancelled || token.cancelled)) res() }) }) } /** Yield time slice and not continue if cancelled, using this as CancelToken */ yield(): Task { return Task.yield(this) } /** Cancelable async delay, using this as CancelToken */ delay(ms?: number): Task { return Task.delay(this, ms) } /** Run Task, using this as CancelToken */ subRun(f: (self: Task) => PromiseLike): Task { return Task.run(this, f) } /** Run task with promise parameters, using this as CancelToken * @param executor A callback used to initialize the promise. This callback is passed two arguments: * a resolve callback used to resolve the promise with a value or the result of another promise, * and a reject callback used to reject the promise with a provided reason or error. */ exec(executor: (self: Task, resolve: (value: T | PromiseLike) => void, reject: (reason?: any) => void) => void): Task { return Task.exec(this, executor) } /** Creates a new cancelled task * @returns A cancelled task */ static abort(): Task { const task = new Task(TaskNoInit as any) task.#p = abort() task.#cancelled = true return task } /** * Creates a new resolved task. * @returns A resolved task. */ static resolve(): Task /** * Creates a new resolved task for the provided value. * @param value A task. * @returns A task whose internal state matches the provided task. */ static resolve(value: T | PromiseLike): Task static resolve(value?: T | PromiseLike): Task{ const task = new Task(TaskNoInit as any) task.#p = Promise.resolve(value!) task.#finished = true return task } /** * Creates a new rejected promise for the provided reason. * @param reason The reason the promise was rejected. * @returns A new rejected Promise. */ static reject(reason?: any): Task { const task = new Task(TaskNoInit as any) task.#p = Promise.reject(reason) task.#finished = true return task } /** Wait or cancel all task */ static all(token: CancelToken, ...args: T): Task<{ [K in keyof T]: T[K] extends PromiseLike ? V : T[K] }> { return Task.exec(token, (self, res) => { const fs: (() => void)[] = []; (async () => { try { await Promise.all(args.map(v => { if ('cancel' in v) { const f = () => v.cancel() fs.push(f) self.reg(f) } return v })).then(res) } catch (e) { for (const f of fs) { f() } throw e } finally { for (const f of fs) { self.unReg(f) } } })() }) } /** Wait or cancel all task */ all(...args: T): Task<{ [K in keyof T]: T[K] extends PromiseLike ? V : T[K] }> { return Task.all(this, ...args) } /** Wait one and cancel other task */ static race(token: CancelToken, ...args: T[]): Task ? U : T> { return Task.exec(token, (self, res) => { const fs: (() => void)[] = []; (async () => { try { await Promise.race(args.map(v => { if ('cancel' in v) { const f = () => (v as any).cancel() fs.push(f) self.reg() } return v })).then(res) } finally { for (const f of fs) { f() self.unReg(f) } } })() }) } /** Wait one and cancel other task */ race(...args: T[]): Task ? U : T> { return Task.race(this, ...args) } static async scope>(token: CancelToken, promise: T): Promise ? R : T> static async scope>(token: CancelToken, promise: T): Promise ? R : T> static async scope(token: CancelToken, promise: T): Promise ? R : T> static async scope(token: CancelToken, promise: T): Promise ? R : T> { let f: (() => void) | undefined if ('cancel' in promise) { f = () => (promise as any).cancel() token.reg(f) } try { return await (promise as any) } finally { if (f != null) token.unReg(f) } } scope>(promise: T): Promise ? R : T> scope>(promise: T): Promise ? R : T> scope(promise: T): Promise ? R : T> scope(promise: T): Promise ? R : T> { return Task.scope(this, promise) } } Task.prototype[Symbol.toStringTag] = 'Task'