import { AsyncLocalStorage } from 'node:async_hooks'; import { Injectable } from '@travetto/di'; import { AppError, AsyncQueue, castTo } from '@travetto/runtime'; type Ctx = Record; /** * Async context using `asyncHooks` */ @Injectable() export class AsyncContext { storage = new AsyncLocalStorage(); constructor() { this.run = this.run.bind(this); this.iterate = this.iterate.bind(this); } #get(): Ctx { const store = this.storage.getStore(); if (!store) { throw new AppError('Context is not initialized'); } return castTo(store); } /** * Are we in an active context */ get active(): boolean { return this.storage.getStore() !== undefined; } /** * Get context field by key */ get(key: string | symbol): T | undefined { return this.#get()[key]; } /** * Set context field by key */ set(key: string | symbol, value: T | undefined): void { this.#get()[key] = value; } /** * Get entire context * @private */ copy(): Ctx { return structuredClone(this.#get()); } /** * Run an async function and ensure the context is available during execution */ async run(fn: () => Promise | T, init: Ctx = {}): Promise { return this.storage.run({ ...this.storage.getStore(), ...init }, fn); } /** * Run an async function and ensure the context is available during execution */ iterate(fn: () => AsyncIterable, init: Ctx = {}): AsyncIterable { const out = new AsyncQueue(); this.storage.run({ ...this.storage.getStore(), ...init }, async () => { try { for await (const item of fn()) { out.add(item); } out.close(); } catch (error) { out.throw(castTo(error)); } }); return out; } }