/* --------------------------------------------------------- * Copyright (C) Microsoft Corporation. All rights reserved. *-------------------------------------------------------- */ import { ComparatorBuilder, PutBuilder } from './builder' import type IDriver from '../driver/i-driver' import { Lease } from './lease' import * as RPC from './rpc' import type { NSApplicator } from '../base/util' import type { CallOptions } from '../base/options' import { LockFailedError } from '../base/errors' /** * A Lock can be used for distributed locking to create atomic operations * across multiple systems. An EtcdLockFailedError is thrown if the lock * can't be acquired. * * Under the hood, the Lock uses a lease on a key which is revoked when the * the lock is released. If the server the lock is running on dies, or the * network is disconnected, etcd will time out the lock. * * Bear in mind that this means that in certain rare situations (a network * disconnect or wholesale etcd failure), the caller may lose the lock while * operations may still be running. * * A quick example: * * ``` * const { Etcd3 } = require('etcd3'); * const client = new Etcd3(); * * client.lock('my_resource').do(() => { * // The lock will automatically be released when this promise returns * return doMyAtomicAction(); * }); * ``` */ export class Lock { private leaseTTL = 30 private lease: Lease | null private callOptions: CallOptions | undefined constructor( private readonly driver: IDriver, private readonly namespace: NSApplicator, private readonly key: string | Buffer, ) {} /** * Sets the TTL of the lease underlying the lock. The lease TTL defaults * to 30 seconds. */ public ttl(seconds: number): this { if (this.lease) { throw new Error('Cannot set a lock TTL after acquiring the lock') } this.leaseTTL = seconds return this } /** * Sets the GRPC call options for this request. */ public options(options: CallOptions): this { this.callOptions = options return this } /** * Acquire attempts to acquire the lock, rejecting if it's unable to. */ public async acquire(): Promise { const lease = (this.lease = new Lease(this.driver, this.namespace, this.leaseTTL)) const kv = new RPC.KVClient(this.driver) return await lease.grant().then(async leaseID => await new ComparatorBuilder(kv, this.namespace) .and(this.key, 'Create', '==', 0) .then(new PutBuilder(kv, this.namespace, this.key).value('').lease(leaseID)) .options(this.callOptions) .commit() .then(async (res) => { if (res.succeeded) { return this } return await this.release() .catch(() => undefined) .then(() => { throw new LockFailedError(`Failed to acquire a lock on ${this.key.toString()}`) }) })) } /** * Returns the lease associated with this lock, if any. Returns null if * the lock has not been acquired. */ public async leaseId(): Promise { return this.lease ? await this.lease.grant() : await Promise.resolve(null) } /** * Release frees the lock. */ public async release(): Promise { if (!this.lease) { throw new Error('Attempted to release a lock which was not acquired') } await this.lease.revoke(this.callOptions) } /** * `do()` wraps the inner function. It acquires the lock before running * the function, and releases the lock after any promise the function * returns resolves or throws. */ public async do(fn: () => T | Promise): Promise { return await this.acquire() .then(fn) .then(async value => await this.release().then(() => value)) .catch(async (err: unknown) => await this.release().then(() => { throw err as Error }), ) } }