import type { BlipClient } from '../client.ts' import { ConnectionSender } from '../sender/sender.ts' import { type Command, type CommandMethods, type Domain, type Identity, Node, type NodeLike } from '../types/index.ts' import { randomId } from '../utils/random.ts' import { type URI, uriToString } from '../utils/uri.ts' type Unpacked = T extends Array ? U : T type PromiseOrIterator = S extends true ? AsyncIterable> : Promise type NamespaceCommand = Omit, 'uri' | 'id' | 'to'> & { uri: URI } export type SendCommandOptions = { to?: NodeLike ownerIdentity?: Identity | Domain domain?: Domain } export type ConsumeOptions = SendCommandOptions & { take?: number skip?: number fetchall?: boolean max?: number /** * If true, it will try to parallel fetch data in chunks * Be careful as this can be slower in situations with few data and will consume much more bandwidth * Set together with max or fetchall */ optimistic?: boolean } export class Namespace { constructor( protected readonly blipClient: BlipClient, private readonly ownerSubdomain: string, private readonly defaultOptions?: ConsumeOptions, ) {} get identity() { return `postmaster@${this.ownerSubdomain ? `${this.ownerSubdomain}.` : ''}${this.defaultOptions?.domain ?? 'msging.net'}` as Identity } public sendCommand( command: NamespaceCommand, opts: ConsumeOptions & { collection: true; stream: true }, ): PromiseOrIterator public sendCommand( command: NamespaceCommand, opts?: ConsumeOptions & { collection?: boolean; stream?: false }, ): PromiseOrIterator public sendCommand( command: NamespaceCommand, opts?: ConsumeOptions & { collection?: boolean; stream?: boolean }, ): Promise | AsyncIterable> { const options = { ...this.defaultOptions, ...opts } if (options.stream) { return this.sendIterableCommand(command, options) } else if (options.collection) { return this.sendCollectionCommand(command, options) } else { return this.sendPlainCommand(command, options) } } private prepareCommandContext( command: NamespaceCommand, options: ConsumeOptions, ) { const domain = options?.domain ?? (this.blipClient.sender instanceof ConnectionSender ? this.blipClient.sender.domain : 'msging.net') return { path: options?.ownerIdentity ? `lime://${options.ownerIdentity}${command.uri.path}` : command.uri.path, query: new Map(command.uri.query), owner: new Node('postmaster', `${this.ownerSubdomain ? `${this.ownerSubdomain}.` : ''}${domain}`), take: options?.take ?? Math.min(options?.max ?? 100, 100), } } private buildCommand( base: NamespaceCommand, path: string, query: Map, owner: Node, options: ConsumeOptions, ): Command { return { id: randomId(), to: options.to ?? owner, ...base, uri: uriToString({ path, query }), } as Command } private extractCollection(response: unknown): Array { if (response && typeof response === 'object') { const r = response as Record if (Array.isArray(r.items)) return r.items as Array if (Array.isArray(r.data)) return r.data as Array } return [] } private async *sendIterableCommand( command: Omit, 'uri' | 'id' | 'to'> & { uri: URI }, options: ConsumeOptions, ): AsyncIterable> { const { path, query, owner, take } = this.prepareCommandContext(command, options) let skip = options.skip ?? 0 let yielded = 0 while (true) { const q = new Map(query).set('$take', take.toString()).set('$skip', skip.toString()) const commandToSend = this.buildCommand(command, path, q, owner, options) const response = await this.blipClient.sender.sendCommand(commandToSend) const items = this.extractCollection>(response) if (!items.length) { break } for (const item of items) { yield item if (options.max !== undefined && ++yielded >= options.max) { return } } if (!options.fetchall || items.length < take) { break } skip += take } } private async sendPlainCommand( command: Omit, 'uri' | 'id' | 'to'> & { uri: URI }, options: ConsumeOptions, ): Promise { const { path, query, owner } = this.prepareCommandContext(command, options) const commandToSend = this.buildCommand(command, path, query, owner, options) const response = await this.blipClient.sender.sendCommand(commandToSend) return response as TResponse } private async sendCollectionCommand( command: Omit, 'uri' | 'id' | 'to'> & { uri: URI }, options: ConsumeOptions, ): Promise { const { path, query, owner, take } = this.prepareCommandContext(command, options) if (options.optimistic && !options.max && !options.fetchall) { throw new Error('Optimistic consume requires max or fetchall to be set') } let skip = options.skip ?? 0 let results: Array> = [] query.set('$take', take.toString()) while (true) { const remaining = options.max !== undefined ? options.max - results.length : Number.POSITIVE_INFINITY const pageCapacity = options.optimistic ? Math.min(Math.ceil(remaining / take), 20) : 1 const items = await Promise.all( Array.from({ length: pageCapacity }, async (_, i: number) => { const q = new Map(query).set('$skip', (skip + i * take).toString()) const cmd = this.buildCommand(command, path, q, owner, options) const response = await this.blipClient.sender.sendCommand(cmd) return this.extractCollection>(response) }), ) const flattened = items.flat() results = results.concat(flattened) if ( !options.fetchall || flattened.length < take * pageCapacity || (options.max !== undefined && results.length >= options.max) ) { break } skip += take * pageCapacity } return results as TResponse } }