import { mergeMap } from 'rxjs/operators';
import { Observable, ObservableInput } from 'rxjs';
import { ServerGatewayMessage } from '../../Gateway/ValueObject/ServerGatewayMessage';
import { ServerGatewayMetadata } from '../../Gateway/ValueObject/ServerGatewayMetadata';
import { ServerGatewayInterface } from '../../Gateway/ServerGatewayInterface';

/**
 * Helper transformation operator for working with client gateways.
 *
 * For every message emitted by the source observable, `callback` is invoked
 * with the client gateway read from `message.metadata.clientGateway` and the
 * message itself. The function returned by `callback` is then called
 * immediately with no arguments, and the observable input it produces is
 * flattened into the output stream with `mergeMap`.
 *
 * @example
 *
 * messages$.pipe(
 *   fromClientQuery((clientGateway, message) => {
 *     return () => {
 *       // clientGateway.emit()
 *       // Return some observable input (Observable, Promise, iterable, ...)
 *       return [message];
 *     };
 *   }),
 * );
 *
 * @param callback Factory called once per source message. It receives the
 *   client gateway and the message, and returns a zero-argument function
 *   producing the observable input to merge into the result.
 * @returns An operator function to be used inside `Observable.pipe()`.
 */
export function fromClientQuery<
  R,
  ClientGateway extends ServerGatewayInterface,
  Message extends ServerGatewayMessage<ServerGatewayMetadata<ClientGateway>>
>(
  callback: (clientGateway: ClientGateway, message: Message) => () => ObservableInput<R>,
) {
  return (input: Observable<Message>) => {
    return input.pipe(
      mergeMap((message) => {
        // The callback is curried: the first call binds the gateway and the
        // message, the second (argument-less) call yields the observable input.
        const query = callback(message.metadata.clientGateway, message);

        return query();
      }),
    );
  };
}