/**
 * Pipe definition for Tinybird
 * Define SQL transformations and endpoints as TypeScript with full type safety
 *
 * NOTE(review): the generic parameter lists and type arguments in this
 * declaration file were restored from how each type parameter is used in the
 * declarations below. Two details could not be derived from this file alone and
 * should be confirmed against the source it is generated from:
 *  - `Record<string, never>` as the "empty" params/output placeholder, and
 *  - `TDatasource["_schema"]` as the way to read a datasource's schema type.
 */
import type { AnyTypeValidator } from "./types.js";
import type { AnyParamValidator } from "./params.js";
import type { DatasourceDefinition, SchemaDefinition, ColumnDefinition } from "./datasource.js";
import type { TokenDefinition, PipeTokenScope } from "./token.js";
import type { KafkaConnectionDefinition, S3ConnectionDefinition } from "./connection.js";

/** Symbol for brand typing pipes - use Symbol.for() for global registry */
export declare const PIPE_BRAND: unique symbol;

/** Symbol for brand typing nodes - use Symbol.for() for global registry */
export declare const NODE_BRAND: unique symbol;

/**
 * Parameter definition for a pipe: parameter name -> parameter validator
 */
export type ParamsDefinition = Record<string, AnyParamValidator>;

/**
 * Output schema definition for a pipe: column name -> type validator
 */
export type OutputDefinition = Record<string, AnyTypeValidator>;

/**
 * Node configuration options
 */
export interface NodeOptions {
    /** Node name (must be valid identifier) */
    name: string;
    /** SQL query for this node */
    sql: string;
    /** Human-readable description */
    description?: string;
}

/**
 * A node definition within a pipe
 */
export interface NodeDefinition {
    readonly [NODE_BRAND]: true;
    /** Node name */
    readonly _name: string;
    /** Type marker for inference */
    readonly _type: "node";
    /** SQL query */
    readonly sql: string;
    /** Description */
    readonly description?: string;
}

/**
 * Create a node within a pipe
 *
 * @param options - Node configuration
 * @returns A node definition
 *
 * @example
 * ```ts
 * import { node } from '@tinybirdco/sdk';
 *
 * const filteredNode = node({
 *   name: 'filtered',
 *   sql: `
 *     SELECT *
 *     FROM events
 *     WHERE timestamp >= {{DateTime(start_date)}}
 *   `,
 * });
 * ```
 */
export declare function node(options: NodeOptions): NodeDefinition;

/**
 * Check if a value is a node definition
 */
export declare function isNodeDefinition(value: unknown): value is NodeDefinition;

/**
 * Endpoint configuration for a pipe
 */
export interface EndpointConfig {
    /** Whether this pipe is exposed as an API endpoint */
    enabled: boolean;
    /** Cache configuration */
    cache?: {
        /** Whether caching is enabled */
        enabled: boolean;
        /** Cache TTL in seconds */
        ttl?: number;
    };
}

/**
 * Materialized view configuration for a pipe
 */
export interface MaterializedConfig<TDatasource extends DatasourceDefinition<SchemaDefinition> = DatasourceDefinition<SchemaDefinition>> {
    /** Target datasource where materialized data is written */
    datasource: TDatasource;
    /**
     * Deployment method for materialized views.
     * Use 'alter' to update existing materialized views using ALTER TABLE ... MODIFY QUERY
     * instead of recreating the table. This preserves existing data and reduces deployment time.
     */
    deploymentMethod?: "alter";
}

/**
 * Copy pipe configuration
 */
export interface CopyConfig<TDatasource extends DatasourceDefinition<SchemaDefinition> = DatasourceDefinition<SchemaDefinition>> {
    /** Target datasource where copied data is written */
    datasource: TDatasource;
    /**
     * Copy mode: how data is ingested
     * - 'append': Appends the result to the target data source (default)
     * - 'replace': Every run completely replaces the destination Data Source content
     */
    copy_mode?: "append" | "replace";
    /**
     * Copy schedule: when the copy job runs
     * - A cron expression (e.g., "0 * * * *" for hourly)
     * - "@on-demand" for manual execution only
     * Defaults to "@on-demand" if not specified
     */
    copy_schedule?: string;
}

/**
 * Sink export strategy.
 * - 'create_new': write new files on each run
 * - 'replace': replace destination data on each run
 */
export type SinkStrategy = "create_new" | "replace";

/**
 * S3 sink compression codec.
 */
export type SinkCompression = "none" | "gzip" | "snappy";

/**
 * Kafka sink configuration
 */
export interface KafkaSinkConfig {
    /** Kafka connection used to publish records */
    connection: KafkaConnectionDefinition;
    /** Destination Kafka topic */
    topic: string;
    /** Sink schedule (for example: @on-demand, @once, cron expression) */
    schedule: string;
}

/**
 * S3 sink configuration
 */
export interface S3SinkConfig {
    /** S3 connection used to write exported files */
    connection: S3ConnectionDefinition;
    /** Destination bucket URI (for example: s3://bucket/prefix/) */
    bucketUri: string;
    /** Output filename template (supports Tinybird placeholders) */
    fileTemplate: string;
    /** Output format (for example: csv, ndjson) */
    format: string;
    /** Sink schedule (for example: @on-demand, @once, cron expression) */
    schedule: string;
    /** Export strategy */
    strategy?: SinkStrategy;
    /** Compression codec */
    compression?: SinkCompression;
}

/**
 * Sink pipe configuration (Kafka or S3 only)
 */
export type SinkConfig = KafkaSinkConfig | S3SinkConfig;

/**
 * Inline token configuration for pipe access
 */
export interface InlinePipeTokenConfig {
    /** Token name */
    name: string;
}

/**
 * Token reference with pipe-specific scope
 */
export interface PipeTokenReference {
    /** The token definition */
    token: TokenDefinition;
    /** Scope for this pipe (READ only) */
    scope: PipeTokenScope;
}

/**
 * Token configuration for pipe access.
 * Can be either an inline definition or a reference to a defined token.
 */
export type PipeTokenConfig = InlinePipeTokenConfig | PipeTokenReference;

/**
 * Options for defining a pipe (reusable SQL logic, no endpoint)
 */
export interface PipeOptions<TParams extends ParamsDefinition = ParamsDefinition, TOutput extends OutputDefinition = OutputDefinition> {
    /** Human-readable description of the pipe */
    description?: string;
    /** Parameter definitions for query inputs */
    params?: TParams;
    /** Nodes in the transformation pipeline */
    nodes: readonly NodeDefinition[];
    /** Output schema (optional for reusable pipes, required for endpoints) */
    output?: TOutput;
    /** Whether this pipe is an API endpoint (`true` is shorthand for { enabled: true }). Mutually exclusive with materialized, copy, and sink. */
    endpoint?: boolean | EndpointConfig;
    /** Materialized view configuration. Mutually exclusive with endpoint, copy, and sink. */
    materialized?: MaterializedConfig;
    /** Copy pipe configuration. Mutually exclusive with endpoint, materialized, and sink. */
    copy?: CopyConfig;
    /** Access tokens for this pipe */
    tokens?: readonly PipeTokenConfig[];
}

/**
 * Options for defining a sink pipe
 */
export interface SinkPipeOptions<TParams extends ParamsDefinition = ParamsDefinition> {
    /** Human-readable description of the sink pipe */
    description?: string;
    /** Parameter definitions for query inputs */
    params?: TParams;
    /** Nodes in the transformation pipeline */
    nodes: readonly NodeDefinition[];
    /** Sink export configuration */
    sink: SinkConfig;
    /** Sink pipes are not endpoints */
    endpoint?: never;
    /** Sink pipes are not materialized views */
    materialized?: never;
    /** Sink pipes are not copy pipes */
    copy?: never;
    /** Access tokens for this sink pipe */
    tokens?: readonly PipeTokenConfig[];
}

/**
 * Options as stored on a pipe definition: either regular pipe options
 * (which must not carry a `sink`) or sink pipe options.
 */
type PipeRuntimeOptions<TParams extends ParamsDefinition, TOutput extends OutputDefinition> = (PipeOptions<TParams, TOutput> & {
    sink?: never;
}) | SinkPipeOptions<TParams>;

/**
 * Options for defining an endpoint (API-exposed pipe)
 */
export interface EndpointOptions<TParams extends ParamsDefinition = ParamsDefinition, TOutput extends OutputDefinition = OutputDefinition> {
    /** Human-readable description of the endpoint */
    description?: string;
    /** Parameter definitions for query inputs */
    params?: TParams;
    /** Nodes in the transformation pipeline */
    nodes: readonly NodeDefinition[];
    /** Output schema (required for type safety) */
    output: TOutput;
    /** Cache configuration */
    cache?: {
        /** Whether caching is enabled */
        enabled: boolean;
        /** Cache TTL in seconds */
        ttl?: number;
    };
    /** Access tokens for this endpoint */
    tokens?: readonly PipeTokenConfig[];
}

/**
 * Options for defining a copy pipe
 */
export interface CopyPipeOptions<TDatasource extends DatasourceDefinition<SchemaDefinition>> {
    /** Human-readable description of the copy pipe */
    description?: string;
    /** Nodes in the transformation pipeline */
    nodes: readonly NodeDefinition[];
    /** Target datasource where copied data is written */
    datasource: TDatasource;
    /**
     * Copy mode: how data is ingested
     * - 'append': Appends the result to the target data source (default)
     * - 'replace': Every run completely replaces the destination Data Source content
     */
    copy_mode?: "append" | "replace";
    /**
     * Copy schedule: when the copy job runs
     * - A cron expression (e.g., "0 * * * *" for hourly)
     * - "@on-demand" for manual execution only
     * Defaults to "@on-demand" if not specified
     */
    copy_schedule?: string;
    /** Access tokens for this copy pipe */
    tokens?: readonly PipeTokenConfig[];
}

/**
 * A pipe definition with full type information
 *
 * Both type parameters default to their widest form so that `PipeDefinition`
 * can be written without type arguments, as the helper functions at the end
 * of this file do.
 */
export interface PipeDefinition<TParams extends ParamsDefinition = ParamsDefinition, TOutput extends OutputDefinition = OutputDefinition> {
    readonly [PIPE_BRAND]: true;
    /** Pipe name */
    readonly _name: string;
    /** Type marker for inference */
    readonly _type: "pipe";
    /** Parameter definitions */
    readonly _params: TParams;
    /** Output schema (optional for reusable pipes) */
    readonly _output?: TOutput;
    /** Full options */
    readonly options: PipeRuntimeOptions<TParams, TOutput>;
}

/**
 * Define a Tinybird pipe
 *
 * @param name - The pipe name
 * @param options - Pipe configuration (nodes, plus optional params, output,
 *   endpoint, materialized, copy, and tokens settings)
 * @returns A pipe definition typed with the params and output of `options`
 */
export declare function definePipe<TParams extends ParamsDefinition, TOutput extends OutputDefinition>(name: string, options: PipeOptions<TParams, TOutput>): PipeDefinition<TParams, TOutput>;

/**
 * Define a Tinybird sink pipe
 *
 * Sink pipes export query results to external systems via Kafka or S3.
 *
 * @param name - The sink pipe name (must be valid identifier)
 * @param options - Sink pipe configuration
 * @returns A pipe definition configured as a sink pipe
 *   (NOTE(review): output is typed as the empty `Record<string, never>` - confirm)
 */
export declare function defineSinkPipe<TParams extends ParamsDefinition>(name: string, options: SinkPipeOptions<TParams>): PipeDefinition<TParams, Record<string, never>>;

/**
 * Options for defining a materialized view
 */
export interface MaterializedViewOptions<TDatasource extends DatasourceDefinition<SchemaDefinition>> {
    /** Human-readable description of the materialized view */
    description?: string;
    /** Nodes in the transformation pipeline */
    nodes: readonly NodeDefinition[];
    /** Target datasource where materialized data is written */
    datasource: TDatasource;
    /**
     * Deployment method for materialized views.
     * Use 'alter' to update existing materialized views using ALTER TABLE ... MODIFY QUERY
     * instead of recreating the table. This preserves existing data and reduces deployment time.
     */
    deploymentMethod?: "alter";
    /** Access tokens for this pipe */
    tokens?: readonly PipeTokenConfig[];
}

/**
 * Helper type to extract the output definition from a datasource schema
 *
 * For each schema key: a `ColumnDefinition` entry is unwrapped to the type it
 * wraps, a bare type validator is kept as-is, anything else becomes `never`.
 */
type DatasourceSchemaToOutput<TSchema extends SchemaDefinition> = {
    [K in keyof TSchema]: TSchema[K] extends ColumnDefinition<infer V> ? V : TSchema[K] extends AnyTypeValidator ? TSchema[K] : never;
};

/**
 * Define a Tinybird materialized view
 *
 * This is a convenience function that simplifies creating materialized views.
 * The output schema is automatically derived from the target datasource, ensuring
 * type safety between the pipe output and the target.
 *
 * @param name - The pipe name (must be valid identifier)
 * @param options - Materialized view configuration
 * @returns A pipe definition configured as a materialized view
 *   (NOTE(review): the schema is read via `TDatasource["_schema"]` - confirm the
 *   property name against the datasource definition type)
 *
 * @example
 * ```ts
 * import { defineDatasource, defineMaterializedView, node, t, engine } from '@tinybirdco/sdk';
 *
 * // Target datasource for aggregated data
 * const salesByHour = defineDatasource('sales_by_hour', {
 *   schema: {
 *     day: t.date(),
 *     country: t.string().lowCardinality(),
 *     total_sales: t.simpleAggregateFunction('sum', t.uint64()),
 *   },
 *   engine: engine.aggregatingMergeTree({
 *     sortingKey: ['day', 'country'],
 *   }),
 * });
 *
 * // Materialized view - output schema is inferred from datasource
 * export const salesByHourMv = defineMaterializedView('sales_by_hour_mv', {
 *   description: 'Aggregate sales per hour',
 *   datasource: salesByHour,
 *   nodes: [
 *     node({
 *       name: 'daily_sales',
 *       sql: `
 *         SELECT
 *           toStartOfDay(starting_date) as day,
 *           country,
 *           sum(sales) as total_sales
 *         FROM teams
 *         GROUP BY day, country
 *       `,
 *     }),
 *   ],
 *   deploymentMethod: 'alter', // optional
 * });
 * ```
 */
export declare function defineMaterializedView<TDatasource extends DatasourceDefinition<SchemaDefinition>>(name: string, options: MaterializedViewOptions<TDatasource>): PipeDefinition<Record<string, never>, DatasourceSchemaToOutput<TDatasource["_schema"]>>;

/**
 * Define a Tinybird endpoint
 *
 * This is a convenience function for creating API endpoints.
 * Endpoints are pipes that are exposed as HTTP API endpoints.
 *
 * @param name - The endpoint name (must be valid identifier)
 * @param options - Endpoint configuration including params, nodes, and output schema
 * @returns A pipe definition configured as an endpoint
 *
 * @example
 * ```ts
 * import { defineEndpoint, node, p, t } from '@tinybirdco/sdk';
 *
 * export const topEvents = defineEndpoint('top_events', {
 *   description: 'Get top events by count',
 *   params: {
 *     start_date: p.dateTime(),
 *     end_date: p.dateTime(),
 *     limit: p.int32().optional(10),
 *   },
 *   nodes: [
 *     node({
 *       name: 'aggregated',
 *       sql: `
 *         SELECT
 *           event_type,
 *           count() as event_count
 *         FROM events
 *         WHERE timestamp BETWEEN {{DateTime(start_date)}} AND {{DateTime(end_date)}}
 *         GROUP BY event_type
 *         ORDER BY event_count DESC
 *         LIMIT {{Int32(limit, 10)}}
 *       `,
 *     }),
 *   ],
 *   output: {
 *     event_type: t.string(),
 *     event_count: t.uint64(),
 *   },
 * });
 * ```
 */
export declare function defineEndpoint<TParams extends ParamsDefinition, TOutput extends OutputDefinition>(name: string, options: EndpointOptions<TParams, TOutput>): PipeDefinition<TParams, TOutput>;

/**
 * Define a Tinybird copy pipe
 *
 * Copy pipes capture the result of a pipe at a moment in time and write
 * the result into a target data source. They can be run on a schedule,
 * or executed on demand.
 *
 * Unlike materialized views which continuously update as new events are inserted,
 * copy pipes generate a single snapshot at a specific point in time.
 *
 * @param name - The copy pipe name (must be valid identifier)
 * @param options - Copy pipe configuration
 * @returns A pipe definition configured as a copy pipe
 *   (NOTE(review): the schema is read via `TDatasource["_schema"]` - confirm the
 *   property name against the datasource definition type)
 *
 * @example
 * ```ts
 * import { defineCopyPipe, defineDatasource, node, t, engine } from '@tinybirdco/sdk';
 *
 * // Target datasource for daily snapshots
 * const dailySalesSnapshot = defineDatasource('daily_sales_snapshot', {
 *   schema: {
 *     snapshot_date: t.date(),
 *     country: t.string(),
 *     total_sales: t.uint64(),
 *   },
 *   engine: engine.mergeTree({
 *     sortingKey: ['snapshot_date', 'country'],
 *   }),
 * });
 *
 * // Copy pipe that runs daily at midnight
 * export const dailySalesCopy = defineCopyPipe('daily_sales_copy', {
 *   description: 'Daily snapshot of sales by country',
 *   datasource: dailySalesSnapshot,
 *   copy_schedule: '0 0 * * *', // Daily at midnight UTC
 *   copy_mode: 'append',
 *   nodes: [
 *     node({
 *       name: 'snapshot',
 *       sql: `
 *         SELECT
 *           today() AS snapshot_date,
 *           country,
 *           sum(sales) AS total_sales
 *         FROM sales
 *         WHERE date = today() - 1
 *         GROUP BY country
 *       `,
 *     }),
 *   ],
 * });
 * ```
 */
export declare function defineCopyPipe<TDatasource extends DatasourceDefinition<SchemaDefinition>>(name: string, options: CopyPipeOptions<TDatasource>): PipeDefinition<Record<string, never>, DatasourceSchemaToOutput<TDatasource["_schema"]>>;

/**
 * Check if a value is a pipe definition
 */
export declare function isPipeDefinition(value: unknown): value is PipeDefinition;

/**
 * Get the endpoint configuration from a pipe
 */
export declare function getEndpointConfig(pipe: PipeDefinition): EndpointConfig | null;

/**
 * Get the materialized view configuration from a pipe
 */
export declare function getMaterializedConfig(pipe: PipeDefinition): MaterializedConfig | null;

/**
 * Check if a pipe is a materialized view
 */
export declare function isMaterializedView(pipe: PipeDefinition): boolean;

/**
 * Get the copy pipe configuration from a pipe
 */
export declare function getCopyConfig(pipe: PipeDefinition): CopyConfig | null;

/**
 * Check if a pipe is a copy pipe
 */
export declare function isCopyPipe(pipe: PipeDefinition): boolean;

/**
 * Get the sink configuration from a pipe
 */
export declare function getSinkConfig(pipe: PipeDefinition): SinkConfig | null;

/**
 * Check if a pipe is a sink pipe
 */
export declare function isSinkPipe(pipe: PipeDefinition): boolean;

/**
 * Get all node names from a pipe
 */
export declare function getNodeNames(pipe: PipeDefinition): string[];

/**
 * Get a specific node by name
 */
export declare function getNode(pipe: PipeDefinition, name: string): NodeDefinition | undefined;

/**
 * Helper type to extract params from a pipe definition
 * (resolves to `never` when `T` is not a pipe definition)
 */
export type ExtractParams<T> = T extends PipeDefinition<infer P, OutputDefinition> ? P : never;

/**
 * Helper type to extract output from a pipe definition
 * (resolves to `never` when `T` is not a pipe definition)
 */
export type ExtractOutput<T> = T extends PipeDefinition<ParamsDefinition, infer O> ? O : never;

/**
 * SQL template helper for referencing datasources and other nodes
 * This is a simple helper - for complex templating, use raw strings
 *
 * @example
 * ```ts
 * import { sql, events } from './datasources/events';
 *
 * const query = sql`SELECT * FROM ${events} WHERE id = 1`;
 * // Results in: "SELECT * FROM events WHERE id = 1"
 * ```
 */
export declare function sql(strings: TemplateStringsArray, ...values: (DatasourceDefinition | NodeDefinition | string | number)[]): string;
export {};
//# sourceMappingURL=pipe.d.ts.map