import { map } from '@tanstack/db-ivm' import { ConditionalSelect, PropRef, Value as ValClass, isExpressionLike, } from '../ir.js' import { AggregateNotSupportedError } from '../../errors.js' import { compileExpression, isCaseWhenConditionTrue } from './evaluators.js' import { containsAggregate } from './group-by.js' import type { Aggregate, BasicExpression, Select, SelectValueExpression, } from '../ir.js' import type { KeyedStream, NamespacedAndKeyedStream, NamespacedRow, } from '../../types.js' /** * Type for operations array used in select processing */ type SelectOp = | { kind: `merge` targetPath: Array source: (row: NamespacedRow) => any } | { kind: `field`; alias: string; compiled: (row: NamespacedRow) => any } /** * Unwraps any Value expressions */ function unwrapVal(input: any): any { if (input instanceof ValClass) return input.value return input } /** * Processes a merge operation by merging source values into the target path */ function processMerge( op: Extract, namespacedRow: NamespacedRow, selectResults: Record, ): void { const value = op.source(namespacedRow) if (value && typeof value === `object`) { // Ensure target object exists let cursor: any = selectResults const path = op.targetPath if (path.length === 0) { // Top-level merge for (const [k, v] of Object.entries(value)) { selectResults[k] = unwrapVal(v) } } else { for (let i = 0; i < path.length; i++) { const seg = path[i]! if (i === path.length - 1) { const dest = (cursor[seg] ??= {}) if (typeof dest === `object`) { for (const [k, v] of Object.entries(value)) { dest[k] = unwrapVal(v) } } } else { const next = cursor[seg] if (next == null || typeof next !== `object`) { cursor[seg] = {} } cursor = cursor[seg] } } } } } /** * Processes a non-merge operation by setting the field value at the specified alias path */ function processNonMergeOp( op: Extract, namespacedRow: NamespacedRow, selectResults: Record, ): void { // Support nested alias paths like "meta.author.name" const path = op.alias.split(`.`) if (path.length === 1) { selectResults[op.alias] = op.compiled(namespacedRow) } else { let cursor: any = selectResults for (let i = 0; i < path.length - 1; i++) { const seg = path[i]! const next = cursor[seg] if (next == null || typeof next !== `object`) { cursor[seg] = {} } cursor = cursor[seg] } cursor[path[path.length - 1]!] = unwrapVal(op.compiled(namespacedRow)) } } /** * Processes a single row to generate select results */ function processRow( [key, namespacedRow]: [unknown, NamespacedRow], ops: Array, ): [unknown, typeof namespacedRow & { $selected: any }] { const selectResults: Record = {} for (const op of ops) { if (op.kind === `merge`) { processMerge(op, namespacedRow, selectResults) } else { processNonMergeOp(op, namespacedRow, selectResults) } } // Return the namespaced row with $selected added return [ key, { ...namespacedRow, $selected: selectResults, }, ] as [unknown, typeof namespacedRow & { $selected: typeof selectResults }] } /** * Processes the SELECT clause and places results in $selected * while preserving the original namespaced row for ORDER BY access */ export function processSelect( pipeline: NamespacedAndKeyedStream, select: Select, _allInputs: Record, ): NamespacedAndKeyedStream { // Build ordered operations to preserve authoring order (spreads and fields) const ops: Array = [] addFromObject([], select, ops) return pipeline.pipe(map((row) => processRow(row, ops))) } function compileSelectObject( obj: Record, ): (row: NamespacedRow) => any { const ops: Array = [] addFromObject([], obj, ops) return (row) => { const selectResults: Record = {} for (const op of ops) { if (op.kind === `merge`) { processMerge(op, row, selectResults) } else { processNonMergeOp(op, row, selectResults) } } return selectResults } } function compileSelectValue( value: SelectValueExpression | null | undefined, ): (row: NamespacedRow) => any { if (value == null) { return () => value } if (isConditionalSelectValue(value)) { if (containsAggregate(value)) { return () => null } return compileConditionalSelect(value) } if (value instanceof ValClass) { return () => value.value } if (value.type === `includesSubquery`) { return () => null } if (isNestedSelectObject(value)) { return compileSelectObject(value) } if ( isAggregateExpression(value as BasicExpression | Aggregate) || containsAggregate(value as BasicExpression | Aggregate) ) { return () => null } if (!isExpressionLike(value)) { return () => value } return compileExpression(value as BasicExpression) } function compileConditionalSelect( conditional: ConditionalSelect, ): (row: NamespacedRow) => any { const branches = conditional.branches.map((branch) => ({ condition: compileExpression(branch.condition), value: compileSelectValue(branch.value), })) const defaultFn = conditional.defaultValue === undefined ? undefined : compileSelectValue(conditional.defaultValue) return (row) => { for (const branch of branches) { if (isCaseWhenConditionTrue(branch.condition(row))) { return branch.value(row) } } return defaultFn !== undefined ? defaultFn(row) : null } } /** * Helper function to check if an expression is an aggregate */ function isAggregateExpression( expr: BasicExpression | Aggregate, ): expr is Aggregate { return expr.type === `agg` } /** * Processes a single argument in a function context */ export function processArgument( arg: BasicExpression | Aggregate, namespacedRow: NamespacedRow, ): any { if (isAggregateExpression(arg)) { throw new AggregateNotSupportedError() } // Pre-compile the expression and evaluate immediately const compiledExpression = compileExpression(arg) const value = compiledExpression(namespacedRow) return value } /** * Helper function to check if an object is a nested select object * * .select({ * id: users.id, * profile: { // <-- this is a nested select object * name: users.name, * email: users.email * } * }) */ function isNestedSelectObject(obj: any): boolean { return obj && typeof obj === `object` && !isExpressionLike(obj) } /** * Helper function to process select objects and build operations array */ function addFromObject( prefixPath: Array, obj: any, ops: Array, ) { for (const [key, value] of Object.entries(obj)) { if (key.startsWith(`__SPREAD_SENTINEL__`)) { const rest = key.slice(`__SPREAD_SENTINEL__`.length) const splitIndex = rest.lastIndexOf(`__`) const pathStr = splitIndex >= 0 ? rest.slice(0, splitIndex) : rest const isRefExpr = value && typeof value === `object` && `type` in (value as any) && (value as any).type === `ref` if (pathStr.includes(`.`) || isRefExpr) { // Merge into the current destination (prefixPath) from the referenced source path const targetPath = [...prefixPath] const expr = isRefExpr ? (value as BasicExpression) : (new PropRef(pathStr.split(`.`)) as BasicExpression) const compiled = compileExpression(expr) ops.push({ kind: `merge`, targetPath, source: compiled }) } else { // Table-level: pathStr is the alias; merge from namespaced row at the current prefix const tableAlias = pathStr const targetPath = [...prefixPath] ops.push({ kind: `merge`, targetPath, source: (row) => (row as any)[tableAlias], }) } continue } const expression = value as any if (isConditionalSelectValue(expression)) { if (containsAggregate(expression)) { ops.push({ kind: `field`, alias: [...prefixPath, key].join(`.`), compiled: () => null, }) continue } ops.push({ kind: `field`, alias: [...prefixPath, key].join(`.`), compiled: compileConditionalSelect(expression), }) continue } if (expression && expression.type === `includesSubquery`) { // Placeholder — field will be set to a child Collection by the output layer ops.push({ kind: `field`, alias: [...prefixPath, key].join(`.`), compiled: () => null, }) continue } if (isNestedSelectObject(expression)) { // Nested selection object addFromObject([...prefixPath, key], expression, ops) continue } if (isAggregateExpression(expression) || containsAggregate(expression)) { // Placeholder for group-by processing later. // Both plain aggregates (count(...)) and expressions wrapping // aggregates (coalesce(count(...), 0)) are deferred to processGroupBy. ops.push({ kind: `field`, alias: [...prefixPath, key].join(`.`), compiled: () => null, }) } else { if (expression === undefined || !isExpressionLike(expression)) { ops.push({ kind: `field`, alias: [...prefixPath, key].join(`.`), compiled: () => expression, }) continue } // If the expression is a Value wrapper, embed the literal to avoid re-compilation mishaps if (expression instanceof ValClass) { const val = expression.value ops.push({ kind: `field`, alias: [...prefixPath, key].join(`.`), compiled: () => val, }) } else { ops.push({ kind: `field`, alias: [...prefixPath, key].join(`.`), compiled: compileExpression(expression as BasicExpression), }) } } } } function isConditionalSelectValue(value: unknown): value is ConditionalSelect { return ( value instanceof ConditionalSelect || (value != null && typeof value === `object` && (value as { type?: unknown }).type === `conditionalSelect` && Array.isArray((value as { branches?: unknown }).branches)) ) }