import { DBClient, type Watcher, type IKeyValue } from '../'
import { onceEvent } from '../base/util'
import {
  getOptions,
  tearDownTestClient,
} from './util'
import { test } from '@edgeros/tapes'
import assert from 'assert'
import { proxy } from '../../deps/proxy'
import { IS_JSRE } from '../../deps/common'

/**
 * Builds a fresh client from the shared test options and waits for its
 * `init()` to finish before handing it to the test.
 */
async function before(): Promise<DBClient> {
  const client = new DBClient(getOptions())
  await client.init()
  return client
}

/**
 * Per-test teardown: delegates to `tearDownTestClient` for the given client.
 */
async function after(client: DBClient): Promise<void> {
  await tearDownTestClient(client)
}

/**
 * Returns the watcher list held by the client's watch manager.
 *
 * Bracket access is used on purpose to reach members that are not part of the
 * client's public typing, hence the lint suppression.
 */
function getWatchers(client: DBClient) {
  // eslint-disable-next-line @typescript-eslint/dot-notation -- i
  return client['watchManager']['watchers']
}

/**
 * Writes `'updated!'` to `key` and, concurrently, waits for the watcher's next
 * `put` event, asserting that the event carries that key and value.
 *
 * @returns the same watcher, so the call can be used inside promise chains.
 */
async function expectWatching(client: DBClient, watcher: Watcher, key: string | Buffer): Promise<Watcher> {
  await Promise.all([
    client.put(key).value('updated!'),
    onceEvent(watcher, 'put').then((res: IKeyValue) => {
      assert.equal(res.key.toString(), key)
      assert.equal(res.value.toString(), 'updated!')
    }),
  ])
  return watcher
}

/**
 * Writes `'updated!'` to `key` and asserts that the watcher emits no `put`
 * event during the following two seconds.
 *
 * The assertion runs in the async function body rather than inside the timer
 * callback: an exception thrown from a timer callback is not captured by the
 * promise, so a failure would surface as an uncaught exception and the
 * returned promise would never settle.
 *
 * @returns the same watcher once the quiet period has elapsed.
 */
async function expectNotWatching(client: DBClient, watcher: Watcher, key: string | Buffer): Promise<Watcher> {
  let watching = false
  watcher.on('put', () => {
    watching = true
  })
  await client.put(key).value('updated!')
  await new Promise<void>((resolve) => {
    setTimeout(resolve, 2000)
  })
  assert.equal(watching, false)
  return watcher
}

// A single watcher created with `create()` receives puts and is the only
// entry in the client's watcher list.
test('watch() - subscription - subscribes before the connection is established', async (t) => {
  const client = await before()
  const watcher = await client.watch().key('foo1').create()
  await expectWatching(client, watcher, 'foo1')
  t.deepEqual(getWatchers(client), [watcher], 'watch() - subscription - subscribes before the connection is established - ')
  await watcher.cancel()
  await after(client)
  t.end()
})

// Two watchers are requested back to back without awaiting the first; both
// must end up receiving puts and be listed in creation order.
test('watch() - subscription - subscribes while the connection is still being established', async (t) => {
  const client = await before()
  const watcher1 = client.watch().key('foo1').create()
  const watcher2 = client.watch().key('bar').create()
  const watchers = await Promise.all([
    watcher1.then(async w => await expectWatching(client, w, 'foo1')),
    watcher2.then(async w => await expectWatching(client, w, 'bar')),
  ])
  t.deepEqual(getWatchers(client), watchers, 'watch() - subscription - subscribes while the connection is still being established - ')
  await (await watcher1).cancel()
  await (await watcher2).cancel()
  await after(client)
  t.end()
})

// Uses `watcher()` (no awaiting of creation) and records lifecycle events to
// assert that the first watcher is connected before the second starts
// connecting.
test('watch() - subscription - subscribes in series', async (t) => {
  const client = await before()
  const watcher1 = client.watch().key('foo1').watcher()
  const watcher2 = client.watch().key('bar').watcher()
  const events: string[] = []
  watcher1.on('connecting', () => events.push('connecting1'))
  watcher1.on('connected', () => events.push('connected1'))
  watcher2.on('connecting', () => events.push('connecting2'))
  watcher2.on('connected', () => events.push('connected2'))
  await onceEvent(watcher2, 'connected')
  t.deepEqual(events, ['connecting1', 'connected1', 'connecting2', 'connected2'], 'watch() - subscription - subscribes in series - ')
  await watcher1.cancel()
  await watcher2.cancel()
  await after(client)
  t.end()
})

// The second watcher is only created after the first one is fully created and
// verified.
test('watch() - subscription - subscribes after the connection is fully established', async (t) => {
  const client = await before()
  const watcher1 = await client.watch().key('foo1').create()
  await expectWatching(client, watcher1, 'foo1')
  const watcher2 = await client.watch().key('bar').create()
  await expectWatching(client, watcher2, 'bar')
  t.deepEqual(getWatchers(client), [watcher1, watcher2], 'watch() - subscription - subscribes after the connection is fully established - ')
  await watcher1.cancel()
  await watcher2.cancel()
  await after(client)
  t.end()
})

// Watch, cancel, then watch the same key again on the same client.
test('watch() - subscription - allows successive resubscription (issue #51)', async (t) => {
  const client = await before()
  const watcher1 = await client.watch().key('foo1').create()
  await expectWatching(client, watcher1, 'foo1')
  await watcher1.cancel()
  const watcher2 = await client.watch().key('foo1').create()
  await expectWatching(client, watcher2, 'foo1')
  await watcher2.cancel()
  t.pass('watch() - subscription - allows successive resubscription (issue #51) - ')
  await after(client)
  t.end()
})

// After `cancel()` the watcher must stop emitting puts and be gone from the
// client's watcher list.
test('watch() - unsubscribing - unsubscribes while the connection is established', async (t) => {
  const client = await before()
  const watcher = await client.watch().key('foo1').create()
  // Return value intentionally unused; the call only exercises the accessor.
  watcher.lastRevision()
  await watcher.cancel()
  await expectNotWatching(client, watcher, 'foo1')
  t.deepEqual(getWatchers(client), [], 'watch() - unsubscribing - unsubscribes while the connection is established - ')
  await after(client)
  t.end()
})

// Suspends the proxy until the watcher reports `disconnected`, resumes it, and
// checks that the watcher reconnects and delivers puts again.
test('watch() - network interruptions', async (t) => {
  const client = await before()
  await proxy.activate()
  const proxiedClient = new DBClient(getOptions())
  await proxiedClient.init()
  const watcher = await proxiedClient.watch().key('foo1').create()
  proxy.suspend()
  watcher.on('disconnected', () => {
    console.warn('disconnect========')
  })
  await onceEvent(watcher, 'disconnected')
  proxy.unsuspend()
  await onceEvent(watcher, 'connected')
  await expectWatching(proxiedClient, watcher, 'foo1')
  await watcher.cancel()
  proxiedClient.close()
  await proxy.deactivate()
  t.pass('watch() - network interruptions - ')
  await after(client)
  t.end()
})

// While disconnected, the watcher's requested start revision is overwritten
// with a much larger number; after reconnecting it must equal the value that
// was recorded before the overwrite.
test('watch() - caps watchers revisions', async (t) => {
  const client = await before()
  await proxy.activate()
  const proxiedClient2 = new DBClient(getOptions())
  await proxiedClient2.init()
  const watcher2 = await proxiedClient2.watch().key('foo1').create()
  proxy.suspend()
  await onceEvent(watcher2, 'disconnected')
  const actualRevision = Number(watcher2.request.start_revision)
  watcher2.request.start_revision = 999999
  proxy.unsuspend()
  await onceEvent(watcher2, 'connected')
  assert.equal(Number(watcher2.request.start_revision), actualRevision)
  await watcher2.cancel()
  proxiedClient2.close()
  await proxy.deactivate()
  t.pass('watch() - caps watchers revisions - ')
  await after(client)
  t.end()
})

// This scenario throws an exception on JSRE, so it is only registered elsewhere.
if (!IS_JSRE) {
  test('watch() - unsubscribes while the connection is being reestablished', async (t) => {
    const client = await before()
    await proxy.activate()
    const proxiedClient = new DBClient(getOptions())
    await proxiedClient.init()
    const watcher = await proxiedClient.watch().key('foo1').create()
    proxy.suspend()
    proxy.unsuspend()
    await watcher.cancel()
    // The watcher was created on `proxiedClient`, so that is the client whose
    // watcher list must be empty; `client` never has a watcher in this test
    // and would make the check pass unconditionally.
    t.deepEqual(getWatchers(proxiedClient), [], 'watch() - unsubscribes while the connection is being reestablished - ')
    proxiedClient.close()
    await proxy.deactivate()
    await after(client)
    t.end()
  })
}

// test('watch() - with options', async (t) => {
//   t.plan(3)
//   await proxy.deactivate()
//   const client = await before()
//   const watcher = await client.watch()
//     .prefix('/a')
//     .inRange(Range.from({ start: '/a/1', end: '/a/3' }))
//     .ignore('delete')
//     .only('put')
//     .startRevision('0')
//     .withPreviousKV()
//     .create()
//   let deleteNum = 0
//   const putKeyMap = new Map()
//   const pkvArr = []
//   const wPromise = new Promise((resolve, reject) => {
//     watcher.on('put', (kv, pkv) => {
//       if (pkv) {
//         pkvArr.push(pkv)
//       }
//       putKeyMap.set(kv.key.toString(), kv)
//     })
//     watcher.on('delete', (kv) => {
//       deleteNum++
//     })
//     setTimeout(async () => {
//       await watcher.cancel()
//       resolve(null)
//     }, 2000)
//   })
//   await client.put('/a/1').value('1').exec()
//   await client.put('/a/2').value('2').exec()
//   await client.put('/a/3').value('3').exec()
//   await client.delete().key('/a/3').exec()
//   await client.delete().key('/a/2').exec()
//   await client.put('/a/1').value('2').exec()
//   await wPromise
//   t.ok(deleteNum === 0, 'watch() - with options ignore delete')
//   t.ok(pkvArr.length === 1, 'watch() - with options withPreviousKV')
//   t.ok(putKeyMap.size === 2, 'watch() - with options inRange')
//   t.end()
//   await after(client)
// })

// test('watch manager', async (t) => {
//   await proxy.deactivate()
//   const client = await before()
//   const watcher = await client.watch().key('foo1').create()
//   // eslint-disable-next-line @typescript-eslint/dot-notation -- i
//   const manager = watcher['manager']
//   manager.state = 4
//   try { manager.attach(watcher) } catch (error) {
//     t.pass('watch manager attach state error.')
//   }
//   try { manager.getStream() } catch (error) {
//     t.pass('watch manager getStream error.')
//   }
//   manager.state = 1
//   const stream = manager.stream
//   manager.stream = {}
//   try { manager.getStream() } catch (error) {
//     t.pass('watch manager getStream state == Connected error.')
//   }
//   try { manager.establishStream() } catch (error) {
//     t.pass('watch manager establishStream state != Idle error.')
//   }
//   manager.state = 3
//   try { manager.destroyStream() } catch (error) {
//     t.pass('watch manager destroyStream state state != Connected error.')
//   }
//   manager.state = 2
//   try { manager.destroyStream() } catch (error) {
//     t.pass('watch manager destroyStream Cannot call with active watchers.')
//   }
//   manager.stream = stream
//   await watcher.cancel()
//   t.end()
//   await after(client)
// })