/*
 * Copyright (c) 2022 EdgerOS Team.
 * All rights reserved.
 *
 * Detailed license information can be found in the LICENSE file.
 *
 * Author       : 薛强
 * Date         : 2024-11-19 17:55:19
 * LastEditors  : 薛强
 * LastEditTime : 2024-11-26 15:09:56
 */

import path from 'path'
import { grpc, loadSync, type ChannelOptions } from '../../../deps/grpc'
import {
  circuitBreaker,
  ConsecutiveBreaker,
  handleWhen,
  type IPolicy,
  isBrokenCircuitError,
  retry,
} from '../../../deps/cockatiel'
import {
  castGrpcError,
  ClientClosedError,
  ClientRuntimeError,
  isRecoverableError,
  EtcdInvalidAuthTokenError,
} from '../../base/errors'
import type { IOptions, CallOptionsFactory } from '../../base/options'
import type { CallContext, Services, CallMethod, CallService } from '../../api/rpc'
import { resolveCallOptions } from '../../base/util'
import { IS_JSRE } from '../../../deps/common'
import type IDriver from '../i-driver'

// Directory used to locate the bundled .proto file: `module.directory` when
// IS_JSRE is set, `__dirname` otherwise.
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-unsafe-member-access -- ignore
const DirName = IS_JSRE ? (module as any).directory : __dirname

// A single, empty Metadata instance that is sent with every unary call.
const metadata = new grpc.Metadata()

// eslint-disable-next-line @typescript-eslint/no-unsafe-argument -- ignore
const services = loadSync(path.join(DirName, '../../../proto/rpc.proto'))
// ({
//   keepCase: true,
//   longs: String,
//   enums: String,
//   defaults: true,
//   oneofs: true,
// });
// const services = grpc.loadPackageDefinition(packageDefinition);

// Client constructors of the `etcdserverpb` package, looked up by service name.
// NOTE(review): the type arguments of this `Record` were lost in the reviewed
// copy of the file; `typeof grpc.Client` is a reconstruction - confirm.
const etcdserverpb = services.etcdserverpb as Record<string, typeof grpc.Client>

/**
 * Upper bound on how many times `exec` re-issues a call after the server
 * rejected it with an invalid-auth-token error. Nothing in this module
 * refreshes credentials between attempts, so the retry has to be finite.
 */
const MAX_INVALID_AUTH_TOKEN_RETRIES = 3

/**
 * Strips a leading `http://` or `https://` from a connection string.
 *
 * @param name - Host address, with or without a protocol prefix.
 * @returns The address without the protocol prefix.
 */
function removeProtocolPrefix(name: string) {
  return name.replace(/^https?:\/\//, '')
}

/**
 * Executes a unary gRPC service call and wraps its callback in a Promise.
 *
 * @param client - Client whose method is invoked.
 * @param options - Call options; an empty object is passed when omitted.
 * @param method - Name of the method to invoke on `client`.
 * @param payload - Request message.
 * @returns The response handed to the gRPC callback.
 * @throws The callback error, passed through `castGrpcError`.
 */
async function runServiceCall<T>(
  client: grpc.Client,
  options: grpc.CallOptions | undefined,
  method: string,
  payload: unknown,
): Promise<T> {
  return await new Promise<T>((resolve, reject) => {
    // eslint-disable-next-line @typescript-eslint/no-unsafe-call, @typescript-eslint/no-unsafe-member-access -- any
    (client as any)[method](payload, metadata, options || {}, (err: Error | null, res: any) => {
      if (err) {
        reject(castGrpcError(err))
      } else {
        resolve(res)
      }
    })
  })
}

/**
 * Builds the per-host policy used when the caller does not supply one: a
 * circuit breaker that only handles errors accepted by `isRecoverableError`,
 * configured with `ConsecutiveBreaker(3)` and `halfOpenAfter: 5_000`.
 */
const defaultCircuitBreaker = () =>
  circuitBreaker(handleWhen(isRecoverableError), {
    halfOpenAfter: 5_000,
    breaker: new ConsecutiveBreaker(3),
  })

/**
 * A Host is one instance of the etcd server, which can contain multiple
 * services. It lazily creates and caches one gRPC client per service and
 * carries the fault-handling policy that guards calls to this host.
 */
export class Host {
  private readonly host: string

  private closed = false

  // Null-prototype map from service name to its lazily created client.
  // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment -- ignore
  private cachedServices: { [name in keyof typeof Services]?: grpc.Client } = Object.create(null)

  /**
   * @param host - Server address; a leading `http://`/`https://` is removed.
   * @param channelCredentials - Credentials passed to every client created.
   * @param channelOptions - Optional channel options passed to every client.
   * @param faultHandling - Policy wrapping calls to this host; defaults to
   * `defaultCircuitBreaker()`.
   */
  constructor(
    host: string,
    private readonly channelCredentials: grpc.ChannelCredentials,
    private readonly channelOptions?: ChannelOptions,
    public readonly faultHandling: IPolicy = defaultCircuitBreaker(),
  ) {
    this.host = removeProtocolPrefix(host)
  }

  /**
   * Returns the given gRPC service client on the current host, creating and
   * caching it on first use.
   *
   * @throws ClientClosedError when no client is cached and the host was closed.
   */
  public getServiceClient(name: keyof typeof Services): grpc.Client {
    const service = this.cachedServices[name]
    if (service) {
      return service
    }

    if (this.closed) {
      throw new ClientClosedError(name)
    }

    const newService = new etcdserverpb[name](
      this.host,
      this.channelCredentials,
      // eslint-disable-next-line @typescript-eslint/no-unsafe-argument -- ignore
      this.channelOptions,
    )
    this.cachedServices[name] = newService

    return newService
  }

  /**
   * Closes all cached clients for this host and empties the cache, allowing
   * them to be re-established on subsequent calls.
   */
  public resetAllServices() {
    for (const service of Object.values(this.cachedServices)) {
      // workaround: https://github.com/grpc/grpc-node/issues/1487
      // A channel that is still CONNECTING is not closed right away; the close
      // is deferred to the waitForReady callback (deadline: one second ahead).
      const state = service.getChannel().getConnectivityState(false)
      if (state === grpc.connectivityState.CONNECTING) {
        service.waitForReady(Date.now() + 1_000, () =>
          setImmediate(() => {
            service.close()
          }))
      } else {
        service.close()
      }
    }

    // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment -- ignore
    this.cachedServices = Object.create(null)
  }

  /**
   * Frees resources associated with the host, tearing down any existing
   * clients. Afterwards `getServiceClient` throws instead of creating clients.
   */
  public close() {
    this.resetAllServices()
    this.closed = true
  }
}

/**
 * Driver that wraps a set of gRPC hosts. Note that this wraps the hosts
 * themselves; each host can contain multiple discrete services.
 */
export class ETCDDriver implements IDriver {
  /**
   * Toggles whether hosts are looped through in a deterministic order.
   * For use in tests, should not be toggled in production.
   */
  public static deterministicOrder = false

  public readonly callOptionsFactory: CallOptionsFactory | undefined

  private readonly hosts: Host[]

  private readonly globalPolicy: IPolicy

  private readonly options: IOptions

  /**
   * Creates one `Host` per configured address.
   *
   * `opt.hosts` may be a single address or a list and defaults to
   * `'127.0.0.1:2379'`. All hosts share insecure channel credentials. The
   * global policy is `opt.faultHandling.global` when given, otherwise a retry
   * policy for recoverable errors with `maxAttempts: 3`.
   *
   * @throws Error when `opt.hosts` is an empty array.
   */
  constructor(opt: IOptions) {
    this.callOptionsFactory = opt.defaultCallOptions
    this.globalPolicy
      = opt.faultHandling?.global ?? retry(handleWhen(isRecoverableError), { maxAttempts: 3 })

    const { hosts = '127.0.0.1:2379' } = opt
    const credentials = grpc.credentials.createInsecure()

    if (typeof hosts === 'string') {
      this.hosts = [
        new Host(hosts, credentials, opt.grpcOptions, opt.faultHandling?.host?.(hosts)),
      ]
    } else if (hosts.length === 0) {
      throw new Error('Cannot construct an etcd client with no hosts specified')
    } else {
      this.hosts = hosts.map(
        h => new Host(h, credentials, opt.grpcOptions, opt.faultHandling?.host?.(h)),
      )
    }
  }

  /**
   * Currently a no-op; `options` is not used.
   */
  async init(options: any): Promise<void> {
    // console.log('etcd driver inited.', options);
  }

  /**
   * Tears down all ongoing connections and resources.
   */
  public close() {
    this.hosts.forEach((host) => {
      host.close()
    })
  }

  /**
   * Runs a unary call against one of the hosts, wrapped in the global policy
   * and each host's own fault-handling policy.
   *
   * @param serviceName - Service to call.
   * @param method - Method on that service.
   * @param payload - Request message.
   * @param options - Per-call options, merged through `resolveCallOptions`.
   * @returns The response of the call.
   * @throws The cast gRPC error of the failed call. When the failure surfaced
   * as a broken-circuit error but an earlier, different error was recorded,
   * that earlier error is thrown instead.
   * @override
   */
  public async exec<T>(
    serviceName: CallService,
    method: CallMethod,
    payload: unknown,
    options?: grpc.CallOptions,
  ): Promise<T> {
    return await this.execWithAuthRetries<T>(
      serviceName,
      method,
      payload,
      options,
      MAX_INVALID_AUTH_TOKEN_RETRIES,
    )
  }

  /**
   * Implementation of `exec` with a budget for invalid-auth-token retries.
   *
   * @param authRetriesLeft - How many more times the call may be re-issued
   * after an `EtcdInvalidAuthTokenError`; at zero the error is thrown like any
   * other failure instead of retrying without end.
   */
  private async execWithAuthRetries<T>(
    serviceName: CallService,
    method: CallMethod,
    payload: unknown,
    options: grpc.CallOptions | undefined,
    authRetriesLeft: number,
  ): Promise<T> {
    const shuffleGen = this.shuffledHosts()
    let lastError: Error | undefined = undefined

    try {
      // eslint-disable-next-line @typescript-eslint/promise-function-async -- i
      return await this.globalPolicy.execute(() =>
        this.withConnection(
          serviceName,
          async ({ client }) => {
            const ctx = {
              service: serviceName,
              method,
              params: payload,
              isStream: false,
            }
            const resolvedOpts = resolveCallOptions(options, this.callOptionsFactory, ctx as CallContext)

            try {
              return await runServiceCall<T>(client, resolvedOpts, method, payload)
            } catch (err) {
              // Re-issue the whole call on an invalid auth token, but only
              // while the retry budget lasts.
              if (err instanceof EtcdInvalidAuthTokenError && authRetriesLeft > 0) {
                return await this.execWithAuthRetries<T>(
                  serviceName,
                  method,
                  payload,
                  options,
                  authRetriesLeft - 1,
                )
              }
              lastError = err as Error
              throw lastError
            }
          },
          shuffleGen,
        ),
      )
    } catch (e) {
      // If we ran into an error that caused a circuit to open, but we had
      // an error before that happened, throw the original error rather than
      // the broken circuit error.
      // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition -- ignore
      if (isBrokenCircuitError(e) && lastError && !isBrokenCircuitError(lastError)) {
        throw lastError as Error
      } else {
        throw e as Error
      }
    }
  }

  /**
   * Invokes `fn` with a host and its client for `service`, trying at most
   * `hosts.length` hosts taken from `shuffleGenerator`.
   *
   * `fn` runs inside the host's fault-handling policy. When an attempt fails
   * with a recoverable error the host's clients are reset. If `fn` was
   * actually entered, the failure is rethrown immediately; if the policy
   * rejected the attempt before `fn` ran, the next host is tried.
   *
   * @throws The cast error of the failing attempt, or `ClientRuntimeError`
   * when no host was tried at all.
   * @override
   */
  public async withConnection<T>(
    service: keyof typeof Services,
    fn: (args: { resource: Host, client: grpc.Client }) => Promise<T> | T,
    shuffleGenerator = this.shuffledHosts(),
  ): Promise<T> {
    let lastError: Error | undefined = undefined

    // eslint-disable-next-line @typescript-eslint/prefer-for-of -- ignore
    for (let i = 0; i < this.hosts.length; i++) {
      const host = shuffleGenerator.next().value as Host
      let didCallThrough = false

      try {
        return await host.faultHandling.execute(async () => {
          didCallThrough = true
          return await fn({ resource: host, client: host.getServiceClient(service) })
        })
      } catch (e: unknown) {
        if (isRecoverableError(e as Error)) {
          host.resetAllServices()
        }

        // Check if the call was blocked by some circuit breaker/bulkhead policy
        // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition -- i
        if (didCallThrough) {
          throw castGrpcError(e as Error)
        }

        lastError = e as Error
      }
    }

    if (!lastError) {
      throw new ClientRuntimeError('Connection pool has no hosts')
    }

    throw castGrpcError(lastError)
  }

  /**
   * Reports a failure that happened on `resource` outside of `withConnection`.
   *
   * The error is cast, the host's clients are reset when the error is
   * recoverable, and the error is then thrown once from inside the host's
   * fault-handling policy; the resulting rejection is discarded.
   * @override
   */
  public markFailed(resource: Host, error: Error): void {
    error = castGrpcError(error)
    let threw = false

    if (isRecoverableError(error)) {
      resource.resetAllServices()
    }

    resource.faultHandling
      .execute(() => {
        // Throw only on the first invocation, in case the policy calls again.
        if (!threw) {
          threw = true
          throw error
        }
      })
      .catch(() => undefined)
  }

  /**
   * A generator function that endlessly loops through hosts in a
   * Fisher-Yates shuffle for each iteration. With `deterministicOrder` set,
   * every swap is a no-op and the hosts are yielded from last to first.
   */
  private *shuffledHosts() {
    const hosts = this.hosts.slice()

    while (true) {
      for (let i = hosts.length - 1; i >= 0; i--) {
        const idx = ETCDDriver.deterministicOrder ? i : Math.floor((i + 1) * Math.random())
        ;[hosts[idx], hosts[i]] = [hosts[i], hosts[idx]]
        yield hosts[i]
      }
    }
  }
}