import MicroEmitter from '../../micro-emitter'; import Subscription from './subscription'; import type { SubscriptionArgs, StreamingOptions } from './subscription'; import StreamingOrphanFinder from './orphan-finder'; import Connection from './connection/connection'; import * as connectionConstants from './connection/constants'; import type * as types from './types'; import type AuthProvider from '../authProvider'; import type { ITransport } from '../transport/transport-base'; /** * Find matching delay based on current retry count/index. * @param retryLevels - The retry levels that contain different delays for various retry count levels. * Structure: `[ { level: Number, delay: Number } ].` * @param retryIndex - The current retry index/try/count. * @param defaultDelay - The default delay. * @returns Matching delay to retry index/try/count. */ export declare function findRetryDelay(retryLevels: types.RetryDelayLevel[], retryIndex: number, defaultDelay: number): number; declare type EmittedEvents = { [connectionConstants.EVENT_CONNECTION_STATE_CHANGED]: (connectionState: types.ConnectionState | null) => void; [connectionConstants.EVENT_STREAMING_FAILED]: () => void; [connectionConstants.EVENT_CONNECTION_SLOW]: () => void; [connectionConstants.EVENT_DISCONNECT_REQUESTED]: () => void; [connectionConstants.EVENT_PROBE_MESSAGE]: (message: types.ProbeControlMessage) => void; }; /** * Manages subscriptions to the Open API streaming service. * Once created this will immediately attempt to start the streaming service */ declare class Streaming extends MicroEmitter { /** * Event that occurs when the connection state changes. */ EVENT_CONNECTION_STATE_CHANGED: "connectionStateChanged"; /** * Event that occurs when the connection is slow. */ EVENT_CONNECTION_SLOW: "connectionSlow"; /** * Event that occurs when the connection has completely failed. */ EVENT_STREAMING_FAILED: "streamingFailed"; /** * Event that occurs when server sends _disconnect control message. */ EVENT_DISCONNECT_REQUESTED: "streamingDisconnectRequested"; /** * Event that occurs when probe message is received. */ EVENT_PROBE_MESSAGE: "probeMessage"; /** * Streaming has been created but has not yet started the connection. */ CONNECTION_STATE_INITIALIZING: 1; /** * The connection has been started but may not yet be connecting. */ CONNECTION_STATE_STARTED: 2; /** * Connection is trying to connect. The previous state was CONNECTION_STATE_STARTED or CONNECTION_STATE_DISCONNECTED. */ CONNECTION_STATE_CONNECTING: 4; /** * Connection is connected and everything is good. */ CONNECTION_STATE_CONNECTED: 8; /** * Connection is reconnecting. The previous state was CONNECTION_STATE_CONNECTING. * We are current not connected, but might recover without having to reset. */ CONNECTION_STATE_RECONNECTING: 16; /** * Connection is disconnected. Streaming may attempt to connect again. */ CONNECTION_STATE_DISCONNECTED: 32; /** * Connection is failed */ CONNECTION_STATE_FAILED: 64; READABLE_CONNECTION_STATE_MAP: { readonly 1: "Initializing"; readonly 2: "Started"; readonly 4: "Connecting"; readonly 8: "Connected"; readonly 16: "Reconnecting"; readonly 32: "Disconnected"; readonly 64: "Failed"; }; retryCount: number; latestActivity: number; connectionState: types.ConnectionState | null; baseUrl: string; authProvider: AuthProvider; transport: ITransport; subscriptions: Subscription[]; isReset: boolean; paused: boolean; orphanFinder: StreamingOrphanFinder; private orphanEvents; private multipleOrphanDetectorTimeoutId; connection: Connection; connectionOptions: types.ConnectionOptions; reconnecting: boolean; contextId: string; contextMessageCount: number; retryDelay: number; retryDelayLevels?: types.RetryDelayLevel[]; reconnectTimer?: number; disposed: boolean; private heartBeatLog; /** * @param transport - The transport to use for subscribing/unsubscribing. * @param baseUrl - The base URL with which to connect. /streaming/connection will be appended to it. * @param authProvider - An instance of the AuthProvider class. * @param options - (optional) The configuration options for the streaming connection */ constructor(transport: ITransport, baseUrl: string, authProvider: AuthProvider, options?: Partial); setOptions(options: types.StreamingConfigurableOptions): void; /** * Initializes a connection, and starts handling streaming events. * * Starts in an Initializing state, transitions to Started when the Connection starts * then follows the Connection state model. */ private init; private onStreamingFailed; /** * The streaming connection received a unauthorized - the token is * being rejected so we should get a new one. */ private onUnauthorized; /** * Reconnects the streaming socket when it is disconnected */ private connect; private setNewContextId; /** * Retries the connection after a time */ private retryConnection; /** * Handles connection state change */ private onConnectionStateChanged; /** * Handles connection start */ private onConnectionStarted; private processUpdate; /** * handles the connection received event from SignalR * @param updates - updates */ private onReceived; /** * Finds a subscription by referenceId or returns undefined if not found * @param referenceId - referenceId */ private findSubscriptionByReferenceId; /** * Sends an update to a subscription by finding it and calling its callback * @param update - update */ private sendDataUpdateToSubscribers; private getHeartbeats; private getTargetReferenceIds; /** * Handles a control message on the streaming connection * @param message - message from open-api */ private handleControlMessage; /** * Fires heartbeats to relevant subscriptions * @param heartbeatList - heartbeatList */ private handleControlMessageFireHeartbeats; /** * Handles probe control messages and calls callback if specified in options */ private handleControlMessageProbe; /** * Resets subscriptions passed * @param subscriptions - subscriptions * @param isServerInitiated - (optional) will be false when we do rest due to missing message Default = true */ private resetSubscriptions; /** * Handles the control message to reset subscriptions based on a id list. If no list is given, * reset all subscriptions. * @param referenceIdList - referenceIdList */ private handleControlMessageResetSubscriptions; /** * Handles the control message to disconnect, * Notify subscriptions about connect unavailability * Fire disconnect requested event * @param referenceIdList - referenceIdList */ private handleControlMessageDisconnect; private handleControlMessageReconnect; reconnect(): void; /** * handles the connection slow event from SignalR. Happens when a keep-alive is missed. */ private onConnectionSlow; /** * Updates the connection query string */ private updateConnectionQuery; /** * Called when a subscription is created * updates the orphan finder to look for that subscription */ private onSubscriptionCreated; private detectMultipleOrphans; /** * Called when an orphan is found - resets that subscription * @param subscription - subscription */ private onOrphanFound; private handleSubscriptionReadyForUnsubscribe; private getSubscriptionsByTag; private getSubscriptionsReadyPromise; private unsubscribeSubscriptionByTag; private onSubscribeNetworkError; private onSubscriptionReset; private onSubscriptionReadyToRemove; /** * Constructs a new subscription to the given resource. * * @param servicePath - The service path e.g. 'trade' * @param url - The name of the resource to subscribe to, e.g. '/v1/infoprices/subscriptions'. * @param subscriptionArgs - (optional) Arguments that detail the subscription. * @param options - (optional) streaming options * @returns A subscription object. */ createSubscription(servicePath: string, url: string, subscriptionArgs?: SubscriptionArgs, options?: StreamingOptions): Subscription; /** * Makes a subscription start. * * @param subscription - The subscription to start. */ subscribe(subscription: Subscription): void; /** * Makes a subscription start with modification. * Modify subscription will keep pending unsubscribe followed by modify subscribe. * * @param subscription - The subscription to modify. * @param args - The target arguments of modified subscription. * @param options - Options for subscription modification. */ modify(subscription: Subscription, args: Record, options: { isPatch?: false; isReplace: true; } | { isPatch: true; isReplace?: false; patchArgsDelta: Record; } | { isPatch?: false; isReplace?: false; }): void; /** * Makes a subscription stop (can be restarted). See {@link saxo.openapi.Streaming#disposeSubscription} for permanently stopping a subscription. * * @param subscription - The subscription to stop. */ unsubscribe(subscription: Subscription): void; /** * Disposes a subscription permanently. It will be stopped and not be able to be started. * * @param subscription - The subscription to stop and remove. */ disposeSubscription(subscription: Subscription): void; /** * Makes all subscriptions stop at the given service path and url with the given tag (can be restarted) * See {@link saxo.openapi.Streaming#disposeSubscriptionByTag} for permanently stopping subscriptions by tag. * * @param servicePath - the service path of the subscriptions to unsubscribe * @param url - the url of the subscriptions to unsubscribe * @param tag - the tag of the subscriptions to unsubscribe */ unsubscribeByTag(servicePath: string, url: string, tag: string): void; /** * Disposes all subscriptions at the given service path and url by tag permanently. They will be stopped and not be able to be started. * * @param servicePath - the service path of the subscriptions to unsubscribe * @param url - the url of the subscriptions to unsubscribe * @param tag - the tag of the subscriptions to unsubscribe */ disposeSubscriptionByTag(servicePath: string, url: string, tag: string): void; /** * This disconnects the current socket. We will follow normal reconnection logic to try and restore the connection. * It *will not* stop the subscription (see dispose for that). It is useful for testing reconnect logic works or for resetting all subscriptions. */ disconnect(): void; pause(): void; resume(): void; /** * Shuts down streaming. */ dispose(): void; getQuery(): string | void | null; resetStreaming(baseUrl: string, options?: {}): void; getActiveTransportName(): string | null; isPaused(): boolean; } export default Streaming;