import { defaultCancellationToken } from "./Async.ts"; import { fromContinuations } from "./Async.ts"; import { startImmediate } from "./Async.ts"; import { Async } from "./AsyncBuilder.ts"; import { Continuation, Continuations } from "./AsyncBuilder.ts"; import { CancellationToken } from "./AsyncBuilder.ts"; import { Exception } from "./Util.ts"; class QueueCell { public value: Msg; public next?: QueueCell; constructor(message: Msg) { this.value = message; } } class MailboxQueue { private firstAndLast?: [QueueCell, QueueCell]; public add(message: Msg) { const itCell = new QueueCell(message); if (this.firstAndLast) { this.firstAndLast[1].next = itCell; this.firstAndLast = [this.firstAndLast[0], itCell]; } else { this.firstAndLast = [itCell, itCell]; } } public tryGet() { if (this.firstAndLast) { const value = this.firstAndLast[0].value; if (this.firstAndLast[0].next) { this.firstAndLast = [this.firstAndLast[0].next, this.firstAndLast[1]]; } else { delete this.firstAndLast; } return value; } return void 0; } } export type MailboxBody = (m: MailboxProcessor) => Async; export interface AsyncReplyChannel { reply: (r: Reply) => void; } export class MailboxProcessor { public body: MailboxBody; public cancellationToken: CancellationToken; public messages: MailboxQueue; public continuation?: Continuation; constructor(body: MailboxBody, cancellationToken?: CancellationToken) { this.body = body; this.cancellationToken = cancellationToken || defaultCancellationToken; this.messages = new MailboxQueue(); } } function __processEvents($this: MailboxProcessor) { if ($this.continuation) { const value = $this.messages.tryGet(); if (value) { const cont = $this.continuation; delete $this.continuation; cont(value); } } } export function startInstance($this: MailboxProcessor) { startImmediate($this.body($this), $this.cancellationToken); } export function receive($this: MailboxProcessor) { return fromContinuations((conts: Continuations) => { if ($this.continuation) { throw new Exception("Receive can only be called once!"); } $this.continuation = conts[0]; __processEvents($this); }); } export function post($this: MailboxProcessor, message: Msg) { $this.messages.add(message); __processEvents($this); } export function postAndAsyncReply( $this: MailboxProcessor, buildMessage: (c: AsyncReplyChannel) => Msg, ) { let result: Reply; let continuation: Continuation; function checkCompletion() { if (result !== void 0 && continuation !== void 0) { continuation(result); } } const reply = { reply: (res: Reply) => { result = res; checkCompletion(); }, }; $this.messages.add(buildMessage(reply)); __processEvents($this); return fromContinuations((conts: Continuations) => { continuation = conts[0]; checkCompletion(); }); } export function start(body: MailboxBody, cancellationToken?: CancellationToken) { const mbox = new MailboxProcessor(body, cancellationToken); startInstance(mbox); return mbox; } export default MailboxProcessor;