import { Exception, ensureErrorOrException, IDisposable } from "./Util.ts"; export interface AsyncReplyChannel { reply(value: Reply): void } export type Continuation = (x: T) => void; export type Continuations = [ Continuation, Continuation, Continuation ]; export class CancellationToken implements IDisposable { private _id: number; private _cancelled: boolean; private _listeners: Map void>; constructor(cancelled = false) { this._id = 0; this._cancelled = cancelled; this._listeners = new Map(); } get isCancelled() { return this._cancelled; } public cancel() { if (!this._cancelled) { this._cancelled = true; for (const [, listener] of this._listeners) { listener(); } } } public addListener(f: () => void) { const id = this._id; this._listeners.set(this._id++, f); return id; } public removeListener(id: number) { return this._listeners.delete(id); } public register(f: (state?: any) => void, state?: any) { const $ = this; const id = this.addListener(state == null ? f : () => f(state)); return { Dispose() { $.removeListener(id); } }; } public Dispose() { // Implement IDisposable for compatibility but do nothing // According to docs, calling Dispose does not trigger cancellation // https://docs.microsoft.com/en-us/dotnet/api/system.threading.cancellationtokensource.dispose?view=net-6.0 } } export class OperationCanceledException extends Exception { constructor(msg?: string) { super(msg ?? "The operation was canceled"); // Object.setPrototypeOf(this, OperationCanceledException.prototype); } } export class Trampoline { static get maxTrampolineCallCount() { return 2000; } private callCount: number; constructor() { this.callCount = 0; } public incrementAndCheck() { return this.callCount++ > Trampoline.maxTrampolineCallCount; } public hijack(f: () => void) { this.callCount = 0; setTimeout(f, 0); } } export interface IAsyncContext { onSuccess: Continuation; onError: Continuation; onCancel: Continuation; cancelToken: CancellationToken; trampoline: Trampoline; } export type Async = (x: IAsyncContext) => void; export function protectedCont(f: Async) { return (ctx: IAsyncContext) => { if (ctx.cancelToken.isCancelled) { ctx.onCancel(new OperationCanceledException()); } else if (ctx.trampoline.incrementAndCheck()) { ctx.trampoline.hijack(() => { try { f(ctx); } catch (err) { ctx.onError(ensureErrorOrException(err)); } }); } else { try { f(ctx); } catch (err) { ctx.onError(ensureErrorOrException(err)); } } }; } export function protectedBind(computation: Async, binder: (x: T) => Async) { return protectedCont((ctx: IAsyncContext) => { computation({ onSuccess: (x: T) => { try { binder(x)(ctx); } catch (err) { ctx.onError(ensureErrorOrException(err)); } }, onError: ctx.onError, onCancel: ctx.onCancel, cancelToken: ctx.cancelToken, trampoline: ctx.trampoline, }); }); } export function protectedReturn(value: T) { return protectedCont((ctx: IAsyncContext) => ctx.onSuccess(value)); } export class AsyncBuilder { public Bind(computation: Async, binder: (x: T) => Async) { return protectedBind(computation, binder); } public Combine(computation1: Async, computation2: Async) { return this.Bind(computation1, () => computation2); } public Delay(generator: () => Async) { return protectedCont((ctx: IAsyncContext) => generator()(ctx)); } public For(sequence: Iterable, body: (x: T) => Async) { const iter = sequence[Symbol.iterator](); let cur = iter.next(); return this.While(() => !cur.done, this.Delay(() => { const res = body(cur.value); cur = iter.next(); return res; })); } public Return(value: T) { return protectedReturn(value); } public ReturnFrom(computation: Async) { return computation; } public TryFinally(computation: Async, compensation: () => void) { return protectedCont((ctx: IAsyncContext) => { computation({ onSuccess: (x: T) => { compensation(); ctx.onSuccess(x); }, onError: (x: any) => { compensation(); ctx.onError(x); }, onCancel: (x: any) => { compensation(); ctx.onCancel(x); }, cancelToken: ctx.cancelToken, trampoline: ctx.trampoline, }); }); } public TryWith(computation: Async, catchHandler: (e: any) => Async) { return protectedCont((ctx: IAsyncContext) => { computation({ onSuccess: ctx.onSuccess, onCancel: ctx.onCancel, cancelToken: ctx.cancelToken, trampoline: ctx.trampoline, onError: (ex: any) => { try { catchHandler(ex)(ctx); } catch (err) { ctx.onError(ensureErrorOrException(err)); } }, }); }); } public Using(resource: T, binder: (x: T) => Async) { return this.TryFinally(binder(resource), () => resource.Dispose()); } public While(guard: () => boolean, computation: Async): Async { if (guard()) { return this.Bind(computation, () => this.While(guard, computation)); } else { return this.Return(void 0); } } public Zero() { return protectedCont((ctx: IAsyncContext) => ctx.onSuccess(void 0)); } } export const singleton = new AsyncBuilder();