import { SubjectStream } from '@dequanto/class/SubjectStream'; import { TEth } from '@dequanto/models/TEth'; import { TTransport } from './transports/ITransport'; import { class_EventEmitter } from 'atma-utils'; export class RpcSubscription extends SubjectStream implements TTransport.Subscription { private _emitter: class_EventEmitter constructor (public id: number, public transport: TTransport.Transport, mapper?: (value) => T) { super(); this._mapper = mapper; } map (mapper: (x: T) => any): RpcSubscription { let stream = new RpcSubscription(this.id, this.transport, mapper); stream.fromStream(this); return stream; } async unsubscribe(cb?: Function): Promise { super.unsubscribe(cb); if (this._cbs.length === 0) { await this.transport.unsubscribe({ id: Date.now(), jsonrpc: '2.0', method: 'eth_unsubscribe', params: [ this.id ] }); } return true; } static createMapping (subscription: TTransport.Subscription, transport: TTransport.Transport, mapper: (x: any) => any): RpcSubscription { let stream = new RpcSubscription(subscription.id, transport, mapper); //@TODO: fix 'any' stream.fromStream(subscription as any); return stream; } } export type RpcLogFilterOptions = { address?: TEth.Address | TEth.Address[] topics?: TEth.Hex[] }