import { OperationCanceledException, Trampoline } from "./AsyncBuilder.ts"; import { Continuation, Continuations } from "./AsyncBuilder.ts"; import { Async, IAsyncContext, CancellationToken } from "./AsyncBuilder.ts"; import { protectedCont, protectedBind, protectedReturn } from "./AsyncBuilder.ts"; import { FSharpChoice$2_$union, Choice_makeChoice1Of2, Choice_makeChoice2Of2 } from "./Choice.ts"; import { TimeoutException_$ctor } from "./System.ts"; import { Exception } from "./Util.ts"; function emptyContinuation(_x: T) { // NOP } // see AsyncBuilder.Delay function delay(generator: () => Async) { return protectedCont((ctx: IAsyncContext) => generator()(ctx)); } // MakeAsync: body:(AsyncActivation<'T> -> AsyncReturn) -> Async<'T> export function makeAsync(body: Async) { return body; } // Invoke: computation: Async<'T> -> ctxt:AsyncActivation<'T> -> AsyncReturn export function invoke(computation: Async, ctx: IAsyncContext) { return computation(ctx); } // CallThenInvoke: ctxt:AsyncActivation<'T> -> result1:'U -> part2:('U -> Async<'T>) -> AsyncReturn export function callThenInvoke(ctx: IAsyncContext, result1: U, part2: (x: U) => Async) { return part2(result1)(ctx); } // Bind: ctxt:AsyncActivation<'T> -> part1:Async<'U> -> part2:('U -> Async<'T>) -> AsyncReturn export function bind(ctx: IAsyncContext, part1: Async, part2: (x: U) => Async) { return protectedBind(part1, part2)(ctx); } export function createCancellationToken(arg?: boolean | number): CancellationToken { const token = new CancellationToken(typeof arg === "boolean" ? arg : false); if (typeof arg === "number") { setTimeout(() => { token.cancel(); }, arg); } return token; } export function cancel(token: CancellationToken) { token.cancel(); } export function cancelAfter(token: CancellationToken, ms: number) { setTimeout(() => { token.cancel(); }, ms); } export function isCancellationRequested(token: CancellationToken) { return token != null && token.isCancelled; } export function throwIfCancellationRequested(token: CancellationToken) { if (token != null && token.isCancelled) { throw new Exception("Operation is cancelled"); } } export function startChild(computation: Async, ms?: number): Async> { const promise = startAsPromise(computation); let promiseToRun = promise; if (ms) { // Race the computation against a timeout: whichever settles first wins. promiseToRun = new Promise((resolve, reject) => { const timeoutId = setTimeout(() => reject(TimeoutException_$ctor()), ms); promise.then( value => { clearTimeout(timeoutId); resolve(value); }, error => { clearTimeout(timeoutId); reject(error); } ); }); } // JS Promises are hot, computation has already started // but we delay returning the result return protectedCont((ctx) => protectedReturn(awaitPromise(promiseToRun))(ctx)); } export function awaitPromise(p: Promise) { return fromContinuations((conts: Continuations) => p.then(conts[0]).catch((err) => (err instanceof OperationCanceledException ? conts[2] : conts[1])(err))); } export function cancellationToken() { return protectedCont((ctx: IAsyncContext) => ctx.onSuccess(ctx.cancelToken)); } export const defaultCancellationToken = new CancellationToken(); export function catchAsync(work: Async) { return protectedCont((ctx: IAsyncContext>) => { work({ onSuccess: (x) => ctx.onSuccess(Choice_makeChoice1Of2(x)), onError: (ex) => ctx.onSuccess(Choice_makeChoice2Of2(ex)), onCancel: ctx.onCancel, cancelToken: ctx.cancelToken, trampoline: ctx.trampoline, }); }); } export function fromContinuations(f: (conts: Continuations) => void) { return protectedCont((ctx: IAsyncContext) => f([ctx.onSuccess, ctx.onError, ctx.onCancel])); } export function ignore(computation: Async) { return protectedBind(computation, (_x) => protectedReturn(void 0)); } export function parallel(computations: Iterable>) { return delay(() => awaitPromise(Promise.all(Array.from(computations, (w) => startAsPromise(w))))); } export function sequential(computations: Iterable>) { function _sequential(computations: Iterable>): Promise { let pr: Promise = Promise.resolve([]); for (const c of computations) { pr = pr.then(results => startAsPromise(c).then(r => results.concat([r]))) } return pr; } return delay(() => awaitPromise(_sequential(computations))); } export function sleep(millisecondsDueTime: number) { return protectedCont((ctx: IAsyncContext) => { let tokenId: number; const timeoutId = setTimeout(() => { ctx.cancelToken.removeListener(tokenId); ctx.onSuccess(void 0); }, millisecondsDueTime); tokenId = ctx.cancelToken.addListener(() => { clearTimeout(timeoutId); ctx.onCancel(new OperationCanceledException()); }); }); } export function start(computation: Async, cancellationToken?: CancellationToken) { return startWithContinuations( computation, emptyContinuation, function (err) { throw err }, emptyContinuation, cancellationToken ); } export function startImmediate(computation: Async, cancellationToken?: CancellationToken) { return start(computation, cancellationToken); } export function startWithContinuations( computation: Async, continuation: Continuation, exceptionContinuation: Continuation, cancellationContinuation: Continuation, cancelToken?: CancellationToken) { const trampoline = new Trampoline(); computation({ onSuccess: continuation ? continuation as Continuation : emptyContinuation, onError: exceptionContinuation, onCancel: cancellationContinuation, cancelToken: cancelToken ? cancelToken : defaultCancellationToken, trampoline, }); } export function startAsPromise(computation: Async, cancellationToken?: CancellationToken) { return new Promise((resolve: Continuation, reject: Continuation) => startWithContinuations(computation, resolve, reject, reject, cancellationToken ? cancellationToken : defaultCancellationToken)); }