/* eslint-disable @typescript-eslint/no-use-before-define */
import * as A from "../../Array";
import { identity, pipe, tuple } from "../../Function";
import * as O from "../../Option";
import * as T from "./_internal/task";
import { XQueue } from "./model";

/**
 * Takes between `min` and `max` values from the queue.
 *
 * Up to `max` values are first requested in one go with `takeUpTo`. If fewer
 * than `min` values were obtained, the missing ones are taken one at a time
 * with `take` (which suspends until a value is available) and appended in the
 * order in which they were dequeued, so the result always holds at least
 * `min` values.
 *
 * @param min - minimum number of values to collect
 * @param max - maximum number of values to collect
 * @returns a task yielding the collected values; an empty list when `max < min`
 */
export const takeBetween = (min: number, max: number) => (self: XQueue): T.Task => {
  // NOTE(review): the type arguments of `T.Task` / `XQueue` look like they were
  // lost from this file — confirm the annotations against the upstream source.

  // Takes exactly `n` further values, one `take` at a time, in dequeue order.
  function takeRemaining(n: number): T.Task {
    if (n <= 0) {
      return T.pure([]);
    }
    return T.chain_(self.take, (head) => T.map_(takeRemaining(n - 1), (tail) => [head, ...tail]));
  }

  if (max < min) {
    return T.pure([]);
  }

  return pipe(
    self.takeUpTo(max),
    T.chain((bs) => {
      const missing = min - bs.length;
      // Take exactly the number of values still missing (previously one too
      // few were taken and they were appended in reverse order).
      return missing > 0 ? T.map_(takeRemaining(missing), (rest) => [...bs, ...rest]) : T.pure(bs);
    })
  );
};

/**
 * Data-first variant of `takeBetween`: takes between `min` and `max` values
 * from `self`, waiting until at least `min` values have been collected.
 *
 * @param self - the queue to take from
 * @param min - minimum number of values to collect
 * @param max - maximum number of values to collect
 */
export const takeBetween_ = (self: XQueue, min: number, max: number): T.Task =>
  takeBetween(min, max)(self);

/**
 * Waits until the queue is shutdown.
 * The `IO` returned by this method will not resume until the queue has been shutdown.
 * If the queue is already shutdown, the `IO` will resume right away.
 */
export const awaitShutdown = (self: XQueue) => self.awaitShutdown;

/**
 * The number of elements the queue can hold.
 */
export const capacity = (self: XQueue) => self.capacity;

/**
 * `true` if `shutdown` has been called.
 */
export const isShutdown = (self: XQueue) => self.isShutdown;

/**
 * Places one value in the queue.
 */
export const offer = (a: A) => (self: XQueue) => self.offer(a);

/**
 * Places one value in the queue.
*/
export const offer_ = (self: XQueue, a: A) => {
  return self.offer(a);
};

/**
 * Places all of the given values in the queue (curried, data-last form).
 *
 * The outcome depends on the kind of queue:
 *
 * - Bounded queue (`BackPressure` strategy): the values are enqueued and the
 *   result is always true. If the queue has reached capacity, the fiber
 *   performing the `offerAll` is suspended until there is room in the queue.
 * - Unbounded queue: all values are enqueued and the result is true.
 * - Sliding queue (`Sliding` strategy): if there is room the values are
 *   enqueued, otherwise old elements are removed to make room for the new
 *   ones. The result is always true.
 * - Dropping queue (`Dropping` strategy): the values are enqueued, but if
 *   there is no room they are not enqueued and the result is false.
 */
export const offerAll = (as: Iterable) => (self: XQueue) => {
  return self.offerAll(as);
};

/**
 * Places all of the given values in the queue (data-first form).
 *
 * The outcome depends on the kind of queue:
 *
 * - Bounded queue (`BackPressure` strategy): the values are enqueued and the
 *   result is always true. If the queue has reached capacity, the fiber
 *   performing the `offerAll` is suspended until there is room in the queue.
 * - Unbounded queue: all values are enqueued and the result is true.
 * - Sliding queue (`Sliding` strategy): if there is room the values are
 *   enqueued, otherwise old elements are removed to make room for the new
 *   ones. The result is always true.
 * - Dropping queue (`Dropping` strategy): the values are enqueued, but if
 *   there is no room they are not enqueued and the result is false.
 */
export const offerAll_ = (self: XQueue, as: Iterable) => {
  return self.offerAll(as);
};

/**
 * Interrupts any fibers that are suspended on `offer` or `take`.
 * Future calls to `offer*` and `take*` will be interrupted immediately.
 */
export const shutdown = (self: XQueue) => {
  return self.shutdown;
};

/**
 * Retrieves the size of the queue, which is equal to the number of elements
 * in the queue. This may be negative if fibers are suspended waiting for
 * elements to be added to the queue.
*/
export const size = (self: XQueue) => {
  return self.size;
};

/**
 * Removes the oldest value in the queue. If the queue is empty, the returned
 * computation resumes once an item has been added to the queue.
 */
export const take = (self: XQueue) => {
  return self.take;
};

/**
 * Removes all the values in the queue and returns them as a list. If the
 * queue is empty, the list is empty.
 */
export const takeAll = (self: XQueue) => {
  return self.takeAll;
};

/**
 * Takes up to `n` values from the queue (curried, data-last form).
 */
export const takeAllUpTo = (n: number) => (self: XQueue) => {
  return self.takeUpTo(n);
};

/**
 * Takes up to `n` values from the queue (data-first form).
 */
export const takeAllUpTo_ = (self: XQueue, n: number) => {
  return self.takeUpTo(n);
};

/**
 * Curried, data-last form of `mapBothM_`: combines the piped queue with
 * `that`. Offering to the composite queue broadcasts the elements to both
 * queues; taking from it dequeues from both queues and applies `f` to the
 * paired elements.
 *
 * Note that using queues with different strategies may result in surprising
 * behavior. For example, a dropping queue and a bounded queue composed
 * together may apply `f` to different elements.
 */
export const mapBothM = (that: XQueue, f: (b: B, c: C) => T.Task) => (self: XQueue) => {
  return mapBothM_(self, that, f);
};

/**
 * Creates a new queue from this queue and another. Offering to the composite queue
 * will broadcast the elements to both queues; taking from the composite queue
 * will dequeue elements from both queues and apply the function point-wise.
 *
 * Note that using queues with different strategies may result in surprising behavior.
 * For example, a dropping queue and a bounded queue composed together may apply `f`
 * to different elements.
*/
export const mapBothM_ = (self: XQueue, that: XQueue, f: (b: B, c: C) => T.Task): XQueue => {
  return new (class extends XQueue {
    // Resumes only after both underlying queues have been shut down.
    awaitShutdown: T.IO = T.chain_(self.awaitShutdown, () => that.awaitShutdown);

    // The composite can hold no more than the smaller of the two queues.
    capacity: number = Math.min(self.capacity, that.capacity);

    // Only `self` is consulted here.
    isShutdown: T.IO = self.isShutdown;

    // Offers go to both queues in parallel; the result is true only if both accepted.
    offer: (a: A1) => T.Task = (value) =>
      T.mapBothPar_(self.offer(value), that.offer(value), (left, right) => left && right);

    offerAll: (as: Iterable) => T.Task = (values) =>
      T.mapBothPar_(self.offerAll(values), that.offerAll(values), (left, right) => left && right);

    // Shuts both queues down in parallel.
    shutdown: T.IO = T.mapBothPar_(self.shutdown, that.shutdown, () => undefined);

    // Reports the larger of the two sizes.
    size: T.IO = T.mapBothPar_(self.size, that.size, (left, right) => Math.max(left, right));

    // Takes one element from each queue in parallel and combines them with `f`.
    take: T.Task = T.chain_(T.bothPar_(self.take, that.take), (pair) => f(pair[0], pair[1]));

    // Drains both queues, pairs the drained elements with `A.zip_` and applies `f` to each pair.
    takeAll: T.Task = T.chain_(T.bothPar_(self.takeAll, that.takeAll), ([bs, cs]) =>
      T.traverseI_(A.zip_(Array.from(bs), Array.from(cs)), (pair) => f(pair[0], pair[1]))
    );

    // Same pairing as `takeAll`, but requests at most `n` elements from each queue.
    takeUpTo: (n: number) => T.Task = (n) =>
      T.chain_(T.bothPar_(self.takeUpTo(n), that.takeUpTo(n)), ([bs, cs]) =>
        T.traverseI_(A.zip_(Array.from(bs), Array.from(cs)), (pair) => f(pair[0], pair[1]))
      );
  })();
};

/**
 * Like `mapBothM`, but combines the dequeued elements with a pure function
 * (curried, data-last form).
 */
export const mapBoth = (that: XQueue, f: (b: B, c: C) => D) => (self: XQueue) => {
  return mapBothM_(self, that, (b, c) => T.pure(f(b, c)));
};

/**
 * Like `mapBothM_`, but combines the dequeued elements with a pure function
 * (data-first form).
 */
export const mapBoth_ = (self: XQueue, that: XQueue, f: (b: B, c: C) => D) => {
  return mapBothM_(self, that, (b, c) => T.pure(f(b, c)));
};

/**
 * Like `mapBoth`, but tuples the dequeued elements instead of applying a
 * function (curried, data-last form).
 */
export const both = (that: XQueue) => <RA, RB, EA, EB>(self: XQueue) => {
  return mapBoth_(self, that, (b, c) => tuple(b, c));
};

/**
 * Like `mapBoth_`, but tuples the elements instead of applying a function.
*/
export const both_ = (self: XQueue, that: XQueue) => {
  return mapBoth_(self, that, (b, c) => tuple(b, c));
};

/**
 * Transforms elements enqueued into and dequeued from this queue with the
 * given pure functions (curried, data-last form).
 *
 * @param f - applied to every value before it is enqueued
 * @param g - applied to every value after it is dequeued
 */
export const bimap = (f: (c: C) => A, g: (b: B) => D) => (self: XQueue) => {
  return bimapM_(
    self,
    (c: C) => T.pure(f(c)),
    (b) => T.pure(g(b))
  );
};

/**
 * Transforms elements enqueued into and dequeued from this queue with the
 * given pure functions (data-first form).
 *
 * @param self - the queue to wrap
 * @param f - applied to every value before it is enqueued
 * @param g - applied to every value after it is dequeued
 */
export const bimap_ = (self: XQueue, f: (c: C) => A, g: (b: B) => D) => {
  return bimapM_(
    self,
    (c: C) => T.pure(f(c)),
    (b) => T.pure(g(b))
  );
};

/**
 * Transforms elements enqueued into and dequeued from this queue with the
 * specified effectful functions (curried, data-last form).
 *
 * @param f - run on every value before it is enqueued
 * @param g - run on every value after it is dequeued
 */
export const bimapM = (f: (c: C) => T.Task, g: (b: B) => T.Task) => <RA, RB, EA, EB>(
  self: XQueue
): XQueue => {
  return bimapM_(self, f, g);
};

/**
 * Transforms elements enqueued into and dequeued from this queue with the
 * specified effectful functions (data-first form). Everything that does not
 * move values (shutdown, capacity, size) is forwarded to `self` unchanged.
 *
 * @param self - the queue to wrap
 * @param f - run on every value before it is enqueued
 * @param g - run on every value after it is dequeued
 */
export const bimapM_ = (self: XQueue, f: (c: C) => T.Task, g: (b: B) => T.Task): XQueue => {
  return new (class extends XQueue {
    awaitShutdown: T.IO = self.awaitShutdown;

    capacity: number = self.capacity;

    isShutdown: T.IO = self.isShutdown;

    // NOTE(review): `self.offer` / `self.offerAll` are passed unbound; this
    // presumably relies on XQueue members being function-valued properties
    // that do not read `this` — confirm against the XQueue model.
    offer: (a: C) => T.Task = (input) => T.chain_(f(input), self.offer);

    offerAll: (as: Iterable) => T.Task = (inputs) => T.chain_(T.traverseI_(inputs, f), self.offerAll);

    shutdown: T.IO = self.shutdown;

    size: T.IO = self.size;

    // Every dequeued value is passed through `g`.
    take: T.Task = T.chain_(self.take, g);

    takeAll: T.Task = T.chain_(self.takeAll, (taken) => T.traverseI_(taken, g));

    takeUpTo: (n: number) => T.Task = (n) => T.chain_(self.takeUpTo(n), (taken) => T.traverseI_(taken, g));
  })();
};

/**
 * Transforms elements enqueued into this queue with a taskful function;
 * dequeued elements are passed through `T.pure` unchanged.
 */
export const contramapM = (f: (c: C) => T.Task) => (self: XQueue) => {
  return bimapM_(self, f, T.pure);
};

/**
 * Transforms elements enqueued into this queue with a pure function.
*/
export const contramap = (f: (c: C) => A) => (self: XQueue) => {
  return bimapM_(self, (c: C) => T.pure(f(c)), T.pure);
};

/**
 * Like `filterInput`, but uses a taskful function to filter the elements
 * (curried, data-last form).
 */
export const filterInputM = (f: (_: A1) => T.Task) => (self: XQueue): XQueue => {
  return filterInputM_(self, f);
};

/**
 * Like `filterInput_`, but uses a taskful function to filter the elements
 * (data-first form). Only the offering side is filtered; every other member
 * is forwarded to `self`.
 *
 * @param self - the queue to wrap
 * @param f - taskful predicate deciding whether a value may be enqueued
 */
export const filterInputM_ = (self: XQueue, f: (_: A1) => T.Task): XQueue => {
  return new (class extends XQueue {
    awaitShutdown: T.IO = self.awaitShutdown;

    capacity: number = self.capacity;

    isShutdown: T.IO = self.isShutdown;

    // A rejected value is not offered and the result is `false`.
    offer: (a: A1) => T.Task = (value) =>
      T.chain_(f(value), (accepted) => (accepted ? self.offer(value) : T.pure(false)));

    // Runs the predicate on every value, keeps the accepted ones and offers
    // them together; yields `false` without offering when none were accepted.
    offerAll: (as: Iterable) => T.Task = (values) =>
      T.chain_(
        T.traverseI_(values, (value) => T.map_(f(value), (accepted) => (accepted ? O.some(value) : O.none()))),
        (maybeValues) => {
          const accepted = A.mapOption_(maybeValues, identity);
          return A.isEmpty(accepted) ? T.pure(false) : self.offerAll(accepted);
        }
      );

    shutdown: T.IO = self.shutdown;

    size: T.IO = self.size;

    take: T.Task = self.take;

    takeAll: T.Task = self.takeAll;

    takeUpTo: (n: number) => T.Task = (n) => self.takeUpTo(n);
  })();
};

/**
 * Applies a filter to elements enqueued into this queue. Elements that do not
 * pass the filter will be immediately dropped (curried, data-last form).
 */
export const filterInput = (f: (_: A1) => boolean) => (self: XQueue): XQueue => {
  return filterInputM_(self, (a) => T.pure(f(a)));
};

/**
 * Applies a filter to elements enqueued into this queue. Elements that do not
 * pass the filter will be immediately dropped (data-first form).
 */
export const filterInput_ = (self: XQueue, f: (_: A1) => boolean): XQueue => {
  return filterInputM_(self, (a) => T.pure(f(a)));
};

/**
 * Transforms elements dequeued from this queue with a taskful function
 * (curried, data-last form). Enqueued elements are left as they are.
 */
export const mapM = (f: (b: B) => T.Task) => (self: XQueue) => {
  return bimapM_(self, (a: A) => T.pure(a), f);
};

/**
 * Transforms elements dequeued from this queue with a taskful function.
*/
export const mapM_ = (self: XQueue, f: (b: B) => T.Task) => {
  return bimapM_(self, (a: A) => T.pure(a), f);
};

/**
 * Requests at most one value from the queue with `takeUpTo(1)` and maps the
 * result with `A.head`, i.e. yields the head option of the values taken.
 */
export const poll = (self: XQueue) => {
  const atMostOne = self.takeUpTo(1);
  return T.map_(atMostOne, A.head);
};