{"version":3,"sources":["../src/async_queue.ts"],"sourcesContent":["// SPDX-FileCopyrightText: 2025 LiveKit, Inc.\n//\n// SPDX-License-Identifier: Apache-2.0\nimport { Deque } from '@datastructures-js/deque';\n\n/**\n * AsyncQueue is a bounded queue with async support for both producers and consumers.\n *\n * This queue simplifies the AudioMixer implementation by handling backpressure and\n * synchronization automatically:\n * - Producers can await put() until the queue has space (when queue is full)\n * - Consumers can await waitForItem() until data is available (when queue is empty)\n *\n * This eliminates the need for manual coordination logic, polling loops, and\n * complex state management throughout the rest of the codebase.\n */\nexport class AsyncQueue<T> {\n  private items: T[] = [];\n  private waitingProducers = new Deque<{ resolve: () => void; reject: (err: Error) => void }>();\n  private waitingConsumers = new Deque<() => void>();\n  closed = false;\n\n  constructor(private capacity: number = Infinity) {}\n\n  async put(item: T) {\n    if (this.closed) throw new Error('Queue closed');\n\n    while (this.items.length >= this.capacity) {\n      await new Promise<void>((resolve, reject) =>\n        this.waitingProducers.pushBack({ resolve, reject }),\n      );\n      // Re-check if closed after waking up\n      if (this.closed) throw new Error('Queue closed');\n    }\n\n    this.items.push(item);\n\n    // Wake up one waiting consumer\n    if (this.waitingConsumers.size() > 0) {\n      const resolve = this.waitingConsumers.popFront()!;\n      resolve();\n    }\n  }\n\n  get(): T | undefined {\n    const item = this.items.shift();\n    if (this.waitingProducers.size() > 0) {\n      const producer = this.waitingProducers.popFront()!;\n      producer.resolve(); // wakes up one waiting producer\n    }\n    return item;\n  }\n\n  /**\n   * Wait until an item is available or the queue is closed.\n   * Returns immediately if items are already available.\n   */\n  async waitForItem(): Promise<void> {\n    if (this.items.length > 0 || this.closed) {\n      return;\n    }\n    await new Promise<void>((resolve) => this.waitingConsumers.pushBack(resolve));\n  }\n\n  close() {\n    this.closed = true;\n    // Reject all waiting producers with an error\n    this.waitingProducers\n      .toArray()\n      .forEach((producer) => producer.reject(new Error('Queue closed')));\n    // Resolve all waiting consumers so they can see the queue is closed\n    this.waitingConsumers.toArray().forEach((resolve) => resolve());\n    this.waitingProducers.clear();\n    this.waitingConsumers.clear();\n  }\n\n  get length() {\n    return this.items.length;\n  }\n}\n"],"mappings":";;;;;;;;;;;;;;;;;;AAAA;AAAA;AAAA;AAAA;AAAA;AAGA,mBAAsB;AAaf,MAAM,WAAc;AAAA,EAMzB,YAAoB,WAAmB,UAAU;AAA7B;AALpB,SAAQ,QAAa,CAAC;AACtB,SAAQ,mBAAmB,IAAI,mBAA6D;AAC5F,SAAQ,mBAAmB,IAAI,mBAAkB;AACjD,kBAAS;AAAA,EAEyC;AAAA,EAElD,MAAM,IAAI,MAAS;AACjB,QAAI,KAAK,OAAQ,OAAM,IAAI,MAAM,cAAc;AAE/C,WAAO,KAAK,MAAM,UAAU,KAAK,UAAU;AACzC,YAAM,IAAI;AAAA,QAAc,CAAC,SAAS,WAChC,KAAK,iBAAiB,SAAS,EAAE,SAAS,OAAO,CAAC;AAAA,MACpD;AAEA,UAAI,KAAK,OAAQ,OAAM,IAAI,MAAM,cAAc;AAAA,IACjD;AAEA,SAAK,MAAM,KAAK,IAAI;AAGpB,QAAI,KAAK,iBAAiB,KAAK,IAAI,GAAG;AACpC,YAAM,UAAU,KAAK,iBAAiB,SAAS;AAC/C,cAAQ;AAAA,IACV;AAAA,EACF;AAAA,EAEA,MAAqB;AACnB,UAAM,OAAO,KAAK,MAAM,MAAM;AAC9B,QAAI,KAAK,iBAAiB,KAAK,IAAI,GAAG;AACpC,YAAM,WAAW,KAAK,iBAAiB,SAAS;AAChD,eAAS,QAAQ;AAAA,IACnB;AACA,WAAO;AAAA,EACT;AAAA;AAAA;AAAA;AAAA;AAAA,EAMA,MAAM,cAA6B;AACjC,QAAI,KAAK,MAAM,SAAS,KAAK,KAAK,QAAQ;AACxC;AAAA,IACF;AACA,UAAM,IAAI,QAAc,CAAC,YAAY,KAAK,iBAAiB,SAAS,OAAO,CAAC;AAAA,EAC9E;AAAA,EAEA,QAAQ;AACN,SAAK,SAAS;AAEd,SAAK,iBACF,QAAQ,EACR,QAAQ,CAAC,aAAa,SAAS,OAAO,IAAI,MAAM,cAAc,CAAC,CAAC;AAEnE,SAAK,iBAAiB,QAAQ,EAAE,QAAQ,CAAC,YAAY,QAAQ,CAAC;AAC9D,SAAK,iBAAiB,MAAM;AAC5B,SAAK,iBAAiB,MAAM;AAAA,EAC9B;AAAA,EAEA,IAAI,SAAS;AACX,WAAO,KAAK,MAAM;AAAA,EACpB;AACF;","names":[]}