import { PromiseResolver, isPromiseLike } from "@yume-chan/async"; import type { ConsumableReadableStreamController, ConsumableReadableStreamSource, ConsumableWritableStreamSink, } from "./consumable/index.js"; import { ConsumableReadableStream, ConsumableWrapByteReadableStream, ConsumableWrapWritableStream, ConsumableWritableStream, } from "./consumable/index.js"; import type { Task } from "./task.js"; import { createTask } from "./task.js"; export class Consumable { static readonly WritableStream = ConsumableWritableStream; static readonly WrapWritableStream = ConsumableWrapWritableStream; static readonly ReadableStream = ConsumableReadableStream; static readonly WrapByteReadableStream = ConsumableWrapByteReadableStream; readonly #task: Task; readonly #resolver: PromiseResolver; readonly value: T; readonly consumed: Promise; constructor(value: T) { this.#task = createTask("Consumable"); this.value = value; this.#resolver = new PromiseResolver(); this.consumed = this.#resolver.promise; } consume() { this.#resolver.resolve(); } error(error: unknown) { this.#resolver.reject(error); } tryConsume(callback: (value: T) => U) { try { let result = this.#task.run(() => callback(this.value)); if (isPromiseLike(result)) { result = result.then( (value) => { this.#resolver.resolve(); return value; }, (e) => { this.#resolver.reject(e); throw e; }, ) as U; } else { this.#resolver.resolve(); } return result; } catch (e) { this.#resolver.reject(e); throw e; } } } export namespace Consumable { export type WritableStreamSink = ConsumableWritableStreamSink; export type WritableStream = typeof ConsumableWritableStream; export type WrapWritableStream = typeof ConsumableWrapWritableStream; export type ReadableStreamController = ConsumableReadableStreamController; export type ReadableStreamSource = ConsumableReadableStreamSource; export type ReadableStream = typeof ConsumableReadableStream; export type WrapByteReadableStream = typeof ConsumableWrapByteReadableStream; }