import { VirtualWebSocketClient } from './virtual-websocket-client' import { genRequestId } from './message' import { IDatabaseServiceContext, } from '@cloudbase/types/database' import { IWatchOptions, DBRealtimeListener, IRequestMessage, IResponseMessage, IRequestMessagePingMsg, IRequestMessageLoginMsg, IResponseMessageLoginResMsg, IRequestMessageLoginData, } from '@cloudbase/types/realtime' import { CloseEventCode, CLOSE_EVENT_CODE_INFO, getWSCloseError, } from './ws-event' import { ERR_CODE, TimeoutError, RealtimeErrorMessageError, CloudSDKError } from './error' import { getWsClass, getRuntime } from './common' import { sleep } from './utils' export interface IRealtimeWebSocketClientConstructorOptions { maxReconnect?: number reconnectInterval?: number context: IDatabaseServiceContext } export interface ISignature { envId: string secretVersion: number signStr: string wsUrl: string expireTS: number } export interface ILoginInfo { loggedIn: boolean loggingInPromise?: Promise loginStartTS?: number loginResult?: ILoginResult } export interface ILoginResult { envId: string } export interface IWSSendOptions { msg: IRequestMessage waitResponse?: boolean // when waitResponse is set to true, if skipOnMessage is true, general onMessage handler will be skipped skipOnMessage?: boolean timeout?: number } export interface IWSWatchOptions extends IWatchOptions { envId?: string collectionName: string query: string limit?: number orderBy?: Record } interface IResolveReject { resolve: (value?: any | PromiseLike | undefined) => void reject: (reason?: any) => void } interface IResponseWaitSpec extends IResolveReject { skipOnMessage?: boolean } interface IWsSign { signStr: string, wsUrl: string, secretVersion: string envId: string expiredTs: number } const WS_READY_STATE = { CONNECTING: 0, OPEN: 1, CLOSING: 2, CLOSED: 3, } const MAX_RTT_OBSERVED = 3 const DEFAULT_EXPECTED_EVENT_WAIT_TIME = 5000 const DEFAULT_UNTRUSTED_RTT_THRESHOLD = 10000 const DEFAULT_MAX_RECONNECT = 5 const DEFAULT_WS_RECONNECT_INTERVAL = 10000 // const DEFAULT_WS_RECONNECT_MAX_VALID_INTERVAL = 3 * 60 * 1000 const DEFAULT_PING_FAIL_TOLERANCE = 2 const DEFAULT_PONG_MISS_TOLERANCE = 2 const DEFAULT_LOGIN_TIMEOUT = 5000 export class RealtimeWebSocketClient { private virtualWSClient: Set = new Set() // after listener initWatch, the listener has the queryID and can store it here private queryIdClientMap: Map = new Map() private watchIdClientMap: Map = new Map() private maxReconnect: number private reconnectInterval: number private context: IDatabaseServiceContext private ws?: any private lastPingSendTS?: number private pingFailed = 0 private pongMissed = 0 private pingTimeoutId?: number private pongTimeoutId?: number private logins: Map = new Map() private wsInitPromise?: Promise private wsReadySubsribers: IResolveReject[] = [] private wsResponseWait: Map< string /* requestId */, IResponseWaitSpec > = new Map() private rttObserved: number[] = [] private reconnectState: boolean // obtained from the first getSignature with no envId provided private wsSign: IWsSign constructor(options: IRealtimeWebSocketClientConstructorOptions) { this.maxReconnect = options.maxReconnect || DEFAULT_MAX_RECONNECT this.reconnectInterval = options.reconnectInterval || DEFAULT_WS_RECONNECT_INTERVAL this.context = options.context } clearHeartbeat() { this.pingTimeoutId && clearTimeout(this.pingTimeoutId) this.pongTimeoutId && clearTimeout(this.pongTimeoutId) } send = async (opts: IWSSendOptions): Promise => new Promise((_resolve, _reject) => { void (async () => { let timeoutId: number let hasResolved = false let hasRejected = false const resolve: typeof _resolve = (value?: T | PromiseLike | undefined) => { hasResolved = true timeoutId && clearTimeout(timeoutId) _resolve(value) } const reject: typeof _reject = (error: any) => { hasRejected = true timeoutId && clearTimeout(timeoutId) _reject(error) } if (opts.timeout) { // @ts-ignore timeoutId = setTimeout(() => { (async () => { if (!hasResolved || !hasRejected) { // wait another immediate timeout to allow the success/fail callback to be invoked if ws has already got the result, // this is because the timer is registered before ws.send await sleep(0) if (!hasResolved || !hasRejected) { reject(new TimeoutError('wsclient.send timedout')) } } })() }, opts.timeout) } try { if (this.wsInitPromise !== undefined || this.wsInitPromise !== null) { await this.wsInitPromise } if (!this.ws) { reject(new Error('invalid state: ws connection not exists, can not send message')) return } if (this.ws.readyState !== WS_READY_STATE.OPEN) { reject(new Error(`ws readyState invalid: ${this.ws.readyState}, can not send message`)) return } if (opts.waitResponse) { const respWaitSpec: IResponseWaitSpec = { resolve, reject, skipOnMessage: opts.skipOnMessage, } this.wsResponseWait.set(opts.msg.requestId, respWaitSpec) } // console.log('send msg:', opts.msg) try { await this.ws.send(JSON.stringify(opts.msg)) if (!opts.waitResponse) { resolve(void 0) } } catch (err) { if (err) { reject(err) if (opts.waitResponse) { this.wsResponseWait.delete(opts.msg.requestId) } } } } catch (e) { reject(e) } })() }) close(code: CloseEventCode) { this.clearHeartbeat() if (this.ws) { this.ws.close(code, CLOSE_EVENT_CODE_INFO[code].name) this.ws = undefined } } closeAllClients = (error: any) => { this.virtualWSClient.forEach((client) => { client.closeWithError(error) }) } pauseClients = (clients?: Set) => { (clients || this.virtualWSClient).forEach((client) => { client.pause() }) } resumeClients = (clients?: Set) => { (clients || this.virtualWSClient).forEach((client) => { client.resume() }) } watch(options: IWSWatchOptions): DBRealtimeListener { if (!this.ws && (this.wsInitPromise === undefined || this.wsInitPromise === null)) { this.initWebSocketConnection(false) } const virtualClient = new VirtualWebSocketClient({ ...options, send: this.send, login: this.webLogin, isWSConnected: this.isWSConnected, onceWSConnected: this.onceWSConnected, getWaitExpectedTimeoutLength: this.getWaitExpectedTimeoutLength, onWatchStart: this.onWatchStart, onWatchClose: this.onWatchClose, debug: true, }) this.virtualWSClient.add(virtualClient) this.watchIdClientMap.set(virtualClient.watchId, virtualClient) return virtualClient.listener } private initWebSocketConnection = async ( reconnect: boolean, availableRetries: number = this.maxReconnect ): Promise => { // 当前处于正在重连中的状态 if (reconnect && this.reconnectState) { return // 忽略 } if (reconnect) { this.reconnectState = true // 重连状态开始 } if (this.wsInitPromise !== undefined && this.wsInitPromise !== null) { // there already exists a websocket initiation, just wait for it return this.wsInitPromise } if (reconnect) { this.pauseClients() } this.close(CloseEventCode.ReconnectWebSocket) this.wsInitPromise = new Promise((resolve, reject) => { (async () => { try { const wsSign = await this.getWsSign() await new Promise((success) => { const url = wsSign.wsUrl || 'wss://tcb-ws.tencentcloudapi.com' const wsClass = getWsClass() /* eslint-disable-next-line */ this.ws = wsClass ? new wsClass(url) : new WebSocket(url) success(void 0) }) if (this.ws.connect) { await this.ws.connect() } await this.initWebSocketEvent() resolve() if (reconnect) { this.resumeClients() this.reconnectState = false // 重连状态结束 } } catch (e) { console.error('[realtime] initWebSocketConnection connect fail', e) if (availableRetries > 0) { // this is an optimization, in case of network offline, we don't need to stubbornly sleep for sometime, // we only need to wait for the network to be back online, this ensures minimum downtime // const { isConnected } = await getNetworkStatus() const isConnected = true this.wsInitPromise = undefined if (isConnected) { await sleep(this.reconnectInterval) if (reconnect) { this.reconnectState = false // 重连异常也算重连状态结束 } } resolve(this.initWebSocketConnection(reconnect, availableRetries - 1)) } else { reject(e) if (reconnect) { this.closeAllClients(new CloudSDKError({ errCode: ERR_CODE.SDK_DATABASE_REALTIME_LISTENER_RECONNECT_WATCH_FAIL as string, errMsg: e, })) } } } })() }) try { await this.wsInitPromise this.wsReadySubsribers.forEach(({ resolve }) => resolve()) } catch (e) { this.wsReadySubsribers.forEach(({ reject }) => reject()) } finally { this.wsInitPromise = undefined this.wsReadySubsribers = [] } } private initWebSocketEvent = () => new Promise((resolve, reject) => { if (!this.ws) { throw new Error('can not initWebSocketEvent, ws not exists') } let wsOpened = false this.ws.onopen = (event) => { console.warn('[realtime] ws event: open', event) wsOpened = true resolve() } this.ws.onerror = (event) => { // all logins are invalid after disconnection this.logins = new Map() // error写进file if (!wsOpened) { console.error('[realtime] ws open failed with ws event: error', event) reject(event) } else { console.error('[realtime] ws event: error', event) this.clearHeartbeat() this.virtualWSClient.forEach(client => client.closeWithError(new CloudSDKError({ errCode: ERR_CODE.SDK_DATABASE_REALTIME_LISTENER_WEBSOCKET_CONNECTION_ERROR as string, errMsg: event, }))) } } // TODO: reconnect this.ws.onclose = (closeEvent) => { console.warn('[realtime] ws event: close', closeEvent) // all logins are invalid after disconnection this.logins = new Map() this.clearHeartbeat() switch (closeEvent.code) { case CloseEventCode.ReconnectWebSocket: { // just ignore break } case CloseEventCode.NoRealtimeListeners: { // quit break } case CloseEventCode.HeartbeatPingError: case CloseEventCode.HeartbeatPongTimeoutError: case CloseEventCode.NormalClosure: case CloseEventCode.AbnormalClosure: { // Normal Closure and Abnormal Closure: // expected closure, most likely dispatched by wechat client, // since this is the status code dispatched in case of network failure, // we should retry if (this.maxReconnect > 0) { this.initWebSocketConnection(true, this.maxReconnect) } else { this.closeAllClients(getWSCloseError(closeEvent.code)) } break } case CloseEventCode.NoAuthentication: { this.closeAllClients(getWSCloseError(closeEvent.code, closeEvent.reason)) break } default: { // we should retry by default if (this.maxReconnect > 0) { this.initWebSocketConnection(true, this.maxReconnect) } else { this.closeAllClients(getWSCloseError(closeEvent.code)) } } } } this.ws.onmessage = (res) => { // 支付宝小程序会返回res.data.data或res.message // 微信小程序返回res.data const rawMsg = res.data?.data || res.data // reset & restart heartbeat this.heartbeat() let msg: IResponseMessage try { msg = typeof rawMsg === 'string' ? JSON.parse(rawMsg as string) : rawMsg } catch (e) { throw new Error(`[realtime] onMessage parse res.data error: ${e}`) } if (msg.msgType === 'ERROR') { // 找到当前监听,并将error返回 let virtualWatch = null this.virtualWSClient.forEach((item) => { if (item.watchId === msg.watchId) { virtualWatch = item } }) if (virtualWatch) { virtualWatch.listener.onError(msg) } } const responseWaitSpec = this.wsResponseWait.get(msg.requestId) if (responseWaitSpec) { try { if (msg.msgType === 'ERROR') { responseWaitSpec.reject(new RealtimeErrorMessageError(msg)) } else { responseWaitSpec.resolve(msg) } } catch (e) { console.error( 'ws onMessage responseWaitSpec.resolve(msg) errored:', e ) } finally { this.wsResponseWait.delete(msg.requestId) } if (responseWaitSpec.skipOnMessage) { return } } if (msg.msgType === 'PONG') { if (this.lastPingSendTS) { const rtt = Date.now() - this.lastPingSendTS if (rtt > DEFAULT_UNTRUSTED_RTT_THRESHOLD) { console.warn(`[realtime] untrusted rtt observed: ${rtt}`) return } if (this.rttObserved.length >= MAX_RTT_OBSERVED) { this.rttObserved.splice( 0, this.rttObserved.length - MAX_RTT_OBSERVED + 1 ) } this.rttObserved.push(rtt) } return } let client = msg.watchId && this.watchIdClientMap.get(msg.watchId) if (client) { client.onMessage(msg) } else { console.error( `[realtime] no realtime listener found responsible for watchId ${msg.watchId}: `, msg ) switch (msg.msgType) { case 'INIT_EVENT': case 'NEXT_EVENT': case 'CHECK_EVENT': { client = this.queryIdClientMap.get(msg.msgData.queryID) if (client) { client.onMessage(msg) } break } default: { for (const [, client] of Array.from(this.watchIdClientMap.entries())) { client.onMessage(msg) break } } } } } this.heartbeat() }) private isWSConnected = (): boolean => Boolean(this.ws && this.ws.readyState === WS_READY_STATE.OPEN) private onceWSConnected = async (): Promise => { if (this.isWSConnected()) { return } if (this.wsInitPromise !== null && this.wsInitPromise !== undefined) { return this.wsInitPromise } return new Promise((resolve, reject) => { this.wsReadySubsribers.push({ resolve, reject, }) }) } private webLogin = async ( envId?: string, refresh?: boolean ): Promise => { if (!refresh) { if (envId) { const loginInfo = this.logins.get(envId) if (loginInfo) { if (loginInfo.loggedIn && loginInfo.loginResult) { return loginInfo.loginResult } if (loginInfo.loggingInPromise !== null && loginInfo.loggingInPromise !== undefined) { return loginInfo.loggingInPromise } } } else { const emptyEnvLoginInfo = this.logins.get('') if (emptyEnvLoginInfo?.loggingInPromise !== null && emptyEnvLoginInfo?.loggingInPromise !== undefined) { return emptyEnvLoginInfo.loggingInPromise } } } const promise = new Promise((resolve, reject) => { (async () => { try { const wsSign = await this.getWsSign() const msgData: IRequestMessageLoginData = { envId: wsSign.envId || '', accessToken: '', // 已废弃字段 referrer: 'web', sdkVersion: '', dataVersion: '', } const loginMsg: IRequestMessageLoginMsg = { watchId: undefined, requestId: genRequestId(), msgType: 'LOGIN', msgData, exMsgData: { runtime: getRuntime(), signStr: wsSign.signStr, secretVersion: wsSign.secretVersion, }, } const loginResMsg = await this.send({ msg: loginMsg, waitResponse: true, skipOnMessage: true, timeout: DEFAULT_LOGIN_TIMEOUT, }) if (!loginResMsg.msgData.code) { // login success resolve({ envId: wsSign.envId, }) } else { // login failed reject(new Error(`${loginResMsg.msgData.code} ${loginResMsg.msgData.message}`)) } } catch (e) { reject(e) } })() }) let loginInfo = envId && this.logins.get(envId) const loginStartTS = Date.now() if (loginInfo) { loginInfo.loggedIn = false loginInfo.loggingInPromise = promise loginInfo.loginStartTS = loginStartTS } else { loginInfo = { loggedIn: false, loggingInPromise: promise, loginStartTS, } this.logins.set(envId || '', loginInfo) } try { const loginResult = await promise const curLoginInfo = envId && this.logins.get(envId) if ( curLoginInfo && curLoginInfo === loginInfo && curLoginInfo.loginStartTS === loginStartTS ) { loginInfo.loggedIn = true loginInfo.loggingInPromise = undefined loginInfo.loginStartTS = undefined loginInfo.loginResult = loginResult return loginResult } if (curLoginInfo) { if (curLoginInfo.loggedIn && curLoginInfo.loginResult) { return curLoginInfo.loginResult } if (curLoginInfo.loggingInPromise !== null && curLoginInfo.loggingInPromise !== undefined) { return curLoginInfo.loggingInPromise } throw new Error('ws unexpected login info') } else { throw new Error('ws login info reset') } } catch (e) { loginInfo.loggedIn = false loginInfo.loggingInPromise = undefined loginInfo.loginStartTS = undefined loginInfo.loginResult = undefined throw e } } private getWsSign = async (): Promise => { if (this.wsSign && this.wsSign.expiredTs > Date.now()) { return this.wsSign } const expiredTs = Date.now() + 60000 const res = await this.context.appConfig.request.send('auth.wsWebSign', { runtime: getRuntime() }) if (res.code) { throw new Error(`[tcb-js-sdk] 获取实时数据推送登录票据失败: ${res.code}`) } if (res.data) { const { signStr, wsUrl, secretVersion, envId } = res.data return { signStr, wsUrl, secretVersion, envId, expiredTs, } } throw new Error('[tcb-js-sdk] 获取实时数据推送登录票据失败') } private getWaitExpectedTimeoutLength = () => { if (!this.rttObserved.length) { return DEFAULT_EXPECTED_EVENT_WAIT_TIME } // 1.5 * RTT return ( (this.rttObserved.reduce((acc, cur) => acc + cur) / this.rttObserved.length) * 1.5 ) } private heartbeat(immediate?: boolean) { this.clearHeartbeat() // @ts-ignore this.pingTimeoutId = setTimeout( () => { ( async () => { try { if (!this.ws || this.ws.readyState !== WS_READY_STATE.OPEN) { // no need to ping return } this.lastPingSendTS = Date.now() await this.ping() this.pingFailed = 0 // @ts-ignore this.pongTimeoutId = setTimeout(() => { console.error('pong timed out') if (this.pongMissed < DEFAULT_PONG_MISS_TOLERANCE) { this.pongMissed += 1 this.heartbeat(true) } else { // logical perceived connection lost, even though websocket did not receive error or close event this.initWebSocketConnection(true) } }, this.context.appConfig.realtimePongWaitTimeout) } catch (e) { if (this.pingFailed < DEFAULT_PING_FAIL_TOLERANCE) { this.pingFailed += 1 this.heartbeat() } else { this.close(CloseEventCode.HeartbeatPingError) } } } )() }, immediate ? 0 : this.context.appConfig.realtimePingInterval ) } private ping = async () => { const msg: IRequestMessagePingMsg = { watchId: undefined, requestId: genRequestId(), msgType: 'PING', msgData: null, } await this.send({ msg, }) } private onWatchStart = (client: VirtualWebSocketClient, queryID: string) => { this.queryIdClientMap.set(queryID, client) } private onWatchClose = (client: VirtualWebSocketClient, queryID: string) => { if (queryID) { this.queryIdClientMap.delete(queryID) } this.watchIdClientMap.delete(client.watchId) this.virtualWSClient.delete(client) if (!this.virtualWSClient.size) { // no more existing watch, we should release the websocket connection this.close(CloseEventCode.NoRealtimeListeners) } } }