import * as fs from "node:fs";
import * as os from "node:os";
import * as path from "node:path";
import { afterEach, describe, expect, it } from "vitest";
import { buildFromInclude } from "../../generator/index.js";
import { runMigrate } from "./migrate.js";

/**
 * Writes a fixture file at `relativePath` inside `dir`, creating any missing
 * parent directories first.
 *
 * @param dir - Base directory the file is written under.
 * @param relativePath - File path relative to `dir`; joined with `path.join`.
 * @param content - Text written to the file, replacing any existing content.
 */
function writeFile(dir: string, relativePath: string, content: string): void {
  const fullPath = path.join(dir, relativePath);
  // `recursive: true` creates missing ancestors and does not throw when the
  // directory already exists, so repeated calls for the same directory are safe.
  fs.mkdirSync(path.dirname(fullPath), { recursive: true });
  fs.writeFileSync(fullPath, content);
}

/**
 * Exact migration output expected by the "migrates complex resources including
 * endpoint, materialized, and copy pipes" test, which compares the written file
 * against this string with `toBe`. Backticks that belong to the generated file
 * are backslash-escaped because this is itself a template literal.
 */
const EXPECTED_COMPLEX_OUTPUT = `/** * Generated by tinybird migrate. * Review endpoint output schemas and any defaults before production use. */ import { defineKafkaConnection, defineDatasource, definePipe, defineMaterializedView, defineCopyPipe, node, t, engine, p } from "@tinybirdco/sdk"; // Connections export const stream = defineKafkaConnection("stream", { bootstrapServers: "localhost:9092", securityProtocol: "SASL_SSL", saslMechanism: "PLAIN", key: "api-key", secret: "api-secret", sslCaPem: "ca-pem-content", }); // Datasources /** * Events from Kafka stream */ export const events = defineDatasource("events", { description: "Events from Kafka stream", schema: { event_id: t.string().jsonPath("$.event_id"), user_id: t.uint64().jsonPath("$.user.id"), env: t.string().default("prod").jsonPath("$.env"), is_test: t.bool().default(false).jsonPath("$.meta.is_test"), updated_at: t.dateTime().jsonPath("$.updated_at"), payload: t.string().default("{}").codec("ZSTD(1)").jsonPath("$.payload"), }, engine: engine.replacingMergeTree({ sortingKey: ["event_id", "user_id"], partitionKey: "toYYYYMM(updated_at)", primaryKey: "event_id", ttl: "updated_at + toIntervalDay(30)", ver: "updated_at", settings: { "index_granularity": 8192, "enable_mixed_granularity_parts": true } }), kafka: { connection: stream, topic: "events_topic", groupId: "events-consumer", autoOffsetReset: "earliest", }, forwardQuery: \` SELECT * FROM events_mv \`, tokens: [ { name: "events_read", permissions: ["READ"] }, { name: 
"events_append", permissions: ["APPEND"] }, ], sharedWith: ["workspace_a", "workspace_b"], }); export const eventsRollup = defineDatasource("events_rollup", { jsonPaths: false, schema: { user_id: t.uint64(), total: t.uint64(), }, engine: engine.summingMergeTree({ sortingKey: "user_id", columns: ["total"] }), }); // Pipes export const copyEvents = defineCopyPipe("copy_events", { datasource: eventsRollup, copy_mode: "replace", copy_schedule: "@on-demand", nodes: [ node({ name: "copy_node", sql: \` SELECT event_id, user_id FROM events \`, }), ], tokens: [ { name: "copy_token" }, ], }); /** * Endpoint for filtered events */ export const eventsEndpoint = definePipe("events_endpoint", { description: "Endpoint for filtered events", params: { env: p.string().optional("prod"), user_id: p.uint64(), }, nodes: [ node({ name: "base", description: "Base filter", sql: \` SELECT event_id, user_id, payload FROM events WHERE user_id = {{UInt64(user_id)}} AND env = {{ String(env) }} \`, }), node({ name: "endpoint", sql: \` SELECT event_id AS event_id, user_id AS user_id FROM base \`, }), ], endpoint: { enabled: true, cache: { enabled: true, ttl: 120 } }, output: { event_id: t.string(), user_id: t.string(), }, tokens: [ { name: "endpoint_token" }, ], }); /** * Materialized rollup */ export const eventsMv = defineMaterializedView("events_mv", { description: "Materialized rollup", datasource: eventsRollup, deploymentMethod: "alter", nodes: [ node({ name: "rollup", sql: \` SELECT user_id, count() AS total FROM events GROUP BY user_id \`, }), ], tokens: [ { name: "mv_token" }, ], }); export const statsPipe = definePipe("stats_pipe", { params: { min_total: p.uint32().optional(10), }, nodes: [ node({ name: "agg", sql: \` SELECT user_id, count() AS total FROM events GROUP BY user_id \`, }), node({ name: "final", sql: \` SELECT user_id, total FROM agg WHERE total > {{ UInt32(min_total) }} \`, }), ], tokens: [ { name: "stats_token" }, ], }); `;

/**
 * Exact migration output expected when only the Kafka connection, the `events`
 * datasource and the `events_endpoint` pipe are emitted. Compared with `toBe`
 * by both the partial-failure test (file on disk) and the dry-run test
 * (`result.outputContent`).
 */
const EXPECTED_PARTIAL_OUTPUT = `/** * Generated 
by tinybird migrate. * Review endpoint output schemas and any defaults before production use. */ import { defineKafkaConnection, defineDatasource, definePipe, defineMaterializedView, defineCopyPipe, node, t, engine, p } from "@tinybirdco/sdk"; // Connections export const stream = defineKafkaConnection("stream", { bootstrapServers: "localhost:9092", }); // Datasources export const events = defineDatasource("events", { jsonPaths: false, schema: { event_id: t.string(), user_id: t.uint64(), created_at: t.dateTime(), }, engine: engine.mergeTree({ sortingKey: "event_id" }), kafka: { connection: stream, topic: "events_topic", }, }); // Pipes export const eventsEndpoint = definePipe("events_endpoint", { params: { user_id: p.uint64(), }, nodes: [ node({ name: "source", sql: \` SELECT event_id, user_id FROM events \`, }), node({ name: "endpoint", sql: \` SELECT event_id AS event_id, user_id AS user_id FROM source WHERE user_id = {{UInt64(user_id)}} \`, }), ], endpoint: true, output: { event_id: t.string(), user_id: t.string(), }, tokens: [ { name: "endpoint_token" }, ], }); `; describe("runMigrate", () => { const tempDirs: string[] = []; afterEach(() => { for (const dir of tempDirs) { try { fs.rmSync(dir, { recursive: true }); } catch { // Ignore cleanup failures } } tempDirs.length = 0; }); it("migrates complex resources including endpoint, materialized, and copy pipes", async () => { const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "tinybird-migrate-")); tempDirs.push(tempDir); writeFile( tempDir, "stream.connection", `TYPE kafka KAFKA_BOOTSTRAP_SERVERS localhost:9092 KAFKA_SECURITY_PROTOCOL SASL_SSL KAFKA_SASL_MECHANISM PLAIN KAFKA_KEY api-key KAFKA_SECRET api-secret KAFKA_SSL_CA_PEM ca-pem-content ` ); writeFile( tempDir, "events.datasource", `DESCRIPTION > Events from Kafka stream SCHEMA > event_id String \`json:$.event_id\`, user_id UInt64 \`json:$.user.id\`, env String \`json:$.env\` DEFAULT 'prod', is_test Bool \`json:$.meta.is_test\` DEFAULT 0, updated_at 
DateTime \`json:$.updated_at\`, payload String \`json:$.payload\` DEFAULT '{}' CODEC(ZSTD(1)) ENGINE "ReplacingMergeTree" ENGINE_SORTING_KEY "event_id, user_id" ENGINE_PARTITION_KEY "toYYYYMM(updated_at)" ENGINE_PRIMARY_KEY "event_id" ENGINE_TTL "updated_at + toIntervalDay(30)" ENGINE_VER "updated_at" ENGINE_SETTINGS "index_granularity=8192, enable_mixed_granularity_parts=true" KAFKA_CONNECTION_NAME stream KAFKA_TOPIC events_topic KAFKA_GROUP_ID events-consumer KAFKA_AUTO_OFFSET_RESET earliest TOKEN events_read READ TOKEN events_append APPEND SHARED_WITH > workspace_a, workspace_b FORWARD_QUERY > SELECT * FROM events_mv ` ); writeFile( tempDir, "events_rollup.datasource", `SCHEMA > user_id UInt64, total UInt64 ENGINE "SummingMergeTree" ENGINE_SORTING_KEY "user_id" ENGINE_SUMMING_COLUMNS "total" ` ); writeFile( tempDir, "events_endpoint.pipe", `DESCRIPTION > Endpoint for filtered events NODE base DESCRIPTION > Base filter SQL > % SELECT event_id, user_id, payload FROM events WHERE user_id = {{UInt64(user_id)}} AND env = {{String(env, 'prod')}} NODE endpoint SQL > SELECT event_id AS event_id, user_id AS user_id FROM base TYPE endpoint CACHE 120 TOKEN endpoint_token READ ` ); writeFile( tempDir, "events_mv.pipe", `DESCRIPTION > Materialized rollup NODE rollup SQL > SELECT user_id, count() AS total FROM events GROUP BY user_id TYPE MATERIALIZED DATASOURCE events_rollup DEPLOYMENT_METHOD alter TOKEN mv_token READ ` ); writeFile( tempDir, "copy_events.pipe", `NODE copy_node SQL > SELECT event_id, user_id FROM events TYPE COPY TARGET_DATASOURCE events_rollup COPY_SCHEDULE @on-demand COPY_MODE replace TOKEN copy_token READ ` ); writeFile( tempDir, "stats_pipe.pipe", `NODE agg SQL > SELECT user_id, count() AS total FROM events GROUP BY user_id NODE final SQL > SELECT user_id, total FROM agg WHERE total > {{UInt32(min_total, 10)}} TOKEN stats_token READ ` ); const result = await runMigrate({ cwd: tempDir, patterns: ["."], strict: true, }); expect(result.success).toBe(true); 
expect(result.errors).toHaveLength(0); expect(result.migrated).toHaveLength(7); expect(result.migrated.filter((resource) => resource.kind === "connection")).toHaveLength(1); expect(result.migrated.filter((resource) => resource.kind === "datasource")).toHaveLength(2); expect(result.migrated.filter((resource) => resource.kind === "pipe")).toHaveLength(4); expect(path.basename(result.outputPath)).toBe("tinybird.migration.ts"); expect(fs.existsSync(result.outputPath)).toBe(true); const output = fs.readFileSync(result.outputPath, "utf-8"); expect(output).toBe(EXPECTED_COMPLEX_OUTPUT); }); it("continues processing and reports all errors while writing complex migratable resources", async () => { const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "tinybird-migrate-")); tempDirs.push(tempDir); writeFile( tempDir, "stream.connection", `TYPE kafka KAFKA_BOOTSTRAP_SERVERS localhost:9092 ` ); writeFile( tempDir, "events.datasource", `SCHEMA > event_id String, user_id UInt64, created_at DateTime ENGINE "MergeTree" ENGINE_SORTING_KEY "event_id" KAFKA_CONNECTION_NAME stream KAFKA_TOPIC events_topic ` ); writeFile( tempDir, "events_endpoint.pipe", `NODE source SQL > SELECT event_id, user_id FROM events NODE endpoint SQL > SELECT event_id AS event_id, user_id AS user_id FROM source WHERE user_id = {{UInt64(user_id)}} TYPE endpoint TOKEN endpoint_token READ ` ); writeFile( tempDir, "events_mv.pipe", `NODE rollup SQL > SELECT user_id, count() AS total FROM events GROUP BY user_id TYPE MATERIALIZED DATASOURCE missing_ds ` ); writeFile( tempDir, "broken.pipe", `NODE broken SQL > SELECT * FROM events TYPE endpoint UNSUPPORTED_DIRECTIVE true ` ); const result = await runMigrate({ cwd: tempDir, patterns: ["."], strict: true, }); expect(result.success).toBe(false); expect(result.errors).toHaveLength(2); expect(result.errors.map((error) => error.message)).toEqual( expect.arrayContaining([ 'Unsupported pipe directive in strict mode: "UNSUPPORTED_DIRECTIVE true"', 'Materialized pipe 
references missing/unmigrated datasource "missing_ds".', ]) ); expect(result.migrated.filter((resource) => resource.kind === "connection")).toHaveLength(1); expect(result.migrated.filter((resource) => resource.kind === "datasource")).toHaveLength(1); expect(result.migrated.filter((resource) => resource.kind === "pipe")).toHaveLength(1); expect(fs.existsSync(result.outputPath)).toBe(true); const output = fs.readFileSync(result.outputPath, "utf-8"); expect(output).toBe(EXPECTED_PARTIAL_OUTPUT); }); it("returns exact output content in dry-run mode for complex migratable resources", async () => { const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "tinybird-migrate-")); tempDirs.push(tempDir); writeFile( tempDir, "stream.connection", `TYPE kafka KAFKA_BOOTSTRAP_SERVERS localhost:9092 ` ); writeFile( tempDir, "events.datasource", `SCHEMA > event_id String, user_id UInt64, created_at DateTime ENGINE "MergeTree" ENGINE_SORTING_KEY "event_id" KAFKA_CONNECTION_NAME stream KAFKA_TOPIC events_topic ` ); writeFile( tempDir, "events_endpoint.pipe", `NODE source SQL > SELECT event_id, user_id FROM events NODE endpoint SQL > SELECT event_id AS event_id, user_id AS user_id FROM source WHERE user_id = {{UInt64(user_id)}} TYPE endpoint TOKEN endpoint_token READ ` ); const result = await runMigrate({ cwd: tempDir, patterns: ["."], strict: true, dryRun: true, }); expect(result.success).toBe(true); expect(result.errors).toHaveLength(0); expect(result.migrated).toHaveLength(3); expect(result.outputContent).toBe(EXPECTED_PARTIAL_OUTPUT); expect(fs.existsSync(result.outputPath)).toBe(false); }); it("migrates s3 connection and import datasource directives", async () => { const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "tinybird-migrate-")); tempDirs.push(tempDir); writeFile( tempDir, "s3sample.connection", `TYPE s3 S3_REGION "us-east-1" S3_ARN "arn:aws:iam::123456789012:role/tinybird-s3-access" ` ); writeFile( tempDir, "events_landing.datasource", `SCHEMA > timestamp DateTime, 
session_id String ENGINE "MergeTree" ENGINE_SORTING_KEY "timestamp" IMPORT_CONNECTION_NAME s3sample IMPORT_BUCKET_URI s3://my-bucket/events/*.csv IMPORT_SCHEDULE @auto IMPORT_FROM_TIMESTAMP 2024-01-01T00:00:00Z ` ); const result = await runMigrate({ cwd: tempDir, patterns: ["."], strict: true, }); expect(result.success).toBe(true); expect(result.errors).toHaveLength(0); expect(result.migrated.filter((resource) => resource.kind === "connection")).toHaveLength(1); expect(result.migrated.filter((resource) => resource.kind === "datasource")).toHaveLength(1); const output = fs.readFileSync(result.outputPath, "utf-8"); expect(output).toContain("defineS3Connection"); expect(output).toContain('export const s3sample = defineS3Connection("s3sample", {'); expect(output).toContain('region: "us-east-1"'); expect(output).toContain( 'arn: "arn:aws:iam::123456789012:role/tinybird-s3-access"' ); expect(output).toContain("s3: {"); expect(output).toContain("connection: s3sample"); expect(output).toContain('bucketUri: "s3://my-bucket/events/*.csv"'); expect(output).toContain('schedule: "@auto"'); expect(output).toContain('fromTimestamp: "2024-01-01T00:00:00Z"'); }); it("migrates gcs connection and import datasource directives", async () => { const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "tinybird-migrate-")); tempDirs.push(tempDir); writeFile( tempDir, "gcsample.connection", `TYPE gcs GCS_SERVICE_ACCOUNT_CREDENTIALS_JSON {{ tb_secret("GCS_SERVICE_ACCOUNT_CREDENTIALS_JSON") }} ` ); writeFile( tempDir, "events_gcs_landing.datasource", `SCHEMA > timestamp DateTime, session_id String ENGINE "MergeTree" ENGINE_SORTING_KEY "timestamp" IMPORT_CONNECTION_NAME gcsample IMPORT_BUCKET_URI gs://my-gcs-bucket/events/*.csv IMPORT_SCHEDULE @auto IMPORT_FROM_TIMESTAMP 2024-01-01T00:00:00Z ` ); const result = await runMigrate({ cwd: tempDir, patterns: ["."], strict: true, }); expect(result.success).toBe(true); expect(result.errors).toHaveLength(0); expect(result.migrated.filter((resource) => 
resource.kind === "connection")).toHaveLength(1); expect(result.migrated.filter((resource) => resource.kind === "datasource")).toHaveLength(1); const output = fs.readFileSync(result.outputPath, "utf-8"); expect(output).toContain("defineGCSConnection"); expect(output).toContain('export const gcsample = defineGCSConnection("gcsample", {'); expect(output).toContain( 'serviceAccountCredentialsJson: secret("GCS_SERVICE_ACCOUNT_CREDENTIALS_JSON"),' ); expect(output).toContain("gcs: {"); expect(output).toContain("connection: gcsample"); expect(output).toContain('bucketUri: "gs://my-gcs-bucket/events/*.csv"'); expect(output).toContain('schedule: "@auto"'); expect(output).toContain('fromTimestamp: "2024-01-01T00:00:00Z"'); }); it("migrates single-quoted gcs import directives", async () => { const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "tinybird-migrate-")); tempDirs.push(tempDir); writeFile( tempDir, "gcp_billing_reports.connection", `TYPE gcs GCS_SERVICE_ACCOUNT_CREDENTIALS_JSON {{tb_secret("GCP_SERVICE_ACCOUNT_CREDENTIALS_JSON")}} ` ); writeFile( tempDir, "gcp_production_billing_account_landing.datasource", `SCHEMA > usage_start_time DateTime64(3) ENGINE "MergeTree" ENGINE_SORTING_KEY "usage_start_time" IMPORT_CONNECTION_NAME 'gcp_billing_reports' IMPORT_BUCKET_URI 'gs://tinybird-oa-sot-fwd/gcp_production_billing_reports/billing_*.csv.gz' IMPORT_SCHEDULE '@on-demand' ` ); const result = await runMigrate({ cwd: tempDir, patterns: ["."], strict: true, }); expect(result.success).toBe(true); expect(result.errors).toHaveLength(0); const output = fs.readFileSync(result.outputPath, "utf-8"); expect(output).toContain( 'export const gcpBillingReports = defineGCSConnection("gcp_billing_reports", {' ); expect(output).toContain("gcs: {"); expect(output).toContain("connection: gcpBillingReports"); expect(output).toContain( 'bucketUri: "gs://tinybird-oa-sot-fwd/gcp_production_billing_reports/billing_*.csv.gz"' ); expect(output).toContain('schedule: "@on-demand"'); }); 
it("migrates dynamodb connection and import datasource directives", async () => { const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "tinybird-migrate-")); tempDirs.push(tempDir); writeFile( tempDir, "dynamo_sample.connection", `TYPE dynamodb DYNAMODB_ARN {{ tb_secret("DYNAMODB_ROLE_ARN") }} DYNAMODB_REGION us-east-1 ` ); writeFile( tempDir, "events_dynamo.datasource", `SCHEMA > id String \`json:$.Id\`, _record String \`json:$.NewImage\` ENGINE "ReplacingMergeTree" ENGINE_SORTING_KEY "id" IMPORT_CONNECTION_NAME dynamo_sample IMPORT_TABLE_ARN arn:aws:dynamodb:us-east-1:123456789012:table/events IMPORT_EXPORT_BUCKET s3://my-export-bucket ` ); const result = await runMigrate({ cwd: tempDir, patterns: ["."], strict: true, }); expect(result.success).toBe(true); expect(result.errors).toHaveLength(0); expect(result.migrated.filter((resource) => resource.kind === "connection")).toHaveLength(1); expect(result.migrated.filter((resource) => resource.kind === "datasource")).toHaveLength(1); const output = fs.readFileSync(result.outputPath, "utf-8"); expect(output).toContain("defineDynamoDBConnection"); expect(output).toContain('export const dynamoSample = defineDynamoDBConnection("dynamo_sample", {'); expect(output).toContain('region: "us-east-1"'); expect(output).toContain('arn: secret("DYNAMODB_ROLE_ARN")'); expect(output).toContain("dynamodb: {"); expect(output).toContain("connection: dynamoSample"); expect(output).toContain( 'tableArn: "arn:aws:dynamodb:us-east-1:123456789012:table/events"' ); expect(output).toContain('exportBucket: "s3://my-export-bucket"'); }); it("reports an error when DynamoDB directives use a non-dynamodb connection type", async () => { const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "tinybird-migrate-")); tempDirs.push(tempDir); writeFile( tempDir, "s3sample.connection", `TYPE s3 S3_REGION us-east-1 S3_ARN "arn:aws:iam::123456789012:role/tinybird-s3-access" ` ); writeFile( tempDir, "events_dynamo.datasource", `SCHEMA > id String ENGINE 
"ReplacingMergeTree" ENGINE_SORTING_KEY "id" IMPORT_CONNECTION_NAME s3sample IMPORT_TABLE_ARN arn:aws:dynamodb:us-east-1:123456789012:table/events IMPORT_EXPORT_BUCKET s3://my-export-bucket ` ); const result = await runMigrate({ cwd: tempDir, patterns: ["."], strict: true, }); expect(result.success).toBe(false); expect( result.errors.some((error) => error.message.includes( 'Datasource DynamoDB ingestion requires a dynamodb connection, found "s3".' ) ) ).toBe(true); }); it("reports an error when import directives use a non-bucket connection type", async () => { const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "tinybird-migrate-")); tempDirs.push(tempDir); writeFile( tempDir, "stream.connection", `TYPE kafka KAFKA_BOOTSTRAP_SERVERS localhost:9092 ` ); writeFile( tempDir, "events_landing.datasource", `SCHEMA > id String ENGINE "MergeTree" ENGINE_SORTING_KEY "id" IMPORT_CONNECTION_NAME stream IMPORT_BUCKET_URI gs://my-gcs-bucket/events/*.csv ` ); const result = await runMigrate({ cwd: tempDir, patterns: ["."], strict: true, }); expect(result.success).toBe(false); expect(result.errors.map((error) => error.message)).toEqual( expect.arrayContaining([ 'Datasource import directives require an s3 or gcs connection, found "kafka".', ]) ); }); it("migrates KAFKA_STORE_RAW_VALUE datasource directive", async () => { const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "tinybird-migrate-")); tempDirs.push(tempDir); writeFile( tempDir, "stream.connection", `TYPE kafka KAFKA_BOOTSTRAP_SERVERS localhost:9092 ` ); writeFile( tempDir, "events.datasource", `SCHEMA > event_id String ENGINE "MergeTree" ENGINE_SORTING_KEY "event_id" KAFKA_CONNECTION_NAME stream KAFKA_TOPIC events_topic KAFKA_STORE_RAW_VALUE True ` ); const result = await runMigrate({ cwd: tempDir, patterns: ["."], strict: true, }); expect(result.success).toBe(true); expect(result.errors).toHaveLength(0); const output = fs.readFileSync(result.outputPath, "utf-8"); expect(output).toContain("kafka: {"); 
expect(output).toContain("connection: stream"); expect(output).toContain('topic: "events_topic"'); expect(output).toContain("storeRawValue: true"); }); it("migrates INDEXES datasource block", async () => { const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "tinybird-migrate-")); tempDirs.push(tempDir); writeFile( tempDir, "events.datasource", `SCHEMA > id String, pipe_name String ENGINE "MergeTree" ENGINE_SORTING_KEY "id" INDEXES > pipe_name_set pipe_name TYPE set(100) GRANULARITY 1 id_bf lower(id) TYPE bloom_filter(0.001) GRANULARITY 4 ` ); const result = await runMigrate({ cwd: tempDir, patterns: ["."], strict: true, }); expect(result.success).toBe(true); expect(result.errors).toHaveLength(0); const output = fs.readFileSync(result.outputPath, "utf-8"); expect(output).toContain("indexes: ["); expect(output).toContain( '{ name: "pipe_name_set", expr: "pipe_name", type: "set(100)", granularity: 1 }' ); expect(output).toContain( '{ name: "id_bf", expr: "lower(id)", type: "bloom_filter(0.001)", granularity: 4 }' ); }); it("migrates kafka schema registry and engine is deleted directives", async () => { const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "tinybird-migrate-")); tempDirs.push(tempDir); writeFile( tempDir, "stream.connection", `TYPE kafka KAFKA_BOOTSTRAP_SERVERS kafka.example.com:9092 KAFKA_SECURITY_PROTOCOL SASL_SSL KAFKA_SASL_MECHANISM OAUTHBEARER KAFKA_SASL_OAUTHBEARER_METHOD AWS KAFKA_SASL_OAUTHBEARER_AWS_REGION eu-west-1 KAFKA_SASL_OAUTHBEARER_AWS_ROLE_ARN {{ tb_secret("KAFKA_AWS_ROLE_ARN") }} KAFKA_SASL_OAUTHBEARER_AWS_EXTERNAL_ID {{ tb_secret("KAFKA_AWS_EXTERNAL_ID") }} KAFKA_SCHEMA_REGISTRY_URL https://registry-user:registry-pass@registry.example.com # Optional registry auth details ` ); writeFile( tempDir, "events.datasource", `SCHEMA > event_id String ENGINE "MergeTree" ENGINE_SORTING_KEY "event_id" KAFKA_CONNECTION_NAME stream KAFKA_TOPIC events_topic KAFKA_STORE_RAW_VALUE True ` ); writeFile( tempDir, "events_state.datasource", `SCHEMA > # 
logical delete marker _is_deleted UInt8, event_id String, version_ts DateTime ENGINE "ReplacingMergeTree" ENGINE_SORTING_KEY "event_id" ENGINE_VER "version_ts" ENGINE_IS_DELETED "_is_deleted" ` ); writeFile( tempDir, "events_state_mv.pipe", `NODE latest SQL > SELECT toUInt8(0) AS _is_deleted, event_id, now() AS version_ts FROM events # materialized definition TYPE MATERIALIZED DATASOURCE events_state ` ); const result = await runMigrate({ cwd: tempDir, patterns: ["."], strict: true, }); expect(result.success).toBe(true); expect(result.errors).toHaveLength(0); const output = fs.readFileSync(result.outputPath, "utf-8"); expect(output).toContain( 'schemaRegistryUrl: "https://registry-user:registry-pass@registry.example.com"' ); expect(output).toContain('saslMechanism: "OAUTHBEARER"'); expect(output).toContain('saslOauthbearerMethod: "AWS"'); expect(output).toContain('saslOauthbearerAwsRegion: "eu-west-1"'); expect(output).toContain('saslOauthbearerAwsRoleArn: secret("KAFKA_AWS_ROLE_ARN")'); expect(output).toContain('saslOauthbearerAwsExternalId: secret("KAFKA_AWS_EXTERNAL_ID")'); expect(output).toContain("storeRawValue: true"); expect(output).toContain( 'engine: engine.replacingMergeTree({ sortingKey: "event_id", ver: "version_ts", isDeleted: "_is_deleted" })' ); }); it("migrates TYPE copy in lowercase", async () => { const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "tinybird-migrate-")); tempDirs.push(tempDir); writeFile( tempDir, "copy_target.datasource", `SCHEMA > id String ENGINE "MergeTree" ENGINE_SORTING_KEY "id" ` ); writeFile( tempDir, "copy_lower.pipe", `NODE copy_node SQL > SELECT id FROM copy_target TYPE copy TARGET_DATASOURCE copy_target ` ); const result = await runMigrate({ cwd: tempDir, patterns: ["."], strict: true, }); expect(result.success).toBe(true); expect(result.errors).toHaveLength(0); const output = fs.readFileSync(result.outputPath, "utf-8"); expect(output).toContain('export const copyLower = defineCopyPipe("copy_lower", {'); 
expect(output).toContain("datasource: copyTarget,"); }); it("migrates TYPE ENDPOINT in uppercase", async () => { const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "tinybird-migrate-")); tempDirs.push(tempDir); writeFile( tempDir, "endpoint_upper.pipe", `NODE endpoint SQL > SELECT 1 AS id FROM system.numbers LIMIT 1 TYPE ENDPOINT ` ); const result = await runMigrate({ cwd: tempDir, patterns: ["."], strict: true, }); expect(result.success).toBe(true); expect(result.errors).toHaveLength(0); const output = fs.readFileSync(result.outputPath, "utf-8"); expect(output).toContain('export const endpointUpper = definePipe("endpoint_upper", {'); expect(output).toContain("endpoint: true,"); }); it("supports Int/Integer aliases and column placeholder params", async () => { const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "tinybird-migrate-")); tempDirs.push(tempDir); writeFile( tempDir, "placeholder_aliases.pipe", `NODE endpoint SQL > SELECT event_id AS event_id FROM events WHERE score >= {{int(min_score, 10)}} ORDER BY {{column(sort_col)}} DESC LIMIT {{ Integer(limit, 50) }} TYPE endpoint ` ); const result = await runMigrate({ cwd: tempDir, patterns: ["."], strict: true, }); expect(result.success).toBe(true); expect(result.errors).toHaveLength(0); const output = fs.readFileSync(result.outputPath, "utf-8"); expect(output).toContain('export const placeholderAliases = definePipe("placeholder_aliases", {'); expect(output).toContain("params: {"); expect(output).toContain("limit: p.int32().optional(50),"); expect(output).toContain("min_score: p.int32().optional(10),"); expect(output).toContain("sort_col: p.column(),"); }); it("supports error() placeholder helper without treating it as a param", async () => { const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "tinybird-migrate-")); tempDirs.push(tempDir); writeFile( tempDir, "placeholder_error.pipe", `NODE endpoint SQL > % {% if not defined(start_date) and not defined(end_date) %} {{ error('start_date and end_date are 
required parameters') }} {% end %} SELECT 1 AS id WHERE now() >= {{DateTime(start_date, '2025-01-01 00:00:00')}} TYPE endpoint ` ); const result = await runMigrate({ cwd: tempDir, patterns: ["."], strict: true, }); expect(result.success).toBe(true); expect(result.errors).toHaveLength(0); const output = fs.readFileSync(result.outputPath, "utf-8"); expect(output).toContain('export const placeholderError = definePipe("placeholder_error", {'); expect(output).toContain("params: {"); expect(output).toContain('start_date: p.dateTime().optional("2025-01-01 00:00:00"),'); expect(output).not.toContain("error:"); }); it("uses the last explicit default and strips inline defaults from SQL", async () => { const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "tinybird-migrate-")); tempDirs.push(tempDir); writeFile( tempDir, "last_default.pipe", `NODE endpoint SQL > SELECT 1 WHERE d >= {{ String(start_date, '2025-03-01') }} AND d <= {{ String(start_date, '2025-04-01') }} TYPE endpoint ` ); const result = await runMigrate({ cwd: tempDir, patterns: ["."], strict: true, }); expect(result.success).toBe(true); expect(result.errors).toHaveLength(0); const output = fs.readFileSync(result.outputPath, "utf-8"); expect(output).toContain('start_date: p.string().optional("2025-04-01"),'); expect(output).toContain("{{ String(start_date) }}"); expect(output).not.toContain("{{ String(start_date, '2025-03-01') }}"); expect(output).not.toContain("{{ String(start_date, '2025-04-01') }}"); }); it("keeps the previous truthy default when a later duplicate uses a falsy default", async () => { const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "tinybird-migrate-")); tempDirs.push(tempDir); writeFile( tempDir, "truthy_default_precedence.pipe", `NODE endpoint SQL > SELECT 1 WHERE d >= {{ String(start_date, '2025-03-01') }} AND d <= {{ String(start_date, '') }} TYPE endpoint ` ); const result = await runMigrate({ cwd: tempDir, patterns: ["."], strict: true, }); expect(result.success).toBe(true); 
expect(result.errors).toHaveLength(0); const output = fs.readFileSync(result.outputPath, "utf-8"); expect(output).toContain('start_date: p.string().optional("2025-03-01"),'); expect(output).toContain("{{ String(start_date) }}"); }); it("extracts params from placeholder expressions with multiple function calls", async () => { const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "tinybird-migrate-")); tempDirs.push(tempDir); writeFile( tempDir, "expression_params.pipe", `NODE endpoint SQL > SELECT 1 LIMIT {{ Int32(limit, 20) }} OFFSET {{ Int32(page, 0) * Int32(limit, 20) }} TYPE endpoint ` ); const result = await runMigrate({ cwd: tempDir, patterns: ["."], strict: true, }); expect(result.success).toBe(true); expect(result.errors).toHaveLength(0); const output = fs.readFileSync(result.outputPath, "utf-8"); expect(output).toContain("limit: p.int32().optional(20),"); expect(output).toContain("page: p.int32().optional(0),"); expect(output).toContain("{{ Int32(limit) }}"); expect(output).toContain("{{ Int32(page) * Int32(limit) }}"); }); it("uses the last explicit type when a param appears with mixed placeholder types", async () => { const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "tinybird-migrate-")); tempDirs.push(tempDir); writeFile( tempDir, "last_type.pipe", `NODE endpoint SQL > SELECT 1 WHERE d >= {{ Date(start_date) }} AND d <= {{ String(start_date, '2025-04-01') }} TYPE endpoint ` ); const result = await runMigrate({ cwd: tempDir, patterns: ["."], strict: true, }); expect(result.success).toBe(true); expect(result.errors).toHaveLength(0); const output = fs.readFileSync(result.outputPath, "utf-8"); expect(output).toContain('start_date: p.string().optional("2025-04-01"),'); expect(output).toContain("{{ Date(start_date) }}"); expect(output).toContain("{{ String(start_date) }}"); }); it("parses multiline datasource blocks with flexible indentation", async () => { const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "tinybird-migrate-")); 
tempDirs.push(tempDir);

    // Tail of a test whose `it(...)` header precedes this section.
    // Writes a datasource fixture containing DESCRIPTION, FORWARD_QUERY and SHARED_WITH blocks
    // and checks that each one shows up in the generated TypeScript.
    writeFile(
      tempDir,
      "flex.datasource",
      `DESCRIPTION > Flexible indentation description SCHEMA > id String, env String ENGINE "MergeTree" ENGINE_SORTING_KEY "id" FORWARD_QUERY > SELECT id, env FROM upstream_ds SHARED_WITH > workspace_a, workspace_b `
    );

    const result = await runMigrate({
      cwd: tempDir,
      patterns: ["."],
      strict: true,
    });

    expect(result.success).toBe(true);
    expect(result.errors).toHaveLength(0);

    const output = fs.readFileSync(result.outputPath, "utf-8");
    expect(output).toContain('description: "Flexible indentation description"');
    expect(output).toContain("forwardQuery: `");
    expect(output).toContain("SELECT id, env");
    expect(output).toContain("FROM upstream_ds");
    expect(output).toContain('sharedWith: ["workspace_a", "workspace_b"],');
  });

  // A pipe node with DESCRIPTION and a `%`-prefixed SQL block followed by TYPE endpoint must
  // migrate cleanly and emit the node description, the SQL text and `endpoint: true,`.
  it("parses node SQL blocks with flexible indentation", async () => {
    const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "tinybird-migrate-"));
    tempDirs.push(tempDir);

    writeFile(
      tempDir,
      "flex_pipe.pipe",
      `NODE endpoint_node DESCRIPTION > Endpoint node description SQL > % SELECT 1 AS id FROM system.numbers TYPE endpoint `
    );

    const result = await runMigrate({
      cwd: tempDir,
      patterns: ["."],
      strict: true,
    });

    expect(result.success).toBe(true);
    expect(result.errors).toHaveLength(0);

    const output = fs.readFileSync(result.outputPath, "utf-8");
    expect(output).toContain('description: "Endpoint node description"');
    expect(output).toContain("SELECT 1 AS id");
    expect(output).toContain("FROM system.numbers");
    expect(output).toContain("endpoint: true,");
  });

  // A datasource that declares only a SCHEMA must produce a definition with no `engine:` option
  // and must not import the `engine` helper.
  it("migrates datasource without engine directives", async () => {
    const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "tinybird-migrate-"));
    tempDirs.push(tempDir);

    writeFile(
      tempDir,
      "no_engine.datasource",
      `SCHEMA > id String, name String `
    );

    const result = await runMigrate({
      cwd: tempDir,
      patterns: ["."],
      strict: true,
    });

    expect(result.success).toBe(true);
    expect(result.errors).toHaveLength(0);

    const output = fs.readFileSync(result.outputPath, "utf-8");
    expect(output).toContain('export const noEngine = defineDatasource("no_engine", {');
    expect(output).not.toContain("engine:");
    expect(output).not.toContain(", engine,");
    // `", engine,"` only matches when another name follows `engine` in the import list.
    // Check the whole import clause so the assertion holds for any position.
    expect(output).not.toMatch(/import\s*\{[^}]*\bengine\b[^}]*\}/);
  });

  // `ENGINE Null` is expected to be emitted as `engine: engine.null(),`.
  it("migrates datasource with Null engine", async () => {
    const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "tinybird-migrate-"));
    tempDirs.push(tempDir);

    writeFile(
      tempDir,
      "null_source.datasource",
      `SCHEMA > id String, timestamp DateTime ENGINE Null `
    );

    const result = await runMigrate({
      cwd: tempDir,
      patterns: ["."],
      strict: true,
    });

    expect(result.success).toBe(true);
    expect(result.errors).toHaveLength(0);

    const output = fs.readFileSync(result.outputPath, "utf-8");
    expect(output).toContain('export const nullSource = defineDatasource("null_source", {');
    expect(output).toContain("engine: engine.null(),");
  });

  // ENGINE_* options without an ENGINE directive are expected to yield `engine.mergeTree(...)`.
  it("infers MergeTree when engine options exist without ENGINE directive", async () => {
    const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "tinybird-migrate-"));
    tempDirs.push(tempDir);

    writeFile(
      tempDir,
      "implicit_engine.datasource",
      `SCHEMA > id String, event_date Date ENGINE_SORTING_KEY "id" ENGINE_PARTITION_KEY "toYYYYMM(event_date)" `
    );

    const result = await runMigrate({
      cwd: tempDir,
      patterns: ["."],
      strict: true,
    });

    expect(result.success).toBe(true);
    expect(result.errors).toHaveLength(0);

    const output = fs.readFileSync(result.outputPath, "utf-8");
    expect(output).toContain('export const implicitEngine = defineDatasource("implicit_engine", {');
    expect(output).toContain(
      'engine: engine.mergeTree({ sortingKey: "id", partitionKey: "toYYYYMM(event_date)" }),'
    );
  });

  // A double-quoted TOKEN name may contain spaces and parentheses; it must be emitted verbatim.
  it("supports quoted datasource token names with spaces", async () => {
    const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "tinybird-migrate-"));
    tempDirs.push(tempDir);

    writeFile(
      tempDir,
      "token_spaces.datasource",
      `TOKEN "ingestion (Data Source append)" APPEND SCHEMA > id String ENGINE "MergeTree" ENGINE_SORTING_KEY "id" `
    );

    const result = await runMigrate({
      cwd: tempDir,
      patterns: ["."],
      strict: true,
    });

    expect(result.success).toBe(true);
    expect(result.errors).toHaveLength(0);

    const output = fs.readFileSync(result.outputPath, "utf-8");
    expect(output).toContain(
      '{ name: "ingestion (Data Source append)", permissions: ["APPEND"] },'
    );
  });

  // The same kind of name without quotes must fail the strict run with exactly one error.
  it("rejects unquoted datasource token names with spaces", async () => {
    const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "tinybird-migrate-"));
    tempDirs.push(tempDir);

    writeFile(
      tempDir,
      "bad_token.datasource",
      `TOKEN ingestion data APPEND SCHEMA > id String ENGINE "MergeTree" ENGINE_SORTING_KEY "id" `
    );

    const result = await runMigrate({
      cwd: tempDir,
      patterns: ["."],
      strict: true,
    });

    expect(result.success).toBe(false);
    expect(result.errors).toHaveLength(1);
    expect(result.errors[0]?.message).toContain("Unsupported TOKEN syntax in strict mode");
  });

  // An empty datasource-level DESCRIPTION block is accepted and emitted as `description: "",`.
  it("allows empty datasource DESCRIPTION block", async () => {
    const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "tinybird-migrate-"));
    tempDirs.push(tempDir);

    writeFile(
      tempDir,
      "empty_ds_desc.datasource",
      `DESCRIPTION > SCHEMA > id String ENGINE "MergeTree" ENGINE_SORTING_KEY "id" `
    );

    const result = await runMigrate({
      cwd: tempDir,
      patterns: ["."],
      strict: true,
    });

    expect(result.success).toBe(true);
    expect(result.errors).toHaveLength(0);

    const output = fs.readFileSync(result.outputPath, "utf-8");
    expect(output).toContain(
      'export const emptyDsDesc = defineDatasource("empty_ds_desc", {'
    );
    expect(output).toContain('description: "",');
  });

  // An empty node-level DESCRIPTION block is accepted and emitted as `description: "",`.
  it("allows empty node DESCRIPTION block", async () => {
    const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "tinybird-migrate-"));
    tempDirs.push(tempDir);

    writeFile(
      tempDir,
      "empty_node_desc.pipe",
      `NODE helper DESCRIPTION > SQL > SELECT 1 AS id TYPE endpoint `
    );

    const result = await runMigrate({
      cwd: tempDir,
      patterns: ["."],
      strict: true,
    });

    expect(result.success).toBe(true);
    expect(result.errors).toHaveLength(0);

    const output = fs.readFileSync(result.outputPath, "utf-8");
    expect(output).toContain('export const emptyNodeDesc = definePipe("empty_node_desc", {');
    expect(output).toContain('description: "",');
  });

  // Template params written with keyword arguments (description=, default=, required=) must be
  // turned into `p.<type>().optional(...).describe(...)` entries.
  it("supports keyword-style pipe param arguments", async () => {
    const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "tinybird-migrate-"));
    tempDirs.push(tempDir);

    writeFile(
      tempDir,
      "keyword_params.pipe",
      `NODE endpoint SQL > SELECT 1 AS id WHERE tenant_id = {{ String(tenant_id, description="Tenant ID to filter", default="") }} AND event_date >= {{ Date(from_date, description="Starting date", required=False) }} AND days >= {{ Int32(days, 1, description="Days to include", required=True) }} TYPE endpoint `
    );

    const result = await runMigrate({
      cwd: tempDir,
      patterns: ["."],
      strict: true,
    });

    expect(result.success).toBe(true);
    expect(result.errors).toHaveLength(0);

    const output = fs.readFileSync(result.outputPath, "utf-8");
    expect(output).toContain('export const keywordParams = definePipe("keyword_params", {');
    expect(output).toContain(
      'tenant_id: p.string().optional("").describe("Tenant ID to filter"),'
    );
    expect(output).toContain(
      'from_date: p.date().optional().describe("Starting date"),'
    );
    expect(output).toContain(
      'days: p.int32().optional(1).describe("Days to include"),'
    );
  });

  // Parentheses inside a quoted description must not confuse placeholder parsing; the emitted
  // SQL placeholder is expected in the reduced form `{{ Int32(days) }}`.
  it("ignores function-like text inside quoted descriptions when parsing placeholders", async () => {
    const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "tinybird-migrate-"));
    tempDirs.push(tempDir);

    writeFile(
      tempDir,
      "description_parentheses.pipe",
      `NODE endpoint SQL > % SELECT 1 AS id WHERE ts >= now() - interval {{Int32(days, 1, description="Number of days to analyze (defaults to 1 day)")}} day TYPE endpoint `
    );

    const result = await runMigrate({
      cwd: tempDir,
      patterns: ["."],
      strict: true,
    });

    expect(result.success).toBe(true);
    expect(result.errors).toHaveLength(0);

    const output = fs.readFileSync(result.outputPath, "utf-8");
    expect(output).toContain(
      'days: p.int32().optional(1).describe("Number of days to analyze (defaults to 1 day)"),'
    );
    expect(output).toContain("{{ Int32(days) }}");
  });

  // Round trip for backslashes: the generated file must contain the SQL with each backslash
  // doubled, and building that file must yield pipe content with the original sequence again.
  it("escapes pipe SQL backslashes in migrated TypeScript template literals", async () => {
    const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "tinybird-migrate-"));
    tempDirs.push(tempDir);

    writeFile(
      tempDir,
      "escape_percent.pipe",
      String.raw`NODE endpoint SQL > % SELECT replaceAll({{ String(value) }}, '\\%', '%') AS value TYPE endpoint `
    );

    const result = await runMigrate({
      cwd: tempDir,
      patterns: ["."],
      strict: true,
    });

    expect(result.success).toBe(true);
    expect(result.errors).toHaveLength(0);

    const output = fs.readFileSync(result.outputPath, "utf-8");
    expect(output).toContain(
      String.raw`SELECT replaceAll({{ String(value) }}, '\\\\%', '%') AS value`
    );

    // Point the generated import at the local SDK entry file so the output can be built here.
    fs.writeFileSync(
      result.outputPath,
      output.replace("@tinybirdco/sdk", path.resolve(process.cwd(), "src/index.ts"))
    );

    const build = await buildFromInclude({
      cwd: tempDir,
      includePaths: [result.outputPath],
    });

    expect(build.resources.pipes).toHaveLength(1);
    expect(build.resources.pipes[0]?.content).toContain(
      String.raw`SELECT replaceAll({{ String(value) }}, '\\%', '%') AS value`
    );
  });

  // One column with an explicit json path and one without: only the explicit path is emitted,
  // and `jsonPaths: false` must not appear.
  it("migrates datasource with mixed explicit and default json paths", async () => {
    const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "tinybird-migrate-"));
    tempDirs.push(tempDir);

    writeFile(
      tempDir,
      "mixed_paths.datasource",
      `SCHEMA > event_id String \`json:$.payload.id\`, event_type String ENGINE "MergeTree" ENGINE_SORTING_KEY "event_id" `
    );

    const result = await runMigrate({
      cwd: tempDir,
      patterns: ["."],
      strict: true,
    });

    expect(result.success).toBe(true);
    expect(result.errors).toHaveLength(0);

    const output = fs.readFileSync(result.outputPath, "utf-8");
    expect(output).toContain('event_id: t.string().jsonPath("$.payload.id")');
    expect(output).toContain("event_type: t.string()");
    expect(output).not.toContain("jsonPaths: false");
  });

  // Backtick-quoted column names must be emitted as plain object keys without the backticks.
  it("normalizes backticked schema column names to valid object keys", async () => {
    const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "tinybird-migrate-"));
    tempDirs.push(tempDir);

    writeFile(
      tempDir,
      "backticked.datasource",
      `SCHEMA > \`_is_deleted\` UInt8 \`json:$._is_deleted\`, \`id\` UUID \`json:$.id\` ENGINE "MergeTree" ENGINE_SORTING_KEY "id" `
    );

    const result = await runMigrate({
      cwd: tempDir,
      patterns: ["."],
      strict: true,
    });

    expect(result.success).toBe(true);
    expect(result.errors).toHaveLength(0);

    const output = fs.readFileSync(result.outputPath, "utf-8");
    expect(output).toContain('_is_deleted: t.uint8().jsonPath("$._is_deleted")');
    expect(output).toContain('id: t.uuid().jsonPath("$.id")');
    expect(output).not.toContain("`_is_deleted`:");
    expect(output).not.toContain("`id`:");
  });

  // A `{{ tb_secret(...) }}` value must become a `secret(...)` call that is imported from the
  // SDK, not a locally defined helper and not the raw template string.
  it("emits secret helper for tb_secret template values", async () => {
    const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "tinybird-migrate-"));
    tempDirs.push(tempDir);

    writeFile(
      tempDir,
      "stream.connection",
      `TYPE kafka KAFKA_BOOTSTRAP_SERVERS localhost:9092 `
    );

    writeFile(
      tempDir,
      "events.datasource",
      `SCHEMA > id UUID ENGINE "MergeTree" ENGINE_SORTING_KEY "id" KAFKA_CONNECTION_NAME stream KAFKA_TOPIC events_topic KAFKA_GROUP_ID {{ tb_secret("KAFKA_GROUP_ID_LOCAL_ds_accounts", "accounts_1737295200") }} `
    );

    const result = await runMigrate({
      cwd: tempDir,
      patterns: ["."],
      strict: true,
    });

    expect(result.success).toBe(true);
    expect(result.errors).toHaveLength(0);

    const output = fs.readFileSync(result.outputPath, "utf-8");
    expect(output).toContain('import {');
    expect(output).toContain('secret } from "@tinybirdco/sdk";');
    expect(output).not.toContain("const secret = (name: string, defaultValue?: string) =>");
    expect(output).toContain(
      'groupId: secret("KAFKA_GROUP_ID_LOCAL_ds_accounts", "accounts_1737295200"),'
    );
    expect(output).not.toContain(
      'groupId: "{{ tb_secret(\\"KAFKA_GROUP_ID_LOCAL_ds_accounts\\", \\"accounts_1737295200\\") }}",'
    );
  });

  // Without any tb_secret value, neither the `secret` import nor a local helper may be emitted.
  it("does not emit secret helper when no tb_secret template values are present", async () => {
    const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "tinybird-migrate-"));
    tempDirs.push(tempDir);

    writeFile(
      tempDir,
      "stream.connection",
      `TYPE kafka KAFKA_BOOTSTRAP_SERVERS localhost:9092 `
    );

    writeFile(
      tempDir,
      "events.datasource",
      `SCHEMA > id UUID ENGINE "MergeTree" ENGINE_SORTING_KEY "id" KAFKA_CONNECTION_NAME stream KAFKA_TOPIC events_topic KAFKA_GROUP_ID events-group `
    );

    const result = await runMigrate({
      cwd: tempDir,
      patterns: ["."],
      strict: true,
    });

    expect(result.success).toBe(true);
    expect(result.errors).toHaveLength(0);

    const output = fs.readFileSync(result.outputPath, "utf-8");
    expect(output).not.toContain(", secret,");
    // The positive test above shows `secret` emitted last in the import list (`secret } from`),
    // which `", secret,"` can never match. Check the whole import clause instead.
    expect(output).not.toMatch(/import\s*\{[^}]*\bsecret\b[^}]*\}/);
    expect(output).not.toContain("const secret = (name: string, defaultValue?: string) =>");
    expect(output).toContain('groupId: "events-group",');
  });

  // A TYPE sink pipe exporting to a Kafka connection becomes `defineSinkPipe` with a `sink`
  // section (connection, topic, schedule) and no `strategy:` entry.
  it("migrates Kafka sink pipes and emits sink config in TypeScript", async () => {
    const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "tinybird-migrate-"));
    tempDirs.push(tempDir);

    writeFile(
      tempDir,
      "events_kafka.connection",
      `TYPE kafka KAFKA_BOOTSTRAP_SERVERS localhost:9092 `
    );

    writeFile(
      tempDir,
      "events_sink.pipe",
      `NODE publish SQL > SELECT * FROM events WHERE env = {{String(env, 'prod')}} TYPE sink EXPORT_CONNECTION_NAME events_kafka EXPORT_KAFKA_TOPIC events_out EXPORT_SCHEDULE @on-demand `
    );

    const result = await runMigrate({
      cwd: tempDir,
      patterns: ["."],
      strict: true,
    });

    expect(result.success).toBe(true);
    expect(result.errors).toHaveLength(0);
    expect(result.migrated.filter((resource) => resource.kind === "connection")).toHaveLength(1);
    expect(result.migrated.filter((resource) => resource.kind === "pipe")).toHaveLength(1);

    const output = fs.readFileSync(result.outputPath, "utf-8");
    expect(output).toContain('export const eventsSink = defineSinkPipe("events_sink", {');
    expect(output).toContain("params: {");
    expect(output).toContain('env: p.string().optional("prod"),');
    expect(output).toContain("sink: {");
    expect(output).toContain("connection: eventsKafka");
    expect(output).toContain('topic: "events_out"');
    expect(output).toContain('schedule: "@on-demand"');
    expect(output).not.toContain('strategy:');
  });

  // An S3 sink pipe must carry bucket URI, file template, schedule, format, strategy and
  // compression into the emitted `sink` section.
  it("migrates S3 sink pipes and emits compression/strategy config in TypeScript", async () => {
    const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "tinybird-migrate-"));
    tempDirs.push(tempDir);

    writeFile(
      tempDir,
      "exports_s3.connection",
      `TYPE s3 S3_REGION "us-east-1" S3_ARN "arn:aws:iam::123456789012:role/tinybird-s3-access" `
    );

    writeFile(
      tempDir,
      "events_s3_sink.pipe",
      `NODE export SQL > SELECT * FROM events TYPE sink EXPORT_CONNECTION_NAME exports_s3 EXPORT_BUCKET_URI s3://exports/events/ EXPORT_FILE_TEMPLATE events_{date} EXPORT_SCHEDULE @once EXPORT_FORMAT ndjson EXPORT_STRATEGY create_new EXPORT_COMPRESSION gzip `
    );

    const result = await runMigrate({
      cwd: tempDir,
      patterns: ["."],
      strict: true,
    });

    expect(result.success).toBe(true);
    expect(result.errors).toHaveLength(0);

    const output = fs.readFileSync(result.outputPath, "utf-8");
    expect(output).toContain('export const eventsS3Sink = defineSinkPipe("events_s3_sink", {');
    expect(output).toContain("sink: {");
    expect(output).toContain("connection: exportsS3");
    expect(output).toContain('bucketUri: "s3://exports/events/"');
    expect(output).toContain('fileTemplate: "events_{date}"');
    expect(output).toContain('schedule: "@once"');
    expect(output).toContain('format: "ndjson"');
    expect(output).toContain('strategy: "create_new"');
    expect(output).toContain('compression: "gzip"');
  });

  // The legacy `EXPORT_WRITE_STRATEGY truncate` directive is expected to surface as
  // `strategy: "replace"`.
  it("migrates legacy EXPORT_WRITE_STRATEGY sink directives", async () => {
    const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "tinybird-migrate-"));
    tempDirs.push(tempDir);

    writeFile(
      tempDir,
      "exports_s3.connection",
      `TYPE s3 S3_REGION "us-east-1" S3_ARN "arn:aws:iam::123456789012:role/tinybird-s3-access" `
    );

    writeFile(
      tempDir,
      "events_s3_sink.pipe",
      `NODE export SQL > SELECT * FROM events TYPE sink EXPORT_CONNECTION_NAME exports_s3 EXPORT_BUCKET_URI s3://exports/events/ EXPORT_FILE_TEMPLATE events_{date} EXPORT_SCHEDULE @once EXPORT_FORMAT ndjson EXPORT_WRITE_STRATEGY truncate `
    );

    const result = await runMigrate({
      cwd: tempDir,
      patterns: ["."],
      strict: true,
    });

    expect(result.success).toBe(true);
    expect(result.errors).toHaveLength(0);

    const output = fs.readFileSync(result.outputPath, "utf-8");
    expect(output).toContain('export const eventsS3Sink = defineSinkPipe("events_s3_sink", {');
    expect(output).toContain('strategy: "replace"');
  });

  // A sink pipe naming a connection that has no file in the project must fail the run with a
  // "missing/unmigrated connection" error.
  it("reports an error when sink pipe references a missing connection", async () => {
    const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "tinybird-migrate-"));
    tempDirs.push(tempDir);

    writeFile(
      tempDir,
      "events_sink.pipe",
      `NODE publish SQL > SELECT * FROM events TYPE sink EXPORT_CONNECTION_NAME missing_connection EXPORT_KAFKA_TOPIC events_out EXPORT_SCHEDULE @on-demand `
    );

    const result = await runMigrate({
      cwd: tempDir,
      patterns: ["."],
      strict: true,
    });

    expect(result.success).toBe(false);
    expect(result.errors.map((error) => error.message)).toEqual(
      expect.arrayContaining([
        'Sink pipe references missing/unmigrated connection "missing_connection".',
      ])
    );
  });

  // S3-style export directives combined with a Kafka connection must be reported as an
  // incompatible service/connection pairing.
  it("reports an error when sink connection type does not match sink service", async () => {
    const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "tinybird-migrate-"));
    tempDirs.push(tempDir);

    writeFile(
      tempDir,
      "events_kafka.connection",
      `TYPE kafka KAFKA_BOOTSTRAP_SERVERS localhost:9092 `
    );

    writeFile(
      tempDir,
      "events_s3_sink.pipe",
      `NODE export SQL > SELECT * FROM events TYPE sink EXPORT_CONNECTION_NAME events_kafka EXPORT_BUCKET_URI s3://exports/events/ EXPORT_FILE_TEMPLATE events_{date} EXPORT_SCHEDULE @once EXPORT_FORMAT csv `
    );

    const result = await runMigrate({
      cwd: tempDir,
      patterns: ["."],
      strict: true,
    });

    expect(result.success).toBe(false);
    expect(result.errors.map((error) => error.message)).toEqual(
      expect.arrayContaining([
        'Sink pipe service "s3" is incompatible with connection "events_kafka" type "kafka".',
      ])
    );
  });

  // `#` comments in a sink pipe file must not cause strict-mode errors.
  it("supports comments in sink pipe files", async () => {
    const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "tinybird-migrate-"));
    tempDirs.push(tempDir);

    writeFile(
      tempDir,
      "events_kafka.connection",
      `TYPE kafka KAFKA_BOOTSTRAP_SERVERS localhost:9092 `
    );

    writeFile(
      tempDir,
      "events_sink.pipe",
      `# this pipe publishes records NODE publish SQL > SELECT * FROM events TYPE sink # Kafka target EXPORT_CONNECTION_NAME events_kafka EXPORT_KAFKA_TOPIC events_out EXPORT_SCHEDULE @on-demand `
    );

    const result = await runMigrate({
      cwd: tempDir,
      patterns: ["."],
      strict: true,
    });

    expect(result.success).toBe(true);
    expect(result.errors).toHaveLength(0);
  });

  // A function-call DEFAULT becomes `.defaultExpr("...")`, while a quoted literal DEFAULT
  // becomes `.default("...")`.
  it("emits defaultExpr for datasource SQL function defaults", async () => {
    const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "tinybird-migrate-"));
    tempDirs.push(tempDir);

    writeFile(
      tempDir,
      "events.datasource",
      `SCHEMA > id UUID DEFAULT generateUUIDv4(), payload String DEFAULT '{}' ENGINE "MergeTree" ENGINE_SORTING_KEY "id" `
    );

    const result = await runMigrate({
      cwd: tempDir,
      patterns: ["."],
      strict: true,
    });

    expect(result.success).toBe(true);
    expect(result.errors).toHaveLength(0);

    const output = fs.readFileSync(result.outputPath, "utf-8");
    expect(output).toContain('id: t.uuid().defaultExpr("generateUUIDv4()"),');
    expect(output).toContain('payload: t.string().default("{}"),');
  });
});