/** * Promise-based blocking queue. * * @license * Copyright 2022-2024 Matter.js Authors * SPDX-License-Identifier: Apache-2.0 */ import { MatterFlowError } from "../MatterError.js"; import { Time, Timer } from "../time/Time.js"; import { createPromise } from "./Promises.js"; import { EndOfStreamError, NoResponseTimeoutError, Stream } from "./Stream.js"; export class Queue implements Stream { private readonly queue = new Array(); private pendingRead?: { resolver: (data: T) => void; rejecter: (reason: any) => void; timeoutTimer?: Timer }; private closed = false; async read(timeoutMs = 60_000): Promise { const { promise, resolver, rejecter } = createPromise(); if (this.closed) throw new EndOfStreamError(); const data = this.queue.shift(); if (data !== undefined) { return data; } if (this.pendingRead !== undefined) throw new MatterFlowError("Only one pending read is supported"); this.pendingRead = { resolver, rejecter, timeoutTimer: Time.getTimer("Queue timeout", timeoutMs, () => rejecter(new NoResponseTimeoutError()), ).start(), }; return promise; } async write(data: T) { if (this.closed) throw new EndOfStreamError(); if (this.pendingRead !== undefined) { this.pendingRead.timeoutTimer?.stop(); this.pendingRead.resolver(data); this.pendingRead = undefined; return; } this.queue.push(data); } close() { if (this.closed) return; this.closed = true; if (this.pendingRead === undefined) return; this.pendingRead.timeoutTimer?.stop(); this.pendingRead.rejecter(new EndOfStreamError()); } }