import { Event, EventLike, EventStream, EventStreamSeed, EventStreamSource, isEnd, ObservableSeed, Observer, Scope, Subscribe, toEvents, TypeBitfield, T_SOURCE, T_SCOPED, T_SEED, T_STREAM, Unsub, Desc, } from "./abstractions" import { applyScopeMaybe } from "./applyscope" import { Dispatcher } from "./dispatcher" import { HKT } from "./hkt" import { ObservableBase, ObservableSeedImpl } from "./observable" import { scopedSubscribe, scopedSubscribe1 } from "./scope" type StreamEvents = { value: V } // TODO: consider how scopes should affect streams export abstract class EventStreamBase extends ObservableBase> implements EventStream { public [HKT]!: EventStream observableType(): "EventStream" | "Bus" { return "EventStream" } _L: TypeBitfield = T_STREAM | T_SCOPED abstract getScope(): Scope applyScope(scope: Scope): EventStream { return new StatelessEventStream( this.desc, scopedSubscribe1(scope, this.subscribe.bind(this)), scope ) } } // Note that we could use a Dispatcher as Bus, except for prototype inheritance of EventStream on the way export class StatefulEventStream extends EventStreamBase { protected dispatcher = new Dispatcher>() private _scope: Scope constructor(desc: Desc, scope: Scope) { super(desc) this._scope = scope } subscribe(onValue: Observer, onEnd?: Observer) { return this.dispatcher.on("value", onValue, onEnd) } getScope() { return this._scope } } export class StatelessEventStream extends EventStreamBase { observableType() { return "EventStream" as const } private _scope: Scope subscribe: Subscribe constructor(desc: Desc, subscribe: Subscribe, scope: Scope) { super(desc) this._scope = scope // No need to wrap subscribe with scope in EventStreams as there's no `get` method to protect unlike in Properties this.subscribe = subscribe } getScope() { return this._scope } } export class SeedToStream extends StatefulEventStream { observableType() { return "EventStream" as const } constructor( seed: ObservableSeed, EventStream>, scope: Scope ) { super(seed.desc, scope) const source = seed.consume() scope.subscribe(() => source.subscribe( (v) => this.dispatcher.dispatch("value", v), () => this.dispatcher.dispatchEnd("value") ) ) } } export class EventStreamSourceImpl extends ObservableBase< V, EventStream > { public [HKT]!: EventStreamSource observableType() { return "EventStreamSource" as const } _L: TypeBitfield = T_STREAM | T_SOURCE subscribe: Subscribe constructor(desc: Desc, subscribe: Subscribe) { super(desc) this.subscribe = subscribe } applyScope(scope: Scope): EventStream { return new SeedToStream(this, scope) } } export class EventStreamSeedImpl extends ObservableSeedImpl, EventStream> implements EventStreamSeed { public [HKT]!: EventStreamSeed observableType() { return "EventStreamSeed" as const } _L: TypeBitfield = T_STREAM | T_SEED constructor(desc: Desc, subscribe: Subscribe) { super(new EventStreamSourceImpl(desc, subscribe)) } applyScope(scope: Scope): EventStream { return new SeedToStream(this, scope) } }