/* * Copyright 2023 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ import { Channel, ChannelCredentials, ClientDuplexStream, Metadata, StatusObject, connectivityState, experimental, loadPackageDefinition, logVerbosity, status } from "@grpc/grpc-js"; import { XdsDecodeContext, XdsDecodeResult, XdsResourceType } from "./xds-resource-type/xds-resource-type"; import { XdsResourceName, parseXdsResourceName, xdsResourceNameToString } from "./resources"; import { Node } from "./generated/envoy/config/core/v3/Node"; import { BootstrapInfo, CertificateProviderConfig, XdsServerConfig, loadBootstrapInfo, serverConfigEqual } from "./xds-bootstrap"; import BackoffTimeout = experimental.BackoffTimeout; import { DiscoveryRequest } from "./generated/envoy/service/discovery/v3/DiscoveryRequest"; import { DiscoveryResponse__Output } from "./generated/envoy/service/discovery/v3/DiscoveryResponse"; import * as adsTypes from './generated/ads'; import * as lrsTypes from './generated/lrs'; import * as protoLoader from '@grpc/proto-loader'; import { AggregatedDiscoveryServiceClient } from "./generated/envoy/service/discovery/v3/AggregatedDiscoveryService"; import { LoadReportingServiceClient } from "./generated/envoy/service/load_stats/v3/LoadReportingService"; import { createGoogleDefaultCredentials } from "./google-default-credentials"; import { Any__Output } from "./generated/google/protobuf/Any"; import { LoadStatsRequest } from "./generated/envoy/service/load_stats/v3/LoadStatsRequest"; import { LoadStatsResponse__Output } from "./generated/envoy/service/load_stats/v3/LoadStatsResponse"; import { Locality, Locality__Output } from "./generated/envoy/config/core/v3/Locality"; import { Duration } from "./generated/google/protobuf/Duration"; import { registerXdsClientWithCsds } from "./csds"; import CertificateProvider = experimental.CertificateProvider; import FileWatcherCertificateProvider = experimental.FileWatcherCertificateProvider; const TRACER_NAME = 'xds_client'; function trace(text: string): void { experimental.trace(logVerbosity.DEBUG, TRACER_NAME, text); } let loadedProtos: adsTypes.ProtoGrpcType & lrsTypes.ProtoGrpcType | null = null; function loadAdsProtos(): adsTypes.ProtoGrpcType & lrsTypes.ProtoGrpcType { if (loadedProtos !== null) { return loadedProtos; } return (loadPackageDefinition(protoLoader .loadSync( [ 'envoy/service/discovery/v3/ads.proto', 'envoy/service/load_stats/v3/lrs.proto', ], { keepCase: true, longs: String, enums: String, defaults: true, oneofs: true, json: true, includeDirs: [ // Paths are relative to src/build __dirname + '/../../deps/envoy-api/', __dirname + '/../../deps/xds/', __dirname + '/../../deps/googleapis/', __dirname + '/../../deps/protoc-gen-validate/', ], } )) as unknown) as adsTypes.ProtoGrpcType & lrsTypes.ProtoGrpcType; } const clientVersion = require('../../package.json').version; export interface ResourceWatcherInterface { onGenericResourceChanged(resource: object): void; onError(status: StatusObject): void; onResourceDoesNotExist(): void; } export interface BasicWatcher { onResourceChanged(resource: UpdateType): void; onError(status: StatusObject): void; onResourceDoesNotExist(): void; } export class Watcher implements ResourceWatcherInterface { constructor(private internalWatcher: BasicWatcher) {} onGenericResourceChanged(resource: object): void { this.internalWatcher.onResourceChanged(resource as unknown as UpdateType); } onError(status: StatusObject) { this.internalWatcher.onError(status); } onResourceDoesNotExist() { this.internalWatcher.onResourceDoesNotExist(); } } const RESOURCE_TIMEOUT_MS = 15_000; class ResourceTimer { private timer: NodeJS.Timeout | null = null; private resourceSeen = false; constructor(private callState: AdsCallState, private type: XdsResourceType, private name: XdsResourceName) {} maybeCancelTimer() { if (this.timer) { clearTimeout(this.timer); this.timer = null; } } markSeen() { this.resourceSeen = true; this.maybeCancelTimer(); } markAdsStreamStarted() { this.maybeStartTimer(); } private maybeStartTimer() { if (this.resourceSeen) { return; } if (this.timer) { return; } const authorityState = this.callState.client.xdsClient.authorityStateMap.get(this.name.authority); if (!authorityState) { return; } const resourceState = authorityState.resourceMap.get(this.type)?.get(this.name.key); if (resourceState?.cachedResource) { return; } this.timer = setTimeout(() => { this.onTimer(); }, RESOURCE_TIMEOUT_MS); } private onTimer() { const authorityState = this.callState.client.xdsClient.authorityStateMap.get(this.name.authority); const resourceState = authorityState?.resourceMap.get(this.type)?.get(this.name.key); if (!resourceState) { return; } resourceState.meta.clientStatus = 'DOES_NOT_EXIST'; for (const watcher of resourceState.watchers) { watcher.onResourceDoesNotExist(); } } } interface AdsParseResult { type?: XdsResourceType; typeUrl?: string; version?: string; nonce?: string; errors: string[]; /** * authority -> set of keys */ resourcesSeen: Map>; haveValidResources: boolean; } /** * Responsible for parsing a single ADS response, one resource at a time */ class AdsResponseParser { private result: AdsParseResult = { errors: [], resourcesSeen: new Map(), haveValidResources: false }; private updateTime = new Date(); constructor(private adsCallState: AdsCallState) {} processAdsResponseFields(message: DiscoveryResponse__Output) { const type = this.adsCallState.client.xdsClient.getResourceType(message.type_url); if (!type) { throw new Error(`Unexpected type URL ${message.type_url}`); } this.result.type = type; this.result.typeUrl = message.type_url; this.result.nonce = message.nonce; this.result.version = message.version_info; } parseResource(index: number, resource: Any__Output) { const errorPrefix = `resource index ${index}:`; if (resource.type_url !== this.result.typeUrl) { this.result.errors.push(`${errorPrefix} incorrect resource type "${resource.type_url}" (should be "${this.result.typeUrl}")`); return; } if (!this.result.type) { this.adsCallState.client.trace('Received resource for uninitialized type ' + resource.type_url); return; } const decodeContext: XdsDecodeContext = { server: this.adsCallState.client.xdsServerConfig, bootstrap: this.adsCallState.client.xdsClient.getBootstrapInfo() }; let decodeResult: XdsDecodeResult; try { decodeResult = this.result.type.decode(decodeContext, resource); } catch (e) { this.result.errors.push(`${errorPrefix} ${(e as Error).message}`); return; } let parsedName: XdsResourceName; try { parsedName = parseXdsResourceName(decodeResult.name, this.result.type!.getTypeUrl()); } catch (e) { this.result.errors.push(`${errorPrefix} ${(e as Error).message}`); return; } this.adsCallState.typeStates.get(this.result.type!)?.subscribedResources.get(parsedName.authority)?.get(parsedName.key)?.markSeen(); if (this.result.type.allResourcesRequiredInSotW()) { if (!this.result.resourcesSeen.has(parsedName.authority)) { this.result.resourcesSeen.set(parsedName.authority, new Set()); } this.result.resourcesSeen.get(parsedName.authority)!.add(parsedName.key); } const resourceState = this.adsCallState.client.xdsClient.authorityStateMap.get(parsedName.authority)?.resourceMap.get(this.result.type)?.get(parsedName.key); if (!resourceState) { // No subscription for this resource this.adsCallState.client.trace('Received resource of type ' + this.result.type.getTypeUrl() + ' named ' + decodeResult.name + ' with no subscriptions'); return; } if (resourceState.deletionIgnored) { experimental.log(logVerbosity.INFO, `Received resource with previously ignored deletion: ${decodeResult.name}`); resourceState.deletionIgnored = false; } if (decodeResult.error) { this.result.errors.push(`${errorPrefix} ${decodeResult.error}`); process.nextTick(() => { for (const watcher of resourceState.watchers) { watcher.onError({code: status.UNAVAILABLE, details: decodeResult.error!, metadata: new Metadata()}); } }); resourceState.meta.clientStatus = 'NACKED'; resourceState.meta.failedVersion = this.result.version!; resourceState.meta.failedDetails = decodeResult.error; resourceState.meta.failedUpdateTime = this.updateTime; return; } if (!decodeResult.value) { this.adsCallState.client.trace('Failed to parse resource of type ' + this.result.type.getTypeUrl()); return; } this.adsCallState.client.trace('Parsed resource of type ' + this.result.type.getTypeUrl() + ': ' + JSON.stringify(decodeResult.value, (key, value) => (value && value.type === 'Buffer' && Array.isArray(value.data)) ? (value.data as Number[]).map(n => n.toString(16)).join('') : value, 2)); this.result.haveValidResources = true; if (this.result.type.resourcesEqual(resourceState.cachedResource, decodeResult.value)) { return; } resourceState.cachedResource = decodeResult.value; resourceState.meta = { clientStatus: 'ACKED', rawResource: resource, updateTime: this.updateTime, version: this.result.version! }; process.nextTick(() => { trace('Notifying ' + resourceState.watchers.size + ' watchers of ' + this.result.type?.getTypeUrl() + ' update'); for (const watcher of resourceState.watchers) { watcher.onGenericResourceChanged(decodeResult.value!); } }); } getResult() { return this.result; } } type AdsCall = ClientDuplexStream; interface ResourceTypeState { nonce?: string; error?: string; /** * authority -> key -> timer */ subscribedResources: Map>; } class AdsCallState { public typeStates: Map = new Map(); private receivedAnyResponse = false; private sentInitialMessage = false; private streamStarted = false; constructor(public client: XdsSingleServerClient, private call: AdsCall, private node: Node) { // Populate subscription map with existing subscriptions for (const [authority, authorityState] of client.xdsClient.authorityStateMap) { if (authorityState.client !== client) { continue; } for (const [type, typeMap] of authorityState.resourceMap) { for (const key of typeMap.keys()) { this.subscribe(type, {authority, key}, true); } } } for (const type of this.typeStates.keys()) { this.updateNames(type); } call.on('data', (message: DiscoveryResponse__Output) => { this.handleResponseMessage(message); }) call.on('status', (status: StatusObject) => { this.handleStreamStatus(status); }); call.on('error', () => {}); } private trace(text: string) { this.client.trace(text); } private handleResponseMessage(message: DiscoveryResponse__Output) { const parser = new AdsResponseParser(this); let handledAdsResponseFields: boolean; try { parser.processAdsResponseFields(message); handledAdsResponseFields = true; } catch (e) { this.trace('ADS response field parsing failed for type ' + message.type_url); handledAdsResponseFields = false; } if (handledAdsResponseFields) { for (const [index, resource] of message.resources.entries()) { parser.parseResource(index, resource); } const result = parser.getResult(); const typeState = this.typeStates.get(result.type!); if (!typeState) { this.trace('Type state not found for type ' + result.type!.getTypeUrl()); return; } typeState.nonce = result.nonce; if (result.errors.length > 0) { typeState.error = `xDS response validation errors: [${result.errors.join('; ')}]`; } else { delete typeState.error; } // Delete resources not seen in update if needed if (result.type!.allResourcesRequiredInSotW()) { for (const [authority, authorityState] of this.client.xdsClient.authorityStateMap) { if (authorityState.client !== this.client) { continue; } const typeMap = authorityState.resourceMap.get(result.type!); if (!typeMap) { continue; } for (const [key, resourceState] of typeMap) { if (!result.resourcesSeen.get(authority)?.has(key)) { /* Do nothing for resources that have no cached value. Those are * handled by the resource timer. */ if (!resourceState.cachedResource) { continue; } if (this.client.ignoreResourceDeletion) { experimental.log(logVerbosity.ERROR, 'Ignoring nonexistent resource ' + xdsResourceNameToString({authority, key}, result.type!.getTypeUrl())); resourceState.deletionIgnored = true; } else { resourceState.meta.clientStatus = 'DOES_NOT_EXIST'; process.nextTick(() => { for (const watcher of resourceState.watchers) { watcher.onResourceDoesNotExist(); } }); } } } } } if (result.haveValidResources || result.errors.length === 0) { this.client.resourceTypeVersionMap.set(result.type!, result.version!); } this.updateNames(result.type!); } } private* allWatchers() { for (const [type, typeState] of this.typeStates) { for (const [authority, authorityMap] of typeState.subscribedResources) { for (const key of authorityMap.keys()) { yield* this.client.xdsClient.authorityStateMap.get(authority)?.resourceMap.get(type)?.get(key)?.watchers ?? []; } } } } private handleStreamStatus(streamStatus: StatusObject) { this.trace( 'ADS stream ended. code=' + streamStatus.code + ' details= ' + streamStatus.details ); if (streamStatus.code !== status.OK && !this.receivedAnyResponse) { for (const watcher of this.allWatchers()) { watcher.onError(streamStatus); } } this.client.handleAdsStreamEnd(); } hasSubscribedResources(): boolean { for (const typeState of this.typeStates.values()) { for (const authorityMap of typeState.subscribedResources.values()) { if (authorityMap.size > 0) { return true; } } } return false; } subscribe(type: XdsResourceType, name: XdsResourceName, delaySend: boolean = false) { let typeState = this.typeStates.get(type); if (!typeState) { typeState = { nonce: '', subscribedResources: new Map() }; this.typeStates.set(type, typeState); } let authorityMap = typeState.subscribedResources.get(name.authority); if (!authorityMap) { authorityMap = new Map(); typeState.subscribedResources.set(name.authority, authorityMap); } if (!authorityMap.has(name.key)) { const timer = new ResourceTimer(this, type, name); if (this.streamStarted) { timer.markAdsStreamStarted(); } authorityMap.set(name.key, timer); if (!delaySend) { this.updateNames(type); } } } unsubscribe(type: XdsResourceType, name: XdsResourceName) { const typeState = this.typeStates.get(type); if (!typeState) { return; } const authorityMap = typeState.subscribedResources.get(name.authority); if (!authorityMap) { return; } authorityMap.delete(name.key); if (authorityMap.size === 0) { typeState.subscribedResources.delete(name.authority); } this.updateNames(type); } resourceNamesForRequest(type: XdsResourceType): string[] { const typeState = this.typeStates.get(type); if (!typeState) { return []; } const result: string[] = []; for (const [authority, authorityMap] of typeState.subscribedResources) { for (const [key, timer] of authorityMap) { result.push(xdsResourceNameToString({authority, key}, type.getTypeUrl())); } } return result; } updateNames(type: XdsResourceType) { const typeState = this.typeStates.get(type); if (!typeState) { return; } const request: DiscoveryRequest = { node: this.sentInitialMessage ? null : this.node, type_url: type.getFullTypeUrl(), response_nonce: typeState.nonce, resource_names: this.resourceNamesForRequest(type), version_info: this.client.resourceTypeVersionMap.get(type), error_detail: typeState.error ? { code: status.UNAVAILABLE, message: typeState.error} : null }; this.trace('Sending discovery request: ' + JSON.stringify(request, undefined, 2)); this.call.write(request); this.sentInitialMessage = true; } end() { this.call.end(); } /** * Should be called when the channel state is READY after starting the * stream. */ markStreamStarted() { this.streamStarted = true; for (const [type, typeState] of this.typeStates) { for (const [authority, authorityMap] of typeState.subscribedResources) { for (const resourceTimer of authorityMap.values()) { resourceTimer.markAdsStreamStarted(); } } } } } type LrsCall = ClientDuplexStream; function localityEqual( loc1: Locality__Output, loc2: Locality__Output ): boolean { return ( loc1.region === loc2.region && loc1.zone === loc2.zone && loc1.sub_zone === loc2.sub_zone ); } export interface XdsClusterDropStats { addUncategorizedCallDropped(): void; addCallDropped(category: string): void; } export interface XdsClusterLocalityStats { addCallStarted(): void; addCallFinished(fail: boolean): void; } interface DroppedRequests { category: string; dropped_count: number; } interface UpstreamLocalityStats { locality: Locality; total_issued_requests: number; total_successful_requests: number; total_error_requests: number; total_requests_in_progress: number; } /** * An interface representing the ClusterStats message type, restricted to the * fields used in this module to ensure compatibility with both v2 and v3 APIs. */ interface ClusterStats { cluster_name: string; cluster_service_name: string; dropped_requests: DroppedRequests[]; total_dropped_requests: number; upstream_locality_stats: UpstreamLocalityStats[]; load_report_interval: Duration } interface ClusterLocalityStats { locality: Locality__Output; callsStarted: number; callsSucceeded: number; callsFailed: number; callsInProgress: number; refcount: number; } interface ClusterLoadReport { callsDropped: Map; uncategorizedCallsDropped: number; localityStats: Set; intervalStart: [number, number]; } interface StatsMapEntry { clusterName: string; edsServiceName: string; refCount: number; stats: ClusterLoadReport; } class ClusterLoadReportMap { private statsMap: Set = new Set(); get( clusterName: string, edsServiceName: string ): ClusterLoadReport | undefined { for (const statsObj of this.statsMap) { if ( statsObj.clusterName === clusterName && statsObj.edsServiceName === edsServiceName ) { return statsObj.stats; } } return undefined; } /** * Get the indicated map entry if it exists, or create a new one if it does * not. Increments the refcount of that entry, so a call to this method * should correspond to a later call to unref * @param clusterName * @param edsServiceName * @returns */ getOrCreate(clusterName: string, edsServiceName: string): ClusterLoadReport { for (const statsObj of this.statsMap) { if ( statsObj.clusterName === clusterName && statsObj.edsServiceName === edsServiceName ) { statsObj.refCount += 1; return statsObj.stats; } } const newStats: ClusterLoadReport = { callsDropped: new Map(), uncategorizedCallsDropped: 0, localityStats: new Set(), intervalStart: process.hrtime(), }; this.statsMap.add({ clusterName, edsServiceName, refCount: 1, stats: newStats, }); return newStats; } *entries(): IterableIterator< [{ clusterName: string; edsServiceName: string }, ClusterLoadReport] > { for (const statsEntry of this.statsMap) { yield [ { clusterName: statsEntry.clusterName, edsServiceName: statsEntry.edsServiceName, }, statsEntry.stats, ]; } } unref(clusterName: string, edsServiceName: string) { for (const statsObj of this.statsMap) { if ( statsObj.clusterName === clusterName && statsObj.edsServiceName === edsServiceName ) { statsObj.refCount -=1; if (statsObj.refCount === 0) { this.statsMap.delete(statsObj); } return; } } } get size() { return this.statsMap.size; } } class LrsCallState { private statsTimer: NodeJS.Timeout | null = null; private sentInitialMessage = false; constructor(private client: XdsSingleServerClient, private call: LrsCall, private node: Node) { call.on('data', (message: LoadStatsResponse__Output) => { this.handleResponseMessage(message); }) call.on('status', (status: StatusObject) => { this.handleStreamStatus(status); }); call.on('error', () => {}); this.sendStats(); } destroy() { if (this.statsTimer) { clearInterval(this.statsTimer); this.statsTimer = null; } return null; } private handleStreamStatus(status: StatusObject) { this.client.trace( 'LRS stream ended. code=' + status.code + ' details= ' + status.details ); this.client.handleLrsStreamEnd(); } private handleResponseMessage(message: LoadStatsResponse__Output) { this.client.trace('Received LRS response'); this.client.onLrsStreamReceivedMessage(); if ( !this.statsTimer || message.load_reporting_interval?.seconds !== this.client.latestLrsSettings?.load_reporting_interval?.seconds || message.load_reporting_interval?.nanos !== this.client.latestLrsSettings?.load_reporting_interval?.nanos ) { /* Only reset the timer if the interval has changed or was not set * before. */ if (this.statsTimer) { clearInterval(this.statsTimer); } /* Convert a google.protobuf.Duration to a number of milliseconds for * use with setInterval. */ const loadReportingIntervalMs = Number.parseInt(message.load_reporting_interval!.seconds) * 1000 + message.load_reporting_interval!.nanos / 1_000_000; this.client.trace('Received LRS response with load reporting interval ' + loadReportingIntervalMs + ' ms'); this.statsTimer = setInterval(() => { this.sendStats(); }, loadReportingIntervalMs); } this.client.latestLrsSettings = message; } private sendLrsMessage(clusterStats: ClusterStats[]) { const request: LoadStatsRequest = { node: this.sentInitialMessage ? null : this.node, cluster_stats: clusterStats }; this.client.trace('Sending LRS message ' + JSON.stringify(request, undefined, 2)); this.call.write(request); this.sentInitialMessage = true; } private get latestLrsSettings() { return this.client.latestLrsSettings; } private sendStats() { if (!this.latestLrsSettings) { this.sendLrsMessage([]); return; } const clusterStats: ClusterStats[] = []; for (const [ { clusterName, edsServiceName }, stats, ] of this.client.clusterStatsMap.entries()) { if ( this.latestLrsSettings.send_all_clusters || this.latestLrsSettings.clusters.indexOf(clusterName) > 0 ) { const upstreamLocalityStats: UpstreamLocalityStats[] = []; for (const localityStats of stats.localityStats) { // Skip localities with 0 requests if ( localityStats.callsStarted > 0 || localityStats.callsSucceeded > 0 || localityStats.callsFailed > 0 ) { upstreamLocalityStats.push({ locality: localityStats.locality, total_issued_requests: localityStats.callsStarted, total_successful_requests: localityStats.callsSucceeded, total_error_requests: localityStats.callsFailed, total_requests_in_progress: localityStats.callsInProgress, }); localityStats.callsStarted = 0; localityStats.callsSucceeded = 0; localityStats.callsFailed = 0; } } const droppedRequests: DroppedRequests[] = []; let totalDroppedRequests = 0; for (const [category, count] of stats.callsDropped.entries()) { if (count > 0) { droppedRequests.push({ category, dropped_count: count, }); totalDroppedRequests += count; } } totalDroppedRequests += stats.uncategorizedCallsDropped; // Clear out dropped call stats after sending them stats.callsDropped.clear(); stats.uncategorizedCallsDropped = 0; const interval = process.hrtime(stats.intervalStart); stats.intervalStart = process.hrtime(); // Skip clusters with 0 requests if (upstreamLocalityStats.length > 0 || totalDroppedRequests > 0) { clusterStats.push({ cluster_name: clusterName, cluster_service_name: edsServiceName, dropped_requests: droppedRequests, total_dropped_requests: totalDroppedRequests, upstream_locality_stats: upstreamLocalityStats, load_report_interval: { seconds: interval[0], nanos: interval[1], }, }); } } } this.sendLrsMessage(clusterStats); } } class XdsSingleServerClient { public ignoreResourceDeletion: boolean; private adsBackoff: BackoffTimeout; private lrsBackoff: BackoffTimeout; private adsClient: AggregatedDiscoveryServiceClient; private adsCallState: AdsCallState | null = null; private lrsClient: LoadReportingServiceClient; private lrsCallState: LrsCallState | null = null; public clusterStatsMap = new ClusterLoadReportMap(); public latestLrsSettings: LoadStatsResponse__Output | null = null; /** * The number of authorities that are using this client. Streams should only * be started if refcount > 0 */ private refcount = 0; /** * Map of type to latest accepted version string for that type */ public resourceTypeVersionMap: Map = new Map(); constructor(public xdsClient: XdsClient, bootstrapNode: Node, public xdsServerConfig: XdsServerConfig) { this.adsBackoff = new BackoffTimeout(() => { this.maybeStartAdsStream(); }); this.adsBackoff.unref(); this.lrsBackoff = new BackoffTimeout(() => { this.maybeStartLrsStream(); }); this.lrsBackoff.unref(); this.ignoreResourceDeletion = xdsServerConfig.server_features.includes('ignore_resource_deletion'); const channelArgs = { // 5 minutes 'grpc.keepalive_time_ms': 5 * 60 * 1000 } const credentialsConfigs = xdsServerConfig.channel_creds; let channelCreds: ChannelCredentials | null = null; for (const config of credentialsConfigs) { if (config.type === 'google_default') { channelCreds = createGoogleDefaultCredentials(); break; } else if (config.type === 'insecure') { channelCreds = ChannelCredentials.createInsecure(); break; } } const serverUri = this.xdsServerConfig.server_uri this.trace('Starting xDS client connected to server URI ' + this.xdsServerConfig.server_uri); /* Bootstrap validation rules guarantee that a matching channel credentials * config exists in the list. */ const channel = new Channel(serverUri, channelCreds!, channelArgs); const protoDefinitions = loadAdsProtos(); this.adsClient = new protoDefinitions.envoy.service.discovery.v3.AggregatedDiscoveryService( serverUri, channelCreds!, {channelOverride: channel} ); channel.watchConnectivityState(channel.getConnectivityState(false), Infinity, () => { this.handleAdsConnectivityStateUpdate(); }); this.lrsClient = new protoDefinitions.envoy.service.load_stats.v3.LoadReportingService( serverUri, channelCreds!, {channelOverride: channel} ); } private handleAdsConnectivityStateUpdate() { const state = this.adsClient.getChannel().getConnectivityState(false); if (state === connectivityState.READY) { this.adsCallState?.markStreamStarted(); } if (state === connectivityState.TRANSIENT_FAILURE) { for (const authorityState of this.xdsClient.authorityStateMap.values()) { if (authorityState.client !== this) { continue; } for (const typeMap of authorityState.resourceMap.values()) { for (const resourceState of typeMap.values()) { for (const watcher of resourceState.watchers) { watcher.onError({ code: status.UNAVAILABLE, details: 'No connection established to xDS server', metadata: new Metadata() }); } } } } } this.adsClient.getChannel().watchConnectivityState(state, Infinity, () => { this.handleAdsConnectivityStateUpdate(); }); } onAdsStreamReceivedMessage() { this.adsBackoff.stop(); this.adsBackoff.reset(); } handleAdsStreamEnd() { this.adsCallState = null; /* The backoff timer would start the stream when it finishes. If it is not * running, restart the stream immediately. */ if (!this.adsBackoff.isRunning()) { this.maybeStartAdsStream(); } } private maybeStartAdsStream() { if (this.adsCallState || this.refcount < 1) { return; } this.trace('Starting ADS stream'); const metadata = new Metadata({waitForReady: true}); const call = this.adsClient.StreamAggregatedResources(metadata); this.adsCallState = new AdsCallState(this, call, this.xdsClient.adsNode!); this.adsBackoff.runOnce(); } onLrsStreamReceivedMessage() { this.lrsBackoff.stop(); this.lrsBackoff.reset(); } handleLrsStreamEnd() { this.lrsCallState = this.lrsCallState ? this.lrsCallState.destroy() : null; /* The backoff timer would start the stream when it finishes. If it is not * running, restart the stream immediately. */ if (!this.lrsBackoff.isRunning()) { this.maybeStartLrsStream(); } } private maybeStartLrsStream() { if (this.lrsCallState || this.refcount < 1 || this.clusterStatsMap.size < 1) { return; } this.trace('Starting LRS stream'); const metadata = new Metadata({waitForReady: true}); const call = this.lrsClient.StreamLoadStats(metadata); this.lrsCallState = new LrsCallState(this, call, this.xdsClient.lrsNode!); this.lrsBackoff.runOnce(); } trace(text: string) { trace(this.xdsServerConfig.server_uri + ' ' + text); } subscribe(type: XdsResourceType, name: XdsResourceName) { this.trace('subscribe(type=' + type.getTypeUrl() + ', name=' + xdsResourceNameToString(name, type.getTypeUrl()) + ')'); this.trace(JSON.stringify(name)); this.maybeStartAdsStream(); this.adsCallState?.subscribe(type, name); } unsubscribe(type: XdsResourceType, name: XdsResourceName) { this.trace('unsubscribe(type=' + type.getTypeUrl() + ', name=' + xdsResourceNameToString(name, type.getTypeUrl()) + ')'); this.adsCallState?.unsubscribe(type, name); if (this.adsCallState && !this.adsCallState.hasSubscribedResources()) { this.adsCallState.end(); this.adsCallState = null; } } ref() { this.refcount += 1; } unref() { this.refcount -= 1; } addClusterDropStats( clusterName: string, edsServiceName: string ): XdsClusterDropStats { this.trace('addClusterDropStats(clusterName=' + clusterName + ', edsServiceName=' + edsServiceName + ')'); const clusterStats = this.clusterStatsMap.getOrCreate( clusterName, edsServiceName ); this.maybeStartLrsStream(); return { addUncategorizedCallDropped: () => { clusterStats.uncategorizedCallsDropped += 1; }, addCallDropped: (category) => { const prevCount = clusterStats.callsDropped.get(category) ?? 0; clusterStats.callsDropped.set(category, prevCount + 1); }, }; } removeClusterDropStats(clusterName: string, edsServiceName: string) { this.trace('removeClusterDropStats(clusterName=' + clusterName + ', edsServiceName=' + edsServiceName + ')'); this.clusterStatsMap.unref(clusterName, edsServiceName); } addClusterLocalityStats( clusterName: string, edsServiceName: string, locality: Locality__Output ): XdsClusterLocalityStats { this.trace('addClusterLocalityStats(clusterName=' + clusterName + ', edsServiceName=' + edsServiceName + ', locality=' + JSON.stringify(locality) + ')'); const clusterStats = this.clusterStatsMap.getOrCreate( clusterName, edsServiceName ); this.maybeStartLrsStream(); let localityStats: ClusterLocalityStats | null = null; for (const statsObj of clusterStats.localityStats) { if (localityEqual(locality, statsObj.locality)) { localityStats = statsObj; break; } } if (localityStats === null) { localityStats = { locality: locality, callsInProgress: 0, callsStarted: 0, callsSucceeded: 0, callsFailed: 0, refcount: 0, }; clusterStats.localityStats.add(localityStats); } /* Help the compiler understand that this object is always non-null in the * closure */ const finalLocalityStats: ClusterLocalityStats = localityStats; return { addCallStarted: () => { finalLocalityStats.callsStarted += 1; finalLocalityStats.callsInProgress += 1; }, addCallFinished: (fail) => { if (fail) { finalLocalityStats.callsFailed += 1; } else { finalLocalityStats.callsSucceeded += 1; } finalLocalityStats.callsInProgress -= 1; }, }; } removeClusterLocalityStats( clusterName: string, edsServiceName: string, locality: Locality__Output ) { this.trace('removeClusterLocalityStats(clusterName=' + clusterName + ', edsServiceName=' + edsServiceName + ', locality=' + JSON.stringify(locality) + ')'); const clusterStats = this.clusterStatsMap.get(clusterName, edsServiceName); if (!clusterStats) { return; } for (const statsObj of clusterStats.localityStats) { if (localityEqual(locality, statsObj.locality)) { statsObj.refcount -= 1; if (statsObj.refcount === 0) { clusterStats.localityStats.delete(statsObj); } break; } } this.clusterStatsMap.unref(clusterName, edsServiceName); } } interface ClientMapEntry { serverConfig: XdsServerConfig; client: XdsSingleServerClient; } type ClientResourceStatus = 'REQUESTED' | 'DOES_NOT_EXIST' | 'ACKED' | 'NACKED'; interface ResourceMetadata { clientStatus: ClientResourceStatus; rawResource?: Any__Output; updateTime?: Date; version?: string; failedVersion?: string; failedDetails?: string; failedUpdateTime?: Date; } interface ResourceState { watchers: Set; cachedResource: object | null; meta: ResourceMetadata; deletionIgnored: boolean; } interface AuthorityState { client: XdsSingleServerClient; /** * type -> key -> state */ resourceMap: Map>; } const userAgentName = 'gRPC Node Pure JS'; function createCertificateProvider(config: CertificateProviderConfig) { switch (config.pluginName) { case 'file_watcher': return new FileWatcherCertificateProvider(config.config); default: throw new Error(`Unexpected certificate provider plugin name ${config.pluginName}`); } } export class XdsClient { /** * authority -> authority state */ public authorityStateMap: Map = new Map(); private clients: ClientMapEntry[] = []; private typeRegistry: Map = new Map(); private bootstrapInfo: BootstrapInfo | null = null; private certificateProviderRegistry: Map = new Map(); private certificateProviderRegistryPopulated = false; constructor(bootstrapInfoOverride?: BootstrapInfo) { if (bootstrapInfoOverride) { this.bootstrapInfo = bootstrapInfoOverride; } registerXdsClientWithCsds(this); } getBootstrapInfo() { if (!this.bootstrapInfo) { this.bootstrapInfo = loadBootstrapInfo(); } return this.bootstrapInfo; } get adsNode(): Node | undefined { if (!this.bootstrapInfo) { return undefined; } return { ...this.bootstrapInfo.node, user_agent_name: userAgentName, user_agent_version: clientVersion, client_features: ['envoy.lb.does_not_support_overprovisioning'], } } get lrsNode(): Node | undefined { if (!this.bootstrapInfo) { return undefined; } return { ...this.bootstrapInfo.node, user_agent_name: userAgentName, user_agent_version: clientVersion, client_features: ['envoy.lrs.supports_send_all_clusters'], }; } private getOrCreateClient(authority: string): XdsSingleServerClient { const bootstrapInfo = this.getBootstrapInfo(); let serverConfig: XdsServerConfig; if (authority === 'old:') { serverConfig = bootstrapInfo.xdsServers[0]; } else { if (authority in bootstrapInfo.authorities) { serverConfig = bootstrapInfo.authorities[authority].xdsServers?.[0] ?? bootstrapInfo.xdsServers[0]; } else { throw new Error(`Authority ${authority} not found in bootstrap authorities list`); } } for (const entry of this.clients) { if (serverConfigEqual(serverConfig, entry.serverConfig)) { return entry.client; } } const client = new XdsSingleServerClient(this, bootstrapInfo.node, serverConfig); this.clients.push({client, serverConfig}); return client; } private getClient(server: XdsServerConfig) { for (const entry of this.clients) { if (serverConfigEqual(server, entry.serverConfig)) { return entry.client; } } return undefined; } getResourceType(typeUrl: string) { return this.typeRegistry.get(typeUrl); } watchResource(type: XdsResourceType, name: string, watcher: ResourceWatcherInterface) { trace('watchResource(type=' + type.getTypeUrl() + ', name=' + name + ')'); if (this.typeRegistry.has(type.getTypeUrl())) { if (this.typeRegistry.get(type.getTypeUrl()) !== type) { throw new Error(`Resource type does not match previously used type with the same type URL: ${type.getTypeUrl()}`); } } else { this.typeRegistry.set(type.getTypeUrl(), type); this.typeRegistry.set(type.getFullTypeUrl(), type); } const resourceName = parseXdsResourceName(name, type.getTypeUrl()); let authorityState = this.authorityStateMap.get(resourceName.authority); if (!authorityState) { authorityState = { client: this.getOrCreateClient(resourceName.authority), resourceMap: new Map() }; authorityState.client.ref(); this.authorityStateMap.set(resourceName.authority, authorityState); } let keyMap = authorityState.resourceMap.get(type); if (!keyMap) { keyMap = new Map(); authorityState.resourceMap.set(type, keyMap); } let entry = keyMap.get(resourceName.key); let isNewSubscription = false; if (!entry) { isNewSubscription = true; entry = { watchers: new Set(), cachedResource: null, deletionIgnored: false, meta: { clientStatus: 'REQUESTED' } }; keyMap.set(resourceName.key, entry); } entry.watchers.add(watcher); if (entry.cachedResource) { process.nextTick(() => { if (entry?.cachedResource) { watcher.onGenericResourceChanged(entry.cachedResource); } }); } if (isNewSubscription) { authorityState.client.subscribe(type, resourceName); } } cancelResourceWatch(type: XdsResourceType, name: string, watcher: ResourceWatcherInterface) { trace('cancelResourceWatch(type=' + type.getTypeUrl() + ', name=' + name + ')'); const resourceName = parseXdsResourceName(name, type.getTypeUrl()); const authorityState = this.authorityStateMap.get(resourceName.authority); if (!authorityState) { return; } const entry = authorityState.resourceMap.get(type)?.get(resourceName.key); if (entry) { entry.watchers.delete(watcher); if (entry.watchers.size === 0) { authorityState.resourceMap.get(type)!.delete(resourceName.key); authorityState.client.unsubscribe(type, resourceName); if (authorityState.resourceMap.get(type)!.size === 0) { authorityState.resourceMap.delete(type); if (authorityState.resourceMap.size === 0) { authorityState.client.unref(); this.authorityStateMap.delete(resourceName.authority); } } } } } addClusterDropStats(lrsServer: XdsServerConfig, clusterName: string, edsServiceName: string): XdsClusterDropStats { const client = this.getClient(lrsServer); if (!client) { return { addUncategorizedCallDropped: () => {}, addCallDropped: (category) => {}, }; } return client.addClusterDropStats(clusterName, edsServiceName); } removeClusterDropStats(lrsServer: XdsServerConfig, clusterName: string, edsServiceName: string) { this.getClient(lrsServer)?.removeClusterDropStats(clusterName, edsServiceName); } addClusterLocalityStats(lrsServer: XdsServerConfig, clusterName: string, edsServiceName: string, locality: Locality__Output): XdsClusterLocalityStats { const client = this.getClient(lrsServer); if (!client) { return { addCallStarted: () => {}, addCallFinished: (fail) => {}, }; } return client.addClusterLocalityStats(clusterName, edsServiceName, locality); } removeClusterLocalityStats(lrsServer: XdsServerConfig, clusterName: string, edsServiceName: string, locality: Locality__Output) { this.getClient(lrsServer)?.removeClusterLocalityStats(clusterName, edsServiceName, locality); } getCertificateProvider(instanceName: string): CertificateProvider | undefined { if (!this.certificateProviderRegistryPopulated) { for (const [name, config] of Object.entries(this.getBootstrapInfo().certificateProviders)) { this.certificateProviderRegistry.set(name, createCertificateProvider(config)); } this.certificateProviderRegistryPopulated = true; } return this.certificateProviderRegistry.get(instanceName); } /** * Returns a valid JSON-stringifiable object, to avoid causing a circular * reference error when an object containing this object is stringified. * @returns */ toJSON(): object { return {}; } } let singletonXdsClient: XdsClient | null = null; export function getSingletonXdsClient(): XdsClient { if (singletonXdsClient === null) { singletonXdsClient = new XdsClient(); } return singletonXdsClient; }