import * as Arr from "../Array.js" import type { Cause } from "../Cause.js" import { NoSuchElementException } from "../Cause.js" import type { Channel } from "../Channel.js" import * as Chunk from "../Chunk.js" import type { Effect } from "../Effect.js" import * as Effectable from "../Effectable.js" import type { Exit } from "../Exit.js" import { dual } from "../Function.js" import * as Inspectable from "../Inspectable.js" import type * as Api from "../Mailbox.js" import * as Option from "../Option.js" import { pipeArguments } from "../Pipeable.js" import { hasProperty } from "../Predicate.js" import type { Scheduler } from "../Scheduler.js" import type { Scope } from "../Scope.js" import type { Stream } from "../Stream.js" import * as channel from "./channel.js" import * as channelExecutor from "./channel/channelExecutor.js" import * as coreChannel from "./core-stream.js" import * as core from "./core.js" import * as circular from "./effect/circular.js" import * as fiberRuntime from "./fiberRuntime.js" import * as stream from "./stream.js" /** @internal */ export const TypeId: Api.TypeId = Symbol.for("effect/Mailbox") as Api.TypeId /** @internal */ export const ReadonlyTypeId: Api.ReadonlyTypeId = Symbol.for("effect/Mailbox/ReadonlyMailbox") as Api.ReadonlyTypeId /** @internal */ export const isMailbox = (u: unknown): u is Api.Mailbox => hasProperty(u, TypeId) /** @internal */ export const isReadonlyMailbox = (u: unknown): u is Api.ReadonlyMailbox => hasProperty(u, ReadonlyTypeId) type MailboxState = { readonly _tag: "Open" readonly takers: Set<(_: Effect) => void> readonly offers: Set> readonly awaiters: Set<(_: Effect) => void> } | { readonly _tag: "Closing" readonly takers: Set<(_: Effect) => void> readonly offers: Set> readonly awaiters: Set<(_: Effect) => void> readonly exit: Exit } | { readonly _tag: "Done" readonly exit: Exit } type OfferEntry = { readonly _tag: "Array" readonly remaining: Array offset: number readonly resume: (_: Effect>) => void } | { readonly _tag: "Single" readonly message: A readonly resume: (_: Effect) => void } const empty = Chunk.empty() const exitEmpty = core.exitSucceed(empty) const exitFalse = core.exitSucceed(false) const exitTrue = core.exitSucceed(true) const constDone = [empty, true] as const class MailboxImpl extends Effectable.Class, done: boolean], E> implements Api.Mailbox { readonly [TypeId]: Api.TypeId = TypeId readonly [ReadonlyTypeId]: Api.ReadonlyTypeId = ReadonlyTypeId private state: MailboxState = { _tag: "Open", takers: new Set(), offers: new Set(), awaiters: new Set() } private messages: Array = [] private messagesChunk = Chunk.empty() constructor( readonly scheduler: Scheduler, private capacity: number, readonly strategy: "suspend" | "dropping" | "sliding" ) { super() } offer(message: A): Effect { return core.suspend(() => { if (this.state._tag !== "Open") { return exitFalse } else if (this.messages.length + this.messagesChunk.length >= this.capacity) { switch (this.strategy) { case "dropping": return exitFalse case "suspend": if (this.capacity <= 0 && this.state.takers.size > 0) { this.messages.push(message) this.releaseTaker() return exitTrue } return this.offerRemainingSingle(message) case "sliding": this.unsafeTake() this.messages.push(message) return exitTrue } } this.messages.push(message) this.scheduleReleaseTaker() return exitTrue }) } unsafeOffer(message: A): boolean { if (this.state._tag !== "Open") { return false } else if (this.messages.length + this.messagesChunk.length >= this.capacity) { if (this.strategy === "sliding") { this.unsafeTake() this.messages.push(message) return true } else if (this.capacity <= 0 && this.state.takers.size > 0) { this.messages.push(message) this.releaseTaker() return true } return false } this.messages.push(message) this.scheduleReleaseTaker() return true } offerAll(messages: Iterable): Effect> { return core.suspend(() => { if (this.state._tag !== "Open") { return core.succeed(Chunk.fromIterable(messages)) } const remaining = this.unsafeOfferAllArray(messages) if (remaining.length === 0) { return exitEmpty } else if (this.strategy === "dropping") { return core.succeed(Chunk.unsafeFromArray(remaining)) } return this.offerRemainingArray(remaining) }) } unsafeOfferAll(messages: Iterable): Chunk.Chunk { return Chunk.unsafeFromArray(this.unsafeOfferAllArray(messages)) } unsafeOfferAllArray(messages: Iterable): Array { if (this.state._tag !== "Open") { return Arr.fromIterable(messages) } else if (this.capacity === Number.POSITIVE_INFINITY || this.strategy === "sliding") { if (this.messages.length > 0) { this.messagesChunk = Chunk.appendAll(this.messagesChunk, Chunk.unsafeFromArray(this.messages)) } if (this.strategy === "sliding") { this.messagesChunk = this.messagesChunk.pipe( Chunk.appendAll(Chunk.fromIterable(messages)), Chunk.takeRight(this.capacity) ) } else if (Chunk.isChunk(messages)) { this.messagesChunk = Chunk.appendAll(this.messagesChunk, messages) } else { this.messages = Arr.fromIterable(messages) } this.scheduleReleaseTaker() return [] } const free = this.capacity <= 0 ? this.state.takers.size : this.capacity - this.messages.length - this.messagesChunk.length if (free === 0) { return Arr.fromIterable(messages) } const remaining: Array = [] let i = 0 for (const message of messages) { if (i < free) { this.messages.push(message) } else { remaining.push(message) } i++ } this.scheduleReleaseTaker() return remaining } fail(error: E) { return this.done(core.exitFail(error)) } failCause(cause: Cause) { return this.done(core.exitFailCause(cause)) } unsafeDone(exit: Exit): boolean { if (this.state._tag !== "Open") { return false } else if (this.state.offers.size === 0 && this.messages.length === 0 && this.messagesChunk.length === 0) { this.finalize(exit) return true } this.state = { ...this.state, _tag: "Closing", exit } return true } shutdown: Effect = core.sync(() => { if (this.state._tag === "Done") { return true } this.messages = [] this.messagesChunk = empty const offers = this.state.offers this.finalize(this.state._tag === "Open" ? core.exitVoid : this.state.exit) if (offers.size > 0) { for (const entry of offers) { if (entry._tag === "Single") { entry.resume(exitFalse) } else { entry.resume(core.exitSucceed(Chunk.unsafeFromArray(entry.remaining.slice(entry.offset)))) } } offers.clear() } return true }) done(exit: Exit) { return core.sync(() => this.unsafeDone(exit)) } end = this.done(core.exitVoid) clear: Effect, E> = core.suspend(() => { if (this.state._tag === "Done") { return core.exitAs(this.state.exit, empty) } const messages = this.unsafeTakeAll() this.releaseCapacity() return core.succeed(messages) }) takeAll: Effect, done: boolean], E> = core.suspend(() => { if (this.state._tag === "Done") { return core.exitAs(this.state.exit, constDone) } const messages = this.unsafeTakeAll() if (messages.length === 0) { return core.zipRight(this.awaitTake, this.takeAll) } return core.succeed([messages, this.releaseCapacity()]) }) takeN(n: number): Effect, done: boolean], E> { return core.suspend(() => { if (this.state._tag === "Done") { return core.exitAs(this.state.exit, constDone) } else if (n <= 0) { return core.succeed([empty, false]) } n = Math.min(n, this.capacity) let messages: Chunk.Chunk if (n <= this.messagesChunk.length) { messages = Chunk.take(this.messagesChunk, n) this.messagesChunk = Chunk.drop(this.messagesChunk, n) } else if (n <= this.messages.length + this.messagesChunk.length) { this.messagesChunk = Chunk.appendAll(this.messagesChunk, Chunk.unsafeFromArray(this.messages)) this.messages = [] messages = Chunk.take(this.messagesChunk, n) this.messagesChunk = Chunk.drop(this.messagesChunk, n) } else { return core.zipRight(this.awaitTake, this.takeN(n)) } return core.succeed([messages, this.releaseCapacity()]) }) } unsafeTake(): Exit | undefined { if (this.state._tag === "Done") { return core.exitZipRight(this.state.exit, core.exitFail(new NoSuchElementException())) } let message: A if (this.messagesChunk.length > 0) { message = Chunk.unsafeHead(this.messagesChunk) this.messagesChunk = Chunk.drop(this.messagesChunk, 1) } else if (this.messages.length > 0) { message = this.messages[0] this.messagesChunk = Chunk.drop(Chunk.unsafeFromArray(this.messages), 1) this.messages = [] } else if (this.capacity <= 0 && this.state.offers.size > 0) { this.capacity = 1 this.releaseCapacity() this.capacity = 0 return this.messages.length > 0 ? core.exitSucceed(this.messages.pop()!) : undefined } else { return undefined } this.releaseCapacity() return core.exitSucceed(message) } take: Effect = core.suspend(() => this.unsafeTake() ?? core.zipRight(this.awaitTake, this.take) ) await: Effect = core.asyncInterrupt((resume) => { if (this.state._tag === "Done") { return resume(this.state.exit) } this.state.awaiters.add(resume) return core.sync(() => { if (this.state._tag !== "Done") { this.state.awaiters.delete(resume) } }) }) unsafeSize(): Option.Option { const size = this.messages.length + this.messagesChunk.length return this.state._tag === "Done" ? Option.none() : Option.some(size) } size = core.sync(() => this.unsafeSize()) commit() { return this.takeAll } pipe() { return pipeArguments(this, arguments) } toJSON() { return { _id: "effect/Mailbox", state: this.state._tag, size: this.unsafeSize().toJSON() } } toString(): string { return Inspectable.format(this) } [Inspectable.NodeInspectSymbol]() { return Inspectable.format(this) } private offerRemainingSingle(message: A) { return core.asyncInterrupt((resume) => { if (this.state._tag !== "Open") { return resume(exitFalse) } const entry: OfferEntry = { _tag: "Single", message, resume } this.state.offers.add(entry) return core.sync(() => { if (this.state._tag === "Open") { this.state.offers.delete(entry) } }) }) } private offerRemainingArray(remaining: Array) { return core.asyncInterrupt>((resume) => { if (this.state._tag !== "Open") { return resume(core.exitSucceed(Chunk.unsafeFromArray(remaining))) } const entry: OfferEntry = { _tag: "Array", remaining, offset: 0, resume } this.state.offers.add(entry) return core.sync(() => { if (this.state._tag === "Open") { this.state.offers.delete(entry) } }) }) } private releaseCapacity(): boolean { if (this.state._tag === "Done") { return this.state.exit._tag === "Success" } else if (this.state.offers.size === 0) { if (this.state._tag === "Closing" && this.messages.length === 0 && this.messagesChunk.length === 0) { this.finalize(this.state.exit) return this.state.exit._tag === "Success" } return false } let n = this.capacity - this.messages.length - this.messagesChunk.length for (const entry of this.state.offers) { if (n === 0) return false else if (entry._tag === "Single") { this.messages.push(entry.message) n-- entry.resume(exitTrue) this.state.offers.delete(entry) } else { for (; entry.offset < entry.remaining.length; entry.offset++) { if (n === 0) return false this.messages.push(entry.remaining[entry.offset]) n-- } entry.resume(exitEmpty) this.state.offers.delete(entry) } } return false } private awaitTake = core.asyncInterrupt((resume) => { if (this.state._tag === "Done") { return resume(this.state.exit) } this.state.takers.add(resume) return core.sync(() => { if (this.state._tag !== "Done") { this.state.takers.delete(resume) } }) }) private scheduleRunning = false private scheduleReleaseTaker() { if (this.scheduleRunning) { return } this.scheduleRunning = true this.scheduler.scheduleTask(this.releaseTaker, 0) } private releaseTaker = () => { this.scheduleRunning = false if (this.state._tag === "Done") { return } else if (this.state.takers.size === 0) { return } for (const taker of this.state.takers) { this.state.takers.delete(taker) taker(core.exitVoid) if (this.messages.length + this.messagesChunk.length === 0) { break } } } private unsafeTakeAll() { if (this.messagesChunk.length > 0) { const messages = this.messages.length > 0 ? Chunk.appendAll(this.messagesChunk, Chunk.unsafeFromArray(this.messages)) : this.messagesChunk this.messagesChunk = empty this.messages = [] return messages } else if (this.messages.length > 0) { const messages = Chunk.unsafeFromArray(this.messages) this.messages = [] return messages } else if (this.state._tag !== "Done" && this.state.offers.size > 0) { this.capacity = 1 this.releaseCapacity() this.capacity = 0 return Chunk.of(this.messages.pop()!) } return empty } private finalize(exit: Exit) { if (this.state._tag === "Done") { return } const openState = this.state this.state = { _tag: "Done", exit } for (const taker of openState.takers) { taker(exit) } openState.takers.clear() for (const awaiter of openState.awaiters) { awaiter(exit) } openState.awaiters.clear() } } /** @internal */ export const make = ( capacity?: number | { readonly capacity?: number | undefined readonly strategy?: "suspend" | "dropping" | "sliding" | undefined } | undefined ): Effect> => core.withFiberRuntime((fiber) => core.succeed( new MailboxImpl( fiber.currentScheduler, typeof capacity === "number" ? capacity : capacity?.capacity ?? Number.POSITIVE_INFINITY, typeof capacity === "number" ? "suspend" : capacity?.strategy ?? "suspend" ) ) ) /** @internal */ export const into: { ( self: Api.Mailbox ): (effect: Effect) => Effect ( effect: Effect, self: Api.Mailbox ): Effect } = dual( 2, ( effect: Effect, self: Api.Mailbox ): Effect => core.uninterruptibleMask((restore) => core.matchCauseEffect(restore(effect), { onFailure: (cause) => self.failCause(cause), onSuccess: (_) => self.end }) ) ) /** @internal */ export const toChannel = (self: Api.ReadonlyMailbox): Channel, unknown, E> => { const loop: Channel, unknown, E> = coreChannel.flatMap(self.takeAll, ([messages, done]) => done ? messages.length === 0 ? coreChannel.void : coreChannel.write(messages) : channel.zipRight(coreChannel.write(messages), loop)) return loop } /** @internal */ export const toStream = (self: Api.ReadonlyMailbox): Stream => stream.fromChannel(toChannel(self)) /** @internal */ export const fromStream: { (options?: { readonly capacity?: number | undefined readonly strategy?: "suspend" | "dropping" | "sliding" | undefined }): (self: Stream) => Effect, never, R | Scope> ( self: Stream, options?: { readonly capacity?: number | undefined readonly strategy?: "suspend" | "dropping" | "sliding" | undefined } ): Effect, never, R | Scope> } = dual((args) => stream.isStream(args[0]), ( self: Stream, options?: { readonly capacity?: number | undefined readonly strategy?: "suspend" | "dropping" | "sliding" | undefined } ): Effect, never, R | Scope> => core.tap( fiberRuntime.acquireRelease( make(options), (mailbox) => mailbox.shutdown ), (mailbox) => { const writer: Channel, never, E> = coreChannel.readWithCause({ onInput: (input: Chunk.Chunk) => coreChannel.flatMap(mailbox.offerAll(input), () => writer), onFailure: (cause: Cause) => mailbox.failCause(cause), onDone: () => mailbox.end }) return fiberRuntime.scopeWith((scope) => stream.toChannel(self).pipe( coreChannel.pipeTo(writer), channelExecutor.runIn(scope), circular.forkIn(scope) ) ) } ))