import { Disconnect, Exchange, NormalizedData } from '../types' export type Computable = { readonly sourceDataTypes: string[] compute(message: NormalizedData): IterableIterator } export type ComputableFactory = () => Computable async function* _compute[], U extends NormalizedData | Disconnect>( messages: AsyncIterableIterator, ...computables: T ): AsyncIterableIterator[] ? (U extends Disconnect ? U | Z | Disconnect : U | Z) : never> { const factory = new Computables(computables) for await (const message of messages) { // always pass through source message yield message as any if (message.type === 'disconnect') { // reset all computables for given exchange if we've received disconnect for it factory.reset(message.exchange) continue } const normalizedMessage = message as NormalizedData const id = normalizedMessage.name !== undefined ? `${normalizedMessage.symbol}:${normalizedMessage.name}` : normalizedMessage.symbol const computablesMap = factory.getOrCreate(normalizedMessage.exchange, id) const computables = computablesMap[normalizedMessage.type] if (!computables) continue for (const computable of computables) { for (const computedMessage of computable.compute(normalizedMessage)) { yield computedMessage } } } } export function compute[], U extends NormalizedData | Disconnect>( messages: AsyncIterableIterator, ...computables: T ): AsyncIterableIterator[] ? (U extends Disconnect ? U | Z | Disconnect : U | Z) : never> { let _iterator = _compute(messages, ...computables) if ((messages as any).__realtime__ === true) { ;(_iterator as any).__realtime__ = true } return _iterator } class Computables { private _computables: { [ex in Exchange]?: { [key: string]: { [dataType: string]: Computable[] } } } = {} constructor(private readonly _computablesFactories: ComputableFactory[]) {} getOrCreate(exchange: Exchange, id: string) { if (this._computables[exchange] === undefined) { this._computables[exchange] = {} } if (this._computables[exchange]![id] === undefined) { this._computables[exchange]![id] = createComputablesMap(this._computablesFactories.map((c) => c())) } return this._computables[exchange]![id]! } reset(exchange: Exchange) { this._computables[exchange] = undefined } } function createComputablesMap(computables: Computable[]) { return computables.reduce((acc, computable) => { computable.sourceDataTypes.forEach((dataType) => { const existing = acc[dataType] if (!existing) { acc[dataType] = [computable] } else { acc[dataType].push(computable) } }) return acc }, {} as { [dataType: string]: Computable[] }) }