import { ISocketEpicContext } from '../kit';
import { IncomingMessage } from 'http';
import { Observable } from 'rxjs';
import { fromEventBus, pushToEventBus } from '../eventBus';
import { takeUntil } from 'rxjs/operators';
import { whenCompleted } from '../whenCompleted';
import { IAction } from '../action';
import { TaggedLogger } from '../logging';

/**
 * Inputs required to build a socket epic context.
 *
 * NOTE(review): the generic parameter lists in this file were garbled
 * (`ICreateContextParams = {}>`), so the constraint on `D` and the element
 * types of the observables below are reconstructed. They are intentionally
 * loose so existing callers keep compiling — confirm against the original
 * declarations and tighten where appropriate.
 *
 * @typeParam D - Shape of the extra dependencies produced by `buildDeps`.
 */
export interface ICreateContextParams<D extends object = {}> {
  /** The incoming HTTP request, augmented with an `id`. */
  request: IncomingMessage & { id: string };
  /** Command stream; its completion is what marks the context as closed. */
  commands: Observable<IAction>;
  /**
   * Binary stream, passed through to the context untouched.
   * NOTE(review): element type unknown from this file (possibly `Buffer`) — confirm.
   */
  binary: Observable<unknown>;
  /** Logger exposed on the resulting context. */
  logger: TaggedLogger;
  /** Optional factory for extra dependencies merged into the context. */
  buildDeps?: () => D;
}

/**
 * Builds the context object handed to socket epics.
 *
 * The result combines the optional extra dependencies from `buildDeps` with
 * the built-in fields (`id`, `request`, `binary`, `publish`, `subscribe`,
 * `logger`, `takeUntilClosed`).
 *
 * @param params - Request, streams, logger and optional dependency factory.
 * @returns The context, intersected with the extra dependencies `D`.
 */
export function createSocketEpicContext<D extends object = {}>(
  params: ICreateContextParams<D>
): ISocketEpicContext & D {
  const { request, commands, binary, logger, buildDeps } = params;

  // Derived from `commands` via `whenCompleted()`; presumably emits once the
  // command stream completes — verify against the helper's implementation.
  const closed = commands.pipe(whenCompleted());

  // Operator factory: mirrors a stream until `closed` emits.
  const takeUntilClosed =
    <T>() =>
    (stream: Observable<T>): Observable<T> =>
      takeUntil<T>(closed)(stream);

  // Event-bus stream that stops once `closed` emits.
  const subscribe = () => fromEventBus().pipe(takeUntilClosed());

  // Operator factory: pipes a stream through `pushToEventBus()`.
  const publish = () => (stream: Observable<IAction>) =>
    stream.pipe(pushToEventBus());

  const deps: D | {} = buildDeps?.() ?? {};

  // `deps` is spread first, so the built-in fields below take precedence over
  // any same-named keys returned by `buildDeps`.
  return {
    ...deps,
    id: request.id,
    request,
    binary,
    publish,
    subscribe,
    logger,
    takeUntilClosed,
  } as D & ISocketEpicContext;
}