/* ---------------------------------------------------------
 * Copyright (C) Microsoft Corporation. All rights reserved.
 *-------------------------------------------------------- */
import { EventEmitter } from 'events'
import { PutBuilder } from './builder'
import { castGrpcError, type EBError, LeaseInvalidError, CancelledError } from '../base/errors'
import * as RPC from './rpc'
import { debounce, type NSApplicator } from '../base/util'
import type { CallOptions } from '../base/options'
import type IDriver from '../driver/i-driver'

/**
 * Unwraps a value that may carry an error: throws it when it is an `Error`
 * instance, otherwise returns it unchanged.
 */
function throwIfError<T>(value: T | Error): T {
  if (value instanceof Error) {
    throw value
  }

  return value
}

/**
 * Reports whether a keep-alive response carries a TTL of `'0'`, which this
 * module treats as "the lease is gone".
 */
function leaseExpired(lease: RPC.ILeaseKeepAliveResponse): boolean {
  return lease.TTL === '0'
}

/**
 * Driver decorator used for puts made through a lease. It forwards every call
 * to the wrapped driver and, when a call fails with a `LeaseInvalidError`,
 * notifies the owning lease through `emitLoss` before re-throwing.
 *
 * The `leaseID` promise is accepted for the caller's convenience but is not
 * read by this class.
 */
class LeaseClientWrapper implements IDriver {
  public readonly callOptionsFactory

  constructor(
    private readonly driver: IDriver,
    private readonly lease: {
      leaseID: Promise<string | Error>
      emitLoss: (err: EBError) => void
    },
  ) {
    this.callOptionsFactory = this.driver.callOptionsFactory
  }

  /** No-op: the wrapper has nothing of its own to initialise. */
  // eslint-disable-next-line @typescript-eslint/no-empty-function -- adapt
  async init(): Promise<void> {}

  /** No-op: the wrapper does not close the driver it wraps. */
  // eslint-disable-next-line @typescript-eslint/no-empty-function -- adapt
  close() {}

  /**
   * Forwards the call to the wrapped driver. A `LeaseInvalidError` is reported
   * to the lease via `emitLoss`; every error is re-thrown to the caller.
   */
  public async exec(
    service: keyof typeof RPC.Services,
    method: string,
    payload: any,
  ): Promise<any> {
    try {
      return await this.driver.exec(service, method, payload)
    } catch (err: unknown) {
      if (err instanceof LeaseInvalidError) {
        this.lease.emitLoss(err)
      }
      throw err
    }
  }

  /** Delegates failure marking to the wrapped driver. */
  public markFailed(host: any, error: Error): void {
    this.driver.markFailed(host, error)
  }

  /**
   * Not available on the wrapper.
   * @throws Error always.
   */
  public withConnection(): never {
    throw new Error('not supported')
  }
}

/**
 * Lifecycle of a lease as tracked by this client: `Pending` until the grant
 * resolves, `Alive` afterwards, and `Revoked` once the lease has been closed
 * for any reason (revoke, release, or loss).
 */
export enum LeaseState {
  Pending,
  Alive,
  Revoked,
}

/**
 * Options provided to the lease. Allows specifying call options in addition
 * to a flag to configure whether to keep the lease alive automatically.
 */
export interface ILeaseOptions extends CallOptions {
  /**
   * Configures whether the lease is kept alive automatically. Defaults to `true`.
   */
  autoKeepAlive?: boolean
}

/**
 * Lease is a high-level manager for etcd leases.
 * Leases are great for things like service discovery:
 *
 * ```
 * const os = require('os');
 * const { Etcd3 } = require('etcd3');
 * const client = new Etcd3();
 *
 * const hostPrefix = 'available-hosts/';
 *
 * async function grantLease() {
 *   const lease = client.lease(10); // set a TTL of 10 seconds
 *
 *   lease.on('lost', err => {
 *     console.log('We lost our lease as a result of this error:', err);
 *     console.log('Trying to re-grant it...');
 *     grantLease();
 *   })
 *
 *   await lease.put(hostPrefix + os.hostname()).value('');
 * }
 *
 * async function getAvailableHosts() {
 *   const keys = await client.getAll().prefix(hostPrefix).keys();
 *   return keys.map(key => key.slice(hostPrefix.length));
 * }
 * ```
 * @noInheritDoc
 */
export class Lease extends EventEmitter {
  /** Resolves with the lease ID, or with the error if the grant failed. */
  private readonly leaseID: Promise<string | Error>

  private innerState = LeaseState.Pending

  private readonly client = new RPC.LeaseClient(this.driver)

  /** Timestamp (ms) of the last successful grant or keep-alive. */
  private lastKeepAlive = Date.now()

  /** Call options reused for later calls; excludes `autoKeepAlive` and `deadline`. */
  private readonly defaultOptions: CallOptions

  /** Current lifecycle state of the lease. */
  public get state() {
    return this.innerState
  }

  /**
   * Requests a lease with the given TTL right away.
   *
   * @param ttl lease time-to-live in seconds; must be at least 1
   * @param options call options for the grant, plus `autoKeepAlive`
   * @throws RangeError when `ttl` is falsy or below 1
   */
  constructor(
    private readonly driver: IDriver,
    private readonly namespace: NSApplicator,
    private readonly ttl: number,
    options: ILeaseOptions = {},
  ) {
    super()

    // `deadline` is pulled out only so that it is not carried into the
    // options reused by later calls.
    const { autoKeepAlive, deadline, ...rest } = options
    this.defaultOptions = rest

    if (!ttl || ttl < 1) {
      throw new RangeError(`The TTL in an etcd lease must be at least 1 second. Got: ${ttl}`)
    }

    this.leaseID = this.client
      .leaseGrant({ TTL: ttl }, options)
      .then((res) => {
        // The lease may have been closed while the grant was in flight.
        if (this.innerState === LeaseState.Revoked) {
          return res.ID
        }

        this.innerState = LeaseState.Alive
        this.lastKeepAlive = Date.now()
        if (autoKeepAlive !== false) {
          this.keepalive()
        }

        return res.ID
      })
      .catch((err: unknown) => {
        this.emitLoss(err as EBError)
        // return, don't throw, from here so that if no one is listening to
        // grant() we don't crash the process.
        return err as Error
      })
  }

  /**
   * Grant waits for the lease to be granted. You generally don't need to
   * call this, as any operations with `.put` will queue automatically.
   *
   * Calling this multiple times is safe; it won't try to request multiple
   * leases.
   *
   * It rejects if the lease cannot be granted, in addition to the `lost`
   * event firing.
   */
  public async grant(): Promise<string> {
    return throwIfError(await this.leaseID)
  }

  /**
   * Revoke frees the lease from etcd. Keys that the lease owns will be
   * evicted. A `LeaseInvalidError` from the revoke call is ignored; any other
   * error is re-thrown.
   */
  public async revoke(options: CallOptions | undefined = this.defaultOptions): Promise<void> {
    this.close()

    const id = await this.leaseID
    if (id instanceof Error) {
      // if an error, we didn't grant in the first place
      return
    }

    try {
      await this.client.leaseRevoke({ ID: id }, options)
    } catch (e) {
      if (!(e instanceof LeaseInvalidError)) {
        throw e as Error
      }
    }
  }

  /**
   * release stops making heartbeats for the lease, and allows it
   * to expire automatically when its TTL rolls around. Use `revoke()` to
   * actively tell etcd to terminate the lease.
   */
  public release() {
    this.close()
  }

  /**
   * Put returns a put builder that operates within the current lease.
   */
  public put(key: string | Buffer): PutBuilder {
    const wrappedDriver = new LeaseClientWrapper(this.driver, {
      leaseID: this.leaseID,
      emitLoss: (err: EBError) => {
        this.emitLoss(err)
      },
    })

    return new PutBuilder(new RPC.KVClient(wrappedDriver), this.namespace, key).lease(this.grant())
  }

  /**
   * keepaliveOnce fires an immediate keepalive for the lease.
   *
   * Rejects with a `LeaseInvalidError` (after emitting `lost`) when etcd
   * reports the lease as expired, and with the cast gRPC error when the
   * stream fails. The stream is ended in either case.
   */
  public async keepaliveOnce(
    options: CallOptions | undefined = this.defaultOptions,
  ): Promise<RPC.ILeaseKeepAliveResponse> {
    const [stream, id] = await Promise.all([this.client.leaseKeepAlive(options), this.grant()])

    let res: RPC.ILeaseKeepAliveResponse
    try {
      res = await new Promise<RPC.ILeaseKeepAliveResponse>((resolve, reject) => {
        stream.on('data', resolve)
        stream.on('error', (err) => {
          reject(castGrpcError(err))
        })
        stream.write({ ID: id })
      })
    } finally {
      // Always release the stream, including when it errored.
      stream.end()
    }

    if (leaseExpired(res)) {
      const err = new LeaseInvalidError(res.ID)
      this.emitLoss(err)
      throw err
    }

    this.lastKeepAlive = Date.now()
    return res
  }

  /**
   * Returns whether this lease has been closed on this client, whether
   * through `revoke()`, `release()`, or a loss of the lease.
   */
  public revoked(): boolean {
    return this.innerState === LeaseState.Revoked
  }

  /**
   * A `lost` event is fired when etcd indicates that we've lost the lease
   * on this client. This can be a result of a number of events:
   *  - We've not been able to contact etcd for a while and our TTL has
   *    definitely expired (emits a LeaseInvalidError)
   *  - We contacted etcd and it said that the lease was expired, or revoked
   *    (emits a LeaseInvalidError).
   *  - We weren't able to get an initial grant for the lease.
   * This is NOT fired if `revoke()` is called manually.
   */
  public on(event: 'lost', handler: (err: EBError) => void): this

  /**
   * keepaliveFired is emitted whenever we start
   * trying to send a lease keepalive.
   *
   * keepaliveEstablished is emitted when a stream opens that we'll use for
   * keepalives. This is mostly for testing.
   */
  public on(event: 'keepaliveFired' | 'keepaliveEstablished', handler: () => void): this

  /**
   * keepaliveSucceeded is emitted when we successfully hit etcd
   * with a keepalive for this lease.
   *
   * keepaliveFailed is emitted when an error happens in the keepalive loop.
   * We may be able to recover (e.g. by connecting to a different server),
   * the lease should not be considered revoked until `lost` is emitted.
   *
   * NOTE(review): `keepaliveFailed` handlers are invoked with the error
   * produced by `castGrpcError`, not with a keep-alive response; the handler
   * type below is kept as-is for compatibility.
   */
  public on(
    event: 'keepaliveSucceeded' | 'keepaliveFailed',
    handler: (res: RPC.ILeaseKeepAliveResponse) => void,
  ): this

  /**
   * Implements EventEmitter.on(...).
   */
  public on(event: string, handler: (...args: any[]) => void): this {
    return super.on(event, handler)
  }

  /** Releases the resources of the active keepalive loop, if there is one. */
  private teardown: () => void = () => {
    /* noop */
  }

  /**
   * Tears down resources associated with the lease.
   */
  private close() {
    this.innerState = LeaseState.Revoked
    this.teardown()
  }

  /**
   * Emits the error as having caused this lease to die, and tears
   * down the lease.
   */
  private emitLoss(err: EBError) {
    this.close()
    this.emit('lost', err)
  }

  /**
   * keepalive starts a loop keeping the lease alive.
   */
  private keepalive() {
    // A retry can be scheduled shortly before the lease is closed; in that
    // case there is nothing left to keep alive, and no `lost` must be emitted.
    if (this.innerState === LeaseState.Revoked) {
      return
    }

    // When the cluster goes down, we keep trying to reconnect. But if we're
    // far past the end of our key's TTL, there's no way we're going to be
    // able to renew it. Fire a "lost".
    if (Date.now() - this.lastKeepAlive > 2 * 1000 * this.ttl) {
      this.emitLoss(new LeaseInvalidError('We lost connection to etcd and our lease has expired.'))
      return
    }

    this.client
      .leaseKeepAlive()
      .then(async (stream) => {
        // The lease may have been closed while the stream was being opened.
        if (this.innerState === LeaseState.Revoked) {
          stream.end()
          return
        }

        // this is what the official Go client uses, good enough:
        const keepAliveInterval = (1000 * this.ttl) / 3

        const keepaliveTimer = setInterval(() => {
          this.fireKeepAlive(stream).catch((error: unknown) => {
            console.error('Keepalive failed:', error)
          })
        }, keepAliveInterval)

        // Re-armed on every response; presumably fires once no response has
        // arrived for a full TTL — confirm against `debounce` in base/util.
        const keepAliveTimeout = debounce(1000 * this.ttl, () => {
          this.handleKeepaliveError(new CancelledError('GRPC watch stream has timed out.'))
        })

        this.teardown = () => {
          // Reset first so that a repeated teardown is a no-op.
          this.teardown = () => undefined
          keepAliveTimeout.cancel()
          clearInterval(keepaliveTimer)
          stream.end()
        }

        keepAliveTimeout() // start the debounce

        stream
          .on('error', (err) => {
            this.handleKeepaliveError(err)
          })
          .on('data', (res) => {
            if (leaseExpired(res)) {
              this.handleKeepaliveError(new LeaseInvalidError(res.ID))
              return
            }

            this.lastKeepAlive = Date.now()
            keepAliveTimeout()
            this.emit('keepaliveSucceeded', res)
          })

        this.emit('keepaliveEstablished')
        await this.fireKeepAlive(stream)
      })
      .catch((err: unknown) => {
        this.handleKeepaliveError(err as Error)
      })
  }

  /**
   * Emits `keepaliveFired` and writes a keep-alive request for this lease to
   * the stream. If the grant failed or the write throws, the lease is closed.
   */
  private async fireKeepAlive(
    stream: RPC.IRequestStream<RPC.ILeaseKeepAliveRequest>,
  ): Promise<void> {
    this.emit('keepaliveFired')

    try {
      const id = await this.grant()
      stream.write({ ID: id })
    } catch {
      // will only throw if the initial grant failed
      this.close()
    }
  }

  /**
   * Handles a failure of the keepalive loop: emits `keepaliveFailed`, tears
   * the current loop down, and then either reports the lease as lost (for a
   * `LeaseInvalidError`) or schedules a restart of the loop after 100 ms.
   */
  private handleKeepaliveError(err: Error) {
    if (this.innerState === LeaseState.Revoked) {
      return // often write-after-end, or something along those lines
    }

    this.emit('keepaliveFailed', castGrpcError(err))
    this.teardown()

    if (err instanceof LeaseInvalidError) {
      this.emitLoss(err)
    } else {
      setTimeout(() => {
        this.keepalive()
      }, 100)
    }
  }
}