import { STMInterruptException, STMRetryException } from "@effect/core/stm/STM"
import { concreteTDequeue } from "@effect/core/stm/THub/operations/_internal/InternalTDequeue"

/**
 * Takes a value from the queue.
 *
 * Starting at the dequeue's current subscriber head, nodes whose `head` is
 * `null`/`undefined` are skipped until a node carrying a value is found. That
 * value is returned and the subscriber head is advanced past the node.
 *
 * Inside the transaction this:
 * - throws `STMInterruptException` when the subscriber head reference itself
 *   is `null`/`undefined` (presumably the dequeue has been shut down - confirm
 *   against the code that clears `subscriberHead`);
 * - throws `STMRetryException` when the walk reaches a missing node, i.e. no
 *   value is currently available to this subscriber.
 *
 * NOTE(review): the type parameter `A` is restored here because the body
 * refers to `A`; confirm that `THub.TDequeue` and `USTM` each take exactly one
 * type argument.
 *
 * @tsplus getter effect/core/stm/THub/TDequeue take
 */
export function take<A>(self: THub.TDequeue<A>): USTM<A> {
  concreteTDequeue(self)
  return STM.Effect((journal, fiberId) => {
    let currentSubscriberHead = self.subscriberHead.unsafeGet(journal)
    if (currentSubscriberHead == null) {
      throw new STMInterruptException(fiberId)
    }
    let a = undefined as A
    let loop = true
    while (loop) {
      const node = currentSubscriberHead.unsafeGet(journal)
      if (node == null) {
        // Nothing left to read for this subscriber.
        throw new STMRetryException()
      }
      const head = node.head
      const tail = node.tail
      if (head != null) {
        const subscribers = node.subscribers
        if (subscribers === 1) {
          // This is the last subscriber still counted on the node: blank the
          // node out, move the publisher head past it and shrink the hub.
          const size = self.hubSize.unsafeGet(journal)
          const updatedNode = THub.Node(undefined, 0, node.tail)
          currentSubscriberHead.unsafeSet(updatedNode, journal)
          self.publisherHead.unsafeSet(tail, journal)
          self.hubSize.unsafeSet(size - 1, journal)
        } else {
          // Other subscribers are still counted on the node: keep the value
          // and only decrement the subscriber count.
          const updatedNode = THub.Node(node.head, subscribers - 1, node.tail)
          currentSubscriberHead.unsafeSet(updatedNode, journal)
        }
        // Advance this dequeue past the node that was just consumed.
        self.subscriberHead.unsafeSet(tail, journal)
        a = head
        loop = false
      } else {
        // Node carries no value; continue with the next one.
        currentSubscriberHead = tail
      }
    }
    return a
  })
}