/**
 * Copyright (c) 2022 EdgerOS Team.
 * All rights reserved.
 *
 * Detailed license information can be found in the LICENSE file.
 *
 * Author       : xueqiang
 * Date         : 2024-08-06 10:08:45
 * LastEditors  : 薛强
 * LastEditTime : 2024-11-21 16:08:18
 */

import { DBClient, LeaseInvalidError, type Lease } from '../'
import { onceEvent } from '../base/util'
import { test } from '@edgeros/tapes'
import {
  containSubset,
  createTestClientAndKeys,
  getOptions,
  tearDownTestClient,
  unmockedDelay,
} from './util'
import assert from 'assert'
import { proxy, TrafficDirection } from '../../deps/proxy'
import { sleep } from '../../deps/common'

/**
 * Per-test setup: resolves with whatever `createTestClientAndKeys()` provides
 * (used as the `DBClient` under test throughout this file).
 */
const before = async function () {
  return await createTestClientAndKeys()
}

/**
 * Per-test teardown.
 *
 * Revokes `lease` unless it already reports itself as revoked (a missing lease
 * is simply skipped by the optional chaining), then tears down `client`.
 *
 * @param lease - Lease created by the test, or `undefined` if none was created.
 * @param client - Client returned by `before()`.
 */
const after = async function (lease: Lease | undefined, client: DBClient) {
  if (!lease?.revoked()) {
    await lease?.revoke()
  }
  await tearDownTestClient(client)
}

/**
 * Registers a one-shot listener for `event` on `lease` and returns a mutable
 * record that the listener fills in when the event is emitted.
 *
 * @param event - Event name to listen for.
 * @param lease - Lease to attach the listener to.
 * @returns An object whose `fired` flips to `true`, and whose `data` holds the
 *          event payload, once the event has been emitted.
 */
const watchEmission = function (event: string | symbol, lease: Lease) {
  const output = { data: '', fired: false }
  lease.once(event, (data) => {
    // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment -- i
    output.data = data
    output.fired = true
  })
  return output
}

// A TTL of 0 must be rejected synchronously by `client.lease()`.
test('lease() - throws if trying to use too short of a ttl, or an undefined ttl', async (t) => {
  const client = await before()
  t.throws(() => client.lease(0), /must be at least 1 second/)
  t.pass('lease() - throws if trying to use too short of a ttl, or an undefined ttl - ')
  await after(undefined, client)
  t.end()
})

// A key written through a lease carries the lease ID and is gone after revoke.
test('lease() - provides basic lease lifecycle', async (t) => {
  const client = await before()
  const lease = client.lease(100)
  await lease.put('leased').value('foo')
  assert.equal((await client.get('leased').exec()).kvs[0].lease, await lease.grant())
  await lease.revoke()
  assert.equal(await client.get('leased').buffer(), null)
  t.pass('lease() - provides basic lease lifecycle - ')
  // eslint-disable-next-line @typescript-eslint/ban-ts-comment -- i
  // @ts-ignore
  await lease.client.leaseLeases()
  await after(lease, client)
  t.end()
})

// A lease-bound put issued from a transaction's `then` branch attaches the lease.
test('lease() - attaches leases through transactions', async (t) => {
  const client = await before()
  const lease = client.lease(100)
  await lease.put('leased').value('foo')
  const result = await client
    .if('foo1', 'Value', '==', 'bar1')
    .then(lease.put('leased').value('foo'))
    .commit()
  assert.equal(result.succeeded, true)
  assert.equal((await client.get('leased').exec()).kvs[0].lease, await lease.grant())
  await lease.revoke()
  assert.equal(await client.get('leased').buffer(), null)
  t.pass('lease() - attaches leases through transactions - ')
  await after(lease, client)
  t.end()
})

// `keepaliveOnce()` answers with the lease ID and the TTL the lease was created with.
test('lease() - runs immediate keepalives', async (t) => {
  const client = await before()
  const lease = client.lease(100)
  // eslint-disable-next-line @typescript-eslint/ban-ts-comment -- i
  // @ts-ignore
  const contain = containSubset(await lease.keepaliveOnce(), {
    ID: await lease.grant(),
    TTL: '100',
  })
  t.ok(contain, 'lease() - runs immediate keepalives - ')
  await lease.keepaliveOnce()
  await after(lease, client)
  t.end()
})

// Revoking the lease behind the Lease object's back must surface as a `lost`
// event and as a rejected keepalive carrying the very same error instance.
test('lease() - emits a lost event if the lease is invalidated', async (t) => {
  const client = await before()
  const lease = client.lease(100)
  // Only compared by identity below, so `unknown` is sufficient.
  let err: unknown = undefined
  lease.on('lost', (e) => {
    t.ok(lease.revoked())
    err = e
  })
  t.notOk(lease.revoked())
  await client.leaseClient.leaseRevoke({ ID: await lease.grant() })
  await lease
    .keepaliveOnce()
    .then(() => {
      throw new Error('expected to reject')
    })
    .catch((err2: unknown) => {
      t.equal(err2, err)
      t.ok(err2 instanceof LeaseInvalidError)
      t.ok(lease.revoked())
    })
  t.pass('lease() - emits a lost event if the lease is invalidated - ')
  await after(lease, client)
  t.end()
})

// Forces a made-up lease ID onto the Lease so that a put through it is
// expected to be rejected and to trigger a `lost` event.
test('lease() - emits a loss if the touched key is lost', async (t) => {
  const client = await before()
  const lease = client.lease(100, { autoKeepAlive: false })
  // eslint-disable-next-line @typescript-eslint/ban-ts-comment -- i
  // @ts-ignore
  lease.leaseID = Promise.resolve('123456789')
  const lost = onceEvent(lease, 'lost')
  // NOTE(review): if the put resolves instead of rejecting, none of the
  // assertions in the catch block run and the test still passes — confirm
  // whether that leniency is intended.
  try {
    await lease.put('foo').value('bar')
  } catch (e) {
    t.ok(e instanceof LeaseInvalidError)
    t.equal(e, await lost)
    t.ok(lease.revoked())
  }
  t.pass('lease() - emits a loss if the touched key is lost - ')
  await after(lease, client)
  t.end()
})

// With `autoKeepAlive: false`, no `keepaliveFired` event is seen within 2 seconds.
test('lease() - allows disabling auto keep alives', async (t) => {
  const client = await before()
  const lease = client.lease(60, { autoKeepAlive: false })
  const kaFired = watchEmission('keepaliveFired', lease)
  await sleep(2000)
  t.ok(!kaFired.fired, 'lease() - allows disabling auto keep alives - ')
  await after(lease, client)
  t.end()
})

/**
 * Setup shared by the `crons` tests: creates a lease on `client` and waits for
 * its `keepaliveEstablished` event before handing it back.
 *
 * @param client - Client to create the lease on.
 * @param time - TTL passed to `client.lease()`; defaults to 60.
 * @returns The lease, after `keepaliveEstablished` has been emitted.
 */
async function cronsBefore(client: DBClient, time = 60) {
  const lease = client.lease(time)
  await onceEvent(lease, 'keepaliveEstablished')
  return lease
}

// The first successful automatic keepalive reports the TTL the lease was created with.
// Failures propagate to the runner like in every other test in this file instead of
// being caught and logged, which previously skipped `t.end()` and the teardown.
test('crons - touches the lease ttl at the correct interval', async (t) => {
  const client = await before()
  const lease = await cronsBefore(client, 2)
  // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment -- i
  const res = await onceEvent(lease, 'keepaliveSucceeded')
  // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access -- i
  t.equal(res.TTL, '2')
  t.pass('crons - touches the lease ttl at the correct interval - ')
  await after(lease, client)
  t.end()
})

// After `release()`, no further `keepaliveFired` event is seen within 2 seconds.
test('crons - stops touching the lease if released passively', async (t) => {
  const client = await before()
  const lease = await cronsBefore(client)
  const kaFired = watchEmission('keepaliveFired', lease)
  lease.release()
  await sleep(2000)
  t.notOk(kaFired.fired)
  t.pass('crons - stops touching the lease if released passively - ')
  await after(lease, client)
  t.end()
})

// Suspends the proxy until a keepalive fails, then unsuspends it and expects
// keepalives to succeed again on the same lease.
test('lease() - is resilient to network interruptions', async (t) => {
  const client = await before()
  await proxy.activate()
  const proxiedClient = new DBClient(getOptions())
  await proxiedClient.init()
  const lease = proxiedClient.lease(100)
  await lease.grant()
  proxy.suspend()
  await onceEvent(lease, 'keepaliveFailed')
  proxy.unsuspend()
  await onceEvent(lease, 'keepaliveSucceeded')
  await lease.revoke()
  proxiedClient.close()
  await proxy.deactivate()
  proxy.suspend()
  t.pass('lease() - is resilient to network interruptions - ')
  await after(lease, client)
  t.end()
})

// With the proxy suspended and the last keepalive timestamp pushed 2 seconds
// into the past, a 1-second lease must report itself as lost.
test('lease() - marks leases as failed if the server is not contacted for a while', async (t) => {
  const client = await before()
  await proxy.activate()
  const proxiedClient = new DBClient(getOptions())
  await proxiedClient.init()
  const lease = proxiedClient.lease(1)
  await lease.grant()
  proxy.suspend()
  // eslint-disable-next-line @typescript-eslint/ban-ts-comment -- i
  // @ts-ignore
  lease.lastKeepAlive = Date.now() - 2000 // speed things up a little
  // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment -- i
  const err = await onceEvent(lease, 'lost')
  // eslint-disable-next-line @typescript-eslint/no-unsafe-argument, @typescript-eslint/no-unsafe-member-access -- i
  t.match(err.message, /our lease has expired/)
  proxiedClient.close()
  await proxy.deactivate()
  proxy.suspend()
  t.pass('lease() - marks leases as failed if the server is not contacted for a while - ')
  await after(lease, client)
  t.end()
})

// Pauses traffic coming from the server side of the proxy: `keepaliveFailed`
// must not have fired right away, but must have fired after a further second.
test('crons - marks leases as failed if etcd does not respond to keepalives in time (#110)', async (t) => {
  const client = await before()
  let lease = await cronsBefore(client)
  await lease.revoke()
  await proxy.activate()
  const proxiedClient = new DBClient(getOptions())
  await proxiedClient.init()
  lease = proxiedClient.lease(1)
  await lease.grant()
  proxy.pause(TrafficDirection.FromEtcd)
  const failedEvent = watchEmission('keepaliveFailed', lease)
  await unmockedDelay(2) // drain task queues
  assert.ok(!failedEvent.fired)
  await sleep(1000)
  await unmockedDelay(2) // drain task queues
  assert.ok(failedEvent.fired)
  proxy.resume(TrafficDirection.FromEtcd)
  await lease.revoke()
  proxiedClient.close()
  // NOTE(review): unlike the other proxy tests, deactivation is not awaited
  // here — confirm this is deliberate.
  // eslint-disable-next-line @typescript-eslint/no-floating-promises -- i
  proxy.deactivate()
  proxy.suspend()
  t.pass('crons - marks leases as failed if etcd does not respond to keepalives in time (#110) - ')
  await after(lease, client)
  t.end()
})