/* ---------------------------------------------------------
 * Copyright (C) Microsoft Corporation. All rights reserved.
 *-------------------------------------------------------- */
import { type Rangable, Range } from './range'
import * as RPC from './rpc'
import type { CallOptions } from '../base/options'
import { type NSApplicator, PromiseWrap, toBuffer } from '../base/util'

const emptyBuffer = Buffer.from([])

/**
 * Comparators can be passed to various operations in the ComparatorBuilder.
 * The strict and loose JavaScript spellings (`==`/`===`, `!=`/`!==`) map to
 * the same comparison result.
 */
export const comparator = {
  '==': RPC.CompareResult.Equal,
  '===': RPC.CompareResult.Equal,
  '>': RPC.CompareResult.Greater,
  '<': RPC.CompareResult.Less,
  '!=': RPC.CompareResult.NotEqual,
  '!==': RPC.CompareResult.NotEqual,
}

/**
 * Pairs a comparison target with the name of the `ICompare` field that
 * carries the operand for that target.
 */
export interface ICompareTarget {
  value: RPC.CompareTarget
  key: keyof RPC.ICompare
}

/**
 * IOperation is implemented by builders that can be embedded in a
 * transaction: `op()` resolves to the request op describing the operation.
 */
export interface IOperation {
  op: () => Promise<RPC.IRequestOp>
}

/**
 * compareTarget are the types of things that can be compared against.
 * Each entry maps a target name to the `ICompare` field holding its operand.
 */
export const compareTarget: { [key in keyof typeof RPC.CompareTarget]: keyof RPC.ICompare } = {
  Value: 'value',
  Version: 'version',
  Create: 'create_revision',
  Mod: 'mod_revision',
  Lease: 'lease',
}

/**
 * assertWithin throws a helpful error message if the value provided isn't
 * a key in the given map.
 *
 * @param map - object whose keys are the accepted values
 * @param value - the value supplied by the caller
 * @param thing - human-readable description of what is being validated,
 * interpolated into the error message
 * @throws Error listing the accepted keys when `value` is not a key of `map`
 */
function assertWithin<T extends object>(map: T, value: keyof T, thing: string): void {
  if (value in map) {
    return
  }

  const keys = Object.keys(map).join('" "')
  throw new Error(`Unexpected "${String(value)}" in ${thing}. Possible values are: "${keys}"`)
}

/**
 * RangeBuilder is a primitive builder for range queries on the kv store.
 * It's extended by the Single and MultiRangeBuilders, which contain
 * the concrete methods to execute the built query.
*/ export abstract class RangeBuilder extends PromiseWrap implements IOperation { protected request: RPC.IRangeRequest = { key: Buffer.alloc(0) } protected callOptions: CallOptions | undefined constructor(protected readonly namespace: NSApplicator) { super() } /** * revision is the point-in-time of the key-value store to use for the range. */ public revision(rev: number | string): this { this.request.revision = rev return this } /** * serializable sets the range request to use serializable member-local reads. */ public serializable(serializable: boolean): this { this.request.serializable = serializable return this } /** * minModRevision sets the minimum modified revision of keys to return. */ public minModRevision(minModRevision: number | string): this { this.request.min_mod_revision = minModRevision return this } /** * maxModRevision sets the maximum modified revision of keys to return. */ public maxModRevision(maxModRevision: number | string): this { this.request.max_mod_revision = maxModRevision return this } /** * minCreateRevision sets the minimum create revision of keys to return. */ public minCreateRevision(minCreateRevision: number | string): this { this.request.min_create_revision = minCreateRevision return this } /** * maxCreateRevision sets the maximum create revision of keys to return. */ public maxCreateRevision(maxCreateRevision: number | string): this { this.request.max_create_revision = maxCreateRevision return this } /** * Sets the GRPC call options for this request. */ public options(options: CallOptions | undefined): this { this.callOptions = options return this } /** * Returns the request op for this builder, used in transactions. */ public async op(): Promise { return await Promise.resolve({ request_range: this.namespace.applyToRequest(this.request) }) } } /** * SingleRangeBuilder is a query builder that looks up a single key. 
*/ export class SingleRangeBuilder extends RangeBuilder { constructor( private readonly kv: RPC.KVClient, namespace: NSApplicator, key: string | Buffer, ) { super(namespace) this.request.key = toBuffer(key) this.request.limit = 1 } /** * Runs the built request and parses the returned key as JSON, * or returns `null` if it isn't found. */ public async json(): Promise { return await this.string().then(JSON.parse) } /** * Runs the built request and returns the value of the returned key as a * string, or `null` if it isn't found. */ public async string(encoding: BufferEncoding = 'utf8'): Promise { return await this.exec().then(res => res.kvs.length === 0 ? null : res.kvs[0].value.toString(encoding), ) } /** * Runs the built request, and returns the value parsed as a number. Resolves * as NaN if the value can't be parsed as a number. */ public async number(): Promise { return await this.string().then(value => (value === null ? null : Number(value))) } /** * Runs the built request and returns the value of the returned key as a * buffer, or `null` if it isn't found. */ public async buffer(): Promise { return await this.exec().then(res => (res.kvs.length === 0 ? null : res.kvs[0].value)) } /** * Returns whether the key exists. */ public async exists(): Promise { this.request.keys_only = true return await this.exec().then(r => r.count !== '0') } /** * Runs the built request and returns the raw response from etcd. */ public async exec(): Promise { return await this.kv.range(this.namespace.applyToRequest(this.request), this.callOptions) } /** * @override */ protected async createPromise(): Promise { return await this.string() } } /** * MultiRangeBuilder is a query builder that looks up multiple keys. */ export class MultiRangeBuilder extends RangeBuilder> { constructor( private readonly kv: RPC.KVClient, namespace: NSApplicator, ) { super(namespace) this.prefix(emptyBuffer) } /** * Prefix instructs the query to scan for all keys that have the provided * prefix. 
*/ public prefix(value: string | Buffer): this { return this.inRange(Range.prefix(value)) } /** * inRange instructs the builder to get keys in the specified byte range. */ public inRange(r: Rangable): this { const range = Range.from(r) this.request.key = range.start this.request.range_end = range.end return this } /** * All will instruct etcd to get all keys. */ public all(): this { return this.prefix('') } /** * Limit sets the maximum number of results to retrieve. */ public limit(count: number): this { this.request.limit = isFinite(count) ? count : 0 return this } /** * Sort specifies how the result should be sorted. */ public sort(target: keyof typeof RPC.SortTarget, order: keyof typeof RPC.SortOrder): this { assertWithin(RPC.SortTarget, target, 'sort order in client.get().sort(...)') assertWithin(RPC.SortOrder, order, 'sort order in client.get().sort(...)') this.request.sort_target = RPC.SortTarget[target] this.request.sort_order = RPC.SortOrder[order] return this } /** * count returns the number of keys that match the query. */ public async count(): Promise { this.request.count_only = true return await this.exec().then(res => Number(res.count)) } /** * Keys returns an array of keys matching the query. */ public async keys(encoding: BufferEncoding = 'utf8'): Promise { this.request.keys_only = true return await this.exec().then(res => res.kvs.map(kv => kv.key.toString(encoding))) } /** * Keys returns an array of keys matching the query, as buffers. */ public async keyBuffers(): Promise { this.request.keys_only = true return await this.exec().then(res => res.kvs.map(kv => kv.key)) } /** * Runs the built request and parses the returned keys as JSON. */ public async json(): Promise> { return await this.mapValues(buf => JSON.parse(buf.toString())) } /** * Runs the built request and returns the value of the returned key as a * string, or `null` if it isn't found. 
*/ public async strings(encoding: BufferEncoding = 'utf8'): Promise> { return await this.mapValues(buf => buf.toString(encoding)) } /** * Runs the built request and returns the values of keys as numbers. May * resolve to NaN if the keys do not contain numbers. */ public async numbers(): Promise> { return await this.mapValues(buf => Number(buf.toString())) } /** * Runs the built request and returns the value of the returned key as a * buffers. */ public async buffers(): Promise> { return await this.mapValues(b => b) } /** * Runs the built request and returns the raw response from etcd. */ public async exec(): Promise { return await this.kv .range(this.namespace.applyToRequest(this.request), this.callOptions) .then((res) => { for (const kv of res.kvs) { kv.key = this.namespace.unprefix(kv.key) } return res }) } /** * @override */ protected async createPromise(): Promise> { return await this.strings() } /** * Dispatches a call to the server, and creates a map by running the * iterator over the values returned. */ private async mapValues(iterator: (buf: Buffer) => T): Promise> { return await this.exec().then((res) => { const output: Record = {} for (const kv of res.kvs) { output[kv.key.toString()] = iterator(kv.value) } return output }) } } /** * DeleteBuilder builds a deletion. */ export class DeleteBuilder extends PromiseWrap { private readonly request: RPC.IDeleteRangeRequest = { key: Buffer.alloc(0) } private callOptions: CallOptions | undefined constructor( private readonly kv: RPC.KVClient, private readonly namespace: NSApplicator, ) { super() } /** * key sets a single key to be deleted. */ public key(value: string | Buffer): this { this.request.key = toBuffer(value) this.request.range_end = undefined return this } /** * key sets a single key to be deleted. */ public prefix(value: string | Buffer): this { return this.range(Range.prefix(value)) } /** * Sets the byte range of values to delete. 
*/ public range(range: Range): this { this.request.key = range.start this.request.range_end = range.end return this } /** * All will instruct etcd to wipe all keys. */ public all(): this { return this.prefix('') } /** * inRange instructs the builder to delete keys in the specified byte range. */ public inRange(r: Rangable): this { const range = Range.from(r) this.request.key = range.start this.request.range_end = range.end return this } /** * getPrevious instructs etcd to *try* to get the previous value of the * key before setting it. One may not always be available if a compaction * takes place. */ public async getPrevious(): Promise { this.request.prev_kv = true return await this.exec().then(res => res.prev_kvs) } /** * Sets the GRPC call options for this request. */ public options(options: CallOptions | undefined): this { this.callOptions = options return this } /** * exec runs the delete put request. */ public async exec(): Promise { return await this.kv.deleteRange(this.namespace.applyToRequest(this.request), this.callOptions) } /** * Returns the request op for this builder, used in transactions. */ public async op(): Promise { return await Promise.resolve({ request_delete_range: this.namespace.applyToRequest(this.request), }) } /** * @override */ protected async createPromise(): Promise { return await this.exec() } } /** * PutBuilder builds a "put" request to etcd. */ export class PutBuilder extends PromiseWrap { private readonly request: RPC.IPutRequest = { key: Buffer.alloc(0) } private leaseFilter?: string | number | Promise private callOptions: CallOptions | undefined constructor( private readonly kv: RPC.KVClient, private readonly namespace: NSApplicator, key: string | Buffer, ) { super() this.request.key = toBuffer(key) } /** * value sets the value that will be stored in the key. */ public value(value: string | Buffer | number): this { this.request.value = toBuffer(value) return this } /** * Sets the lease value to use for storing the key. 
You usually don't * need to use this directly, use `client.lease()` instead! */ public lease(lease: number | string | Promise): this { this.leaseFilter = lease return this } /** * Updates the key on its current lease, regardless of what that lease is. */ public ignoreLease(): this { this.request.ignore_lease = true return this } /** * Sets the GRPC call options for this request. */ public options(options: CallOptions | undefined): this { this.callOptions = options return this } /** * getPrevious instructs etcd to *try* to get the previous value of the * key before setting it. One may not always be available if a compaction * takes place. */ public async getPrevious(): Promise { this.request.prev_kv = true return await this.exec().then(res => ({ ...res.prev_kv, header: res.header })) } /** * Touch updates the key's revision without changing its value. This is * equivalent to the etcd 'ignore value' flag. */ public async touch(): Promise { this.request.value = undefined this.request.ignore_value = true return await this.exec() } /** * exec runs the put request. */ public async exec(): Promise { await this.applyLease() return await this.kv.put(this.namespace.applyToRequest(this.request), this.callOptions) } /** * Returns the request op for this builder, used in transactions. */ public async op(): Promise { await this.applyLease() return { request_put: this.namespace.applyToRequest(this.request) } } /** * @override */ protected async createPromise(): Promise { return await this.exec() } private async applyLease() { if (!this.leaseFilter) { return } if (typeof this.leaseFilter === 'number' || typeof this.leaseFilter === 'string') { this.request.lease = this.leaseFilter } this.request.lease = await this.leaseFilter } } /** * ComparatorBuilder builds a comparison between keys. 
This can be used
 * for atomic operations in etcd, such as locking:
 *
 * ```
 * const id = uuid.v4();
 *
 * function lock() {
 *   return client.if('my_lock', 'Create', '==', 0)
 *     .then(client.put('my_lock').value(id))
 *     .else(client.get('my_lock'))
 *     .commit()
 *     .then(result => console.log(result.succeeded ? 'lock acquired' : 'already locked'));
 * }
 *
 * function unlock() {
 *   return client.if('my_lock', 'Value', '==', id)
 *     .then(client.delete().key('my_lock'))
 *     .commit();
 * }
 * ```
 */
export class ComparatorBuilder {
  // NOTE(review): generic type arguments and the `RPC.ITxnResponse` name were
  // reconstructed from usage — confirm against ./rpc.

  /** Pending pieces of the transaction; every entry resolves at commit time. */
  private readonly request: {
    compare: Array<Promise<RPC.ICompare>>
    success: Array<Promise<RPC.IRequestOp>>
    failure: Array<Promise<RPC.IRequestOp>>
  } = { compare: [], success: [], failure: [] }

  private callOptions: CallOptions | undefined

  /**
   * @param kv - client used to execute the transaction
   * @param namespace - applicator used to rewrite compared keys
   */
  constructor(
    private readonly kv: RPC.KVClient,
    private readonly namespace: NSApplicator,
  ) {}

  /**
   * Sets the GRPC call options for this request.
   */
  public options(options: CallOptions | undefined): this {
    this.callOptions = options
    return this
  }

  /**
   * Adds a new comparison clause to the transaction.
   *
   * @param key - the key to compare; the namespace is applied to it
   * @param column - which property of the key to compare (see `compareTarget`)
   * @param cmp - the comparison operator (see `comparator`)
   * @param value - the operand to compare against; converted to a buffer when
   * comparing on `Value`
   * @throws Error if `column` or `cmp` is not a known name
   */
  public and(
    key: string | Buffer,
    column: keyof typeof RPC.CompareTarget,
    cmp: keyof typeof comparator,
    value: string | Buffer | number,
  ): this {
    assertWithin(compareTarget, column, 'comparison target in client.and(...)')
    assertWithin(comparator, cmp, 'comparator in client.and(...)')

    // Only 'Value' comparisons convert the operand to a buffer; the other
    // targets are passed through unchanged.
    const operand = column === 'Value' ? toBuffer(value as string | Buffer) : value

    this.request.compare.push(
      Promise.resolve({
        key: this.namespace.applyKey(toBuffer(key)),
        result: comparator[cmp],
        target: RPC.CompareTarget[column],
        // The operand lives in a different ICompare field for each target.
        [compareTarget[column]]: operand,
      }),
    )

    return this
  }

  /**
   * Sets the consequent clauses to be executed if the comparison is truthy.
   * Calling this again replaces the previously configured clauses.
   */
  public then(...clauses: Array<RPC.IRequestOp | IOperation>): this {
    this.request.success = this.mapOperations(clauses)
    return this
  }

  /**
   * Sets the consequent clauses to be executed if the comparison is falsey.
   * Calling this again replaces the previously configured clauses.
   */
  public else(...clauses: Array<RPC.IRequestOp | IOperation>): this {
    this.request.failure = this.mapOperations(clauses)
    return this
  }

  /**
   * Runs the generated transaction and returns its result. All pending
   * comparisons and clauses are resolved before the request is sent.
   */
  public async commit(): Promise<RPC.ITxnResponse> {
    return await this.kv.txn(
      {
        compare: await Promise.all(this.request.compare),
        failure: await Promise.all(this.request.failure),
        success: await Promise.all(this.request.success),
      },
      this.callOptions,
    )
  }

  /**
   * Low-level method that converts a list of clauses into promises of
   * request ops. Entries exposing an `op()` function are asked for their
   * request op; anything else is assumed to already be a request op.
   */
  public mapOperations(ops: Array<RPC.IRequestOp | IOperation>): Array<Promise<RPC.IRequestOp>> {
    return ops.map(async (op) => {
      if (typeof (op as IOperation).op === 'function') {
        return await (op as IOperation).op()
      }

      return op as RPC.IRequestOp
    })
  }
}