import { Address } from '@ton/core' import type { TonClient, Transaction } from '@ton/ton' import type { LogFilter } from '../chain.ts' import { CCIPLogsRequiresStartError, CCIPLogsWatchRequiresFinalityError } from '../errors/index.ts' import { CCIPLogsAddressRequiredError } from '../errors/specialized.ts' import type { ChainTransaction, LeanNumbers } from '../types.ts' import { signalToPromise } from '../utils.ts' const DEFAULT_POLL_INTERVAL = 5000 async function* fetchTxsForward( opts: LeanNumbers & { pollInterval?: number }, { provider }: { provider: TonClient }, ) { const limit = Math.min(Number(opts.page) || 99, 99) // forward collect all matching txs in array const allTxs = [] as Transaction[] const notBefore = opts.startBlock == null ? undefined : BigInt(opts.startBlock) let batch: typeof allTxs, until: bigint | undefined = notBefore do { batch = await provider.getTransactions(Address.parse(opts.address!), { limit, ...(!!allTxs.length && { lt: allTxs[allTxs.length - 1]!.lt.toString(), hash: allTxs[allTxs.length - 1]!.hash().toString('base64'), }), ...(notBefore != null && { to_lt: notBefore.toString() }), }) until ??= batch[0]?.lt while (batch.length > 0 && batch[batch.length - 1]!.now < Number(opts.startTime ?? 0)) { batch.length-- // truncate tail of txs which are older than requested start } while (notBefore != null && batch.length > 0 && batch[batch.length - 1]!.lt < notBefore) { batch.length-- // truncate tail of txs which are older than requested startBlock } allTxs.push(...batch) // concat in descending order } while (batch.length >= limit) allTxs.reverse() // forward const notAfter = (typeof opts.endBlock !== 'number' && typeof opts.endBlock !== 'bigint') || Number(opts.endBlock) < 0 ? undefined : BigInt(opts.endBlock) while (notAfter != null && allTxs.length > 0 && allTxs[allTxs.length - 1]!.lt > notAfter) { allTxs.length-- // truncate head (after reverse) of txs newer than requested end } yield* allTxs // all past logs if (allTxs.length) until = allTxs[allTxs.length - 1]!.lt // if not watch mode, returns while (opts.watch && (!(opts.watch instanceof AbortSignal) || !opts.watch.aborted)) { const lastReq = performance.now() batch = await provider.getTransactions(Address.parse(opts.address!), { limit, to_lt: until?.toString(), }) batch.reverse() // forward for (const tx of batch) { until = tx.lt yield tx } let delay$ = AbortSignal.timeout( Math.max( Math.ceil((opts.pollInterval || DEFAULT_POLL_INTERVAL) - (performance.now() - lastReq)), 1, ), ) if (opts.watch instanceof AbortSignal) { if (opts.watch.aborted) break delay$ = AbortSignal.any([opts.watch, delay$]) } await signalToPromise(delay$).catch(() => false) } } /** * Internal method to get transactions for an address with pagination. * @param opts - Log filter options. * @returns Async generator of TON transactions. */ export async function* streamTransactionsForAddress( opts: LeanNumbers> & { pollInterval?: number }, ctx: { provider: TonClient getTransaction: (tx: Transaction) => Promise }, ): AsyncGenerator { if (!opts.address) throw new CCIPLogsAddressRequiredError() opts.endBlock ??= 'latest' const hasStart = opts.startBlock != null || opts.startTime != null if (!hasStart) throw new CCIPLogsRequiresStartError() if ( opts.watch && (((typeof opts.endBlock === 'number' || typeof opts.endBlock === 'bigint') && Number(opts.endBlock) > 0) || opts.endBefore) ) throw new CCIPLogsWatchRequiresFinalityError( typeof opts.endBlock === 'bigint' ? Number(opts.endBlock) : opts.endBlock, ) const allTransactions = fetchTxsForward(opts, ctx) // Process transactions for await (const tx of allTransactions) { yield await ctx.getTransaction(tx) } }