import { STMInterruptException } from "@effect/core/stm/STM" import { concreteTDequeue } from "@effect/core/stm/THub/operations/_internal/InternalTDequeue" /** * Takes up to the specified number of values from the queue. * * @tsplus static effect/core/stm/THub/TDequeue.Aspects takeUpTo * @tsplus pipeable effect/core/stm/THub/TDequeue takeUpTo */ export function takeUpTo(max: number) { return (self: THub.TDequeue): STM> => { concreteTDequeue(self) return STM.Effect((journal, fiberId) => { let currentSubscriberHead = self.subscriberHead.unsafeGet(journal) if (currentSubscriberHead == null) { throw new STMInterruptException(fiberId) } const builder = Chunk.builder() let n = 0 while (n != max) { const node = currentSubscriberHead.unsafeGet(journal) if (node == null) { n = max } else { const head = node.head const tail = node.tail if (head != null) { const subscribers = node.subscribers if (subscribers == 1) { const size = self.hubSize.unsafeGet(journal) const updatedNode = THub.Node(undefined, 0, node.tail) currentSubscriberHead.unsafeSet(updatedNode, journal) self.publisherHead.unsafeSet(tail, journal) self.hubSize.unsafeSet(size - 1, journal) } else { const updatedNode = THub.Node(node.head, subscribers - 1, node.tail) currentSubscriberHead.unsafeSet(updatedNode, journal) } builder.append(head) n += 1 } currentSubscriberHead = tail } } self.subscriberHead.unsafeSet(currentSubscriberHead, journal) return builder.build() }) } }