import { AsyncIterableX } from '../asynciterablex.js'; import { RefCountList } from '../../iterable/operators/_refcountlist.js'; import { create } from '../create.js'; import { OperatorAsyncFunction } from '../../interfaces.js'; import { MemoizeAsyncBuffer } from './memoize.js'; import { throwIfAborted } from '../../aborterror.js'; class PublishedAsyncBuffer extends MemoizeAsyncBuffer { protected declare _buffer: RefCountList; constructor(source: AsyncIterator) { super(source, new RefCountList(0)); } [Symbol.asyncIterator](signal?: AbortSignal) { throwIfAborted(signal); this._buffer.readerCount++; return this._getIterable(this._buffer.count)[Symbol.asyncIterator](); } } /** * Creates a buffer with a view over the source sequence, causing each iterator to obtain access to the * remainder of the sequence from the current index in the buffer. * * @template TSource Source sequence element type. * @returns {OperatorAsyncFunction} Buffer enabling each iterator to retrieve elements from * the shared source sequence, starting from the index at the point of obtaining the enumerator. */ export function publish(): OperatorAsyncFunction; /** * Buffer enabling each iterator to retrieve elements from the shared source sequence, starting from the * index at the point of obtaining the iterator. * * @template TSource Source sequence element type. * @template TResult Result sequence element type. * @param {(value: AsyncIterable) => AsyncIterable} [selector] Selector function with published * access to the source sequence for each iterator. * @returns {OperatorAsyncFunction} Sequence resulting from applying the selector function to the * published view over the source sequence. */ export function publish( selector?: (value: AsyncIterable) => AsyncIterable ): OperatorAsyncFunction; /** * Buffer enabling each iterator to retrieve elements from the shared source sequence, starting from the * index at the point of obtaining the iterator. * * @template TSource Source sequence element type. * @template TResult Result sequence element type. * @param {(value: AsyncIterable) => AsyncIterable} [selector] Selector function with published * access to the source sequence for each iterator. * @returns {(OperatorAsyncFunction)} Sequence resulting from applying the selector function to the * published view over the source sequence. */ export function publish( selector?: (value: AsyncIterable) => AsyncIterable ): OperatorAsyncFunction { return function publishOperatorFunction( source: AsyncIterable ): AsyncIterableX { return selector ? create(async () => selector(publish()(source))[Symbol.asyncIterator]()) : new PublishedAsyncBuffer(source[Symbol.asyncIterator]()); }; }