import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import * as serverExports from "@langfuse/shared/src/server"; import { env } from "../../env"; import { logger } from "@langfuse/shared/src/server"; import { ClickhouseWriter, TableName } from "../ClickhouseWriter"; // Mock recordHistogram, recordCount, recordGauge vi.mock("@langfuse/shared/src/server", async (importOriginal) => { const original = (await importOriginal()) as {}; return { ...original, recordHistogram: vi.fn(), recordCount: vi.fn(), recordGauge: vi.fn(), logger: { info: vi.fn(), debug: vi.fn(), warn: vi.fn(), error: vi.fn(), }, }; }); vi.mock("../../env", async (importOriginal) => { const original = (await importOriginal()) as {}; return { ...original, env: { LANGFUSE_INGESTION_CLICKHOUSE_WRITE_BATCH_SIZE: 100, LANGFUSE_INGESTION_CLICKHOUSE_WRITE_INTERVAL_MS: 5000, LANGFUSE_INGESTION_CLICKHOUSE_MAX_ATTEMPTS: 3, }, }; }); const clickhouseClientMock = { insert: vi.fn(), }; describe("ClickhouseWriter", () => { let writer: ClickhouseWriter; beforeEach(() => { vi.useFakeTimers(); writer = ClickhouseWriter.getInstance(clickhouseClientMock); }); afterEach(async () => { vi.restoreAllMocks(); vi.useRealTimers(); // Reset singleton instance await writer.shutdown(); ClickhouseWriter.instance = null; }); it("should be a singleton", () => { const instance1 = ClickhouseWriter.getInstance(); const instance2 = ClickhouseWriter.getInstance(); expect(instance1).toBe(instance2); }); it("should initialize with correct values", () => { expect(writer.batchSize).toBe( env.LANGFUSE_INGESTION_CLICKHOUSE_WRITE_BATCH_SIZE, ); expect(writer.writeInterval).toBe( env.LANGFUSE_INGESTION_CLICKHOUSE_WRITE_INTERVAL_MS, ); expect(writer.maxAttempts).toBe( env.LANGFUSE_INGESTION_CLICKHOUSE_MAX_ATTEMPTS, ); }); it("should add items to the queue", () => { const traceData = { id: "1", name: "test" }; writer.addToQueue(TableName.Traces, traceData as any); expect(writer["queue"][TableName.Traces]).toHaveLength(1); expect(writer["queue"][TableName.Traces][0].data).toEqual(traceData); }); it("should flush when queue reaches batch size", async () => { const mockInsert = vi .spyOn(clickhouseClientMock, "insert") .mockResolvedValue(); for (let i = 0; i < writer.batchSize; i++) { writer.addToQueue(TableName.Traces, { id: `${i}`, name: "test" } as any); } await vi.advanceTimersByTimeAsync(writer.writeInterval); expect(mockInsert).toHaveBeenCalledTimes(1); expect(writer["queue"][TableName.Traces]).toHaveLength(0); }); it("should flush at regular intervals", async () => { const mockInsert = vi .spyOn(clickhouseClientMock, "insert") .mockResolvedValue(); writer.addToQueue(TableName.Traces, { id: "1", name: "test" }); await vi.advanceTimersByTimeAsync(writer.writeInterval); expect(mockInsert).toHaveBeenCalledTimes(1); }); it("should handle errors and retry", async () => { const mockInsert = vi .spyOn(clickhouseClientMock, "insert") .mockRejectedValueOnce(new Error("DB Error")) .mockResolvedValueOnce(); writer.addToQueue(TableName.Traces, { id: "1", name: "test" }); await vi.advanceTimersByTimeAsync(writer.writeInterval); expect(mockInsert).toHaveBeenCalledTimes(1); expect(logger.error).toHaveBeenCalled(); expect(writer["queue"][TableName.Traces]).toHaveLength(1); expect(writer["queue"][TableName.Traces][0].attempts).toBe(2); await vi.advanceTimersByTimeAsync(writer.writeInterval); expect(mockInsert).toHaveBeenCalledTimes(2); expect(writer["queue"][TableName.Traces]).toHaveLength(0); }); it("should drop records after max attempts", async () => { const mockInsert = vi .spyOn(clickhouseClientMock, "insert") .mockRejectedValue(new Error("DB Error")); writer.addToQueue(TableName.Traces, { id: "1", name: "test" }); for (let i = 0; i < writer.maxAttempts; i++) { await vi.advanceTimersByTimeAsync(writer.writeInterval); } await vi.advanceTimersByTimeAsync(writer.writeInterval); expect(mockInsert).toHaveBeenCalledTimes(writer.maxAttempts); expect( logger.error.mock.calls.some((call) => call[0].includes("Max attempts reached"), ), ).toBe(true); expect(writer["queue"][TableName.Traces]).toHaveLength(0); }); it("should truncate logged events after dropping", async () => { const mockInsert = vi .spyOn(clickhouseClientMock, "insert") .mockRejectedValue(new Error("DB Error")); const baseString = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. "; const repeatCount = Math.ceil(150000); const input = baseString.repeat(repeatCount); writer.addToQueue(TableName.Traces, { id: "1", name: "test", input }); for (let i = 0; i < writer.maxAttempts; i++) { await vi.advanceTimersByTimeAsync(writer.writeInterval); } await vi.advanceTimersByTimeAsync(writer.writeInterval); expect(mockInsert).toHaveBeenCalledTimes(writer.maxAttempts); expect( logger.error.mock.calls.some((call) => call[1]?.item?.input?.includes("TRUNCATED: Field exceeded size limit"), ), ).toBe(true); expect(writer["queue"][TableName.Traces]).toHaveLength(0); }); it("should shutdown gracefully", async () => { writer.addToQueue(TableName.Traces, { id: "1", name: "test" }); const mockInsert = vi .spyOn(clickhouseClientMock, "insert") .mockResolvedValue(); await writer.shutdown(); expect(mockInsert).toHaveBeenCalledTimes(1); expect(writer["intervalId"]).toBeNull(); expect(logger.info).toHaveBeenCalledWith( "ClickhouseWriter shutdown complete.", ); }); it("should handle multiple table types", async () => { const mockInsert = vi .spyOn(clickhouseClientMock, "insert") .mockResolvedValue(); writer.addToQueue(TableName.Traces, { id: "1", name: "trace" }); writer.addToQueue(TableName.Scores, { id: "2", name: "score" }); writer.addToQueue(TableName.Observations, { id: "3", name: "observation" }); await vi.advanceTimersByTimeAsync(writer.writeInterval); expect(mockInsert).toHaveBeenCalledTimes(3); expect(writer["queue"][TableName.Traces]).toHaveLength(0); expect(writer["queue"][TableName.Scores]).toHaveLength(0); expect(writer["queue"][TableName.Observations]).toHaveLength(0); }); it("should not flush when isIntervalFlushInProgress is true", async () => { const mockInsert = vi .spyOn(clickhouseClientMock, "insert") .mockResolvedValue(); writer["isIntervalFlushInProgress"] = true; writer.addToQueue(TableName.Traces, { id: "1", name: "test" }); await vi.advanceTimersByTimeAsync(writer.writeInterval); expect(mockInsert).not.toHaveBeenCalled(); expect(writer["queue"][TableName.Traces]).toHaveLength(1); }); it("should set up interval correctly in start method", () => { const setIntervalSpy = vi.spyOn(global, "setInterval"); writer["start"](); expect(setIntervalSpy).toHaveBeenCalledWith( expect.any(Function), writer.writeInterval, ); }); it("should flush all queues when flushAll is called directly", async () => { const mockInsert = vi .spyOn(clickhouseClientMock, "insert") .mockResolvedValue(); writer.addToQueue(TableName.Traces, { id: "1", name: "trace" }); writer.addToQueue(TableName.Scores, { id: "2", name: "score" }); await writer["flushAll"](true); expect(mockInsert).toHaveBeenCalledTimes(2); expect(writer["queue"][TableName.Traces]).toHaveLength(0); expect(writer["queue"][TableName.Scores]).toHaveLength(0); }); it("should handle adding items to queue while flush is in progress", async () => { const mockInsert = vi .spyOn(clickhouseClientMock, "insert") .mockImplementation(() => { writer.addToQueue(TableName.Traces, { id: "2", name: "test2" }); return Promise.resolve(); }); writer.addToQueue(TableName.Traces, { id: "1", name: "test1" }); await vi.advanceTimersByTimeAsync(writer.writeInterval); expect(mockInsert).toHaveBeenCalledTimes(1); expect(writer["queue"][TableName.Traces]).toHaveLength(1); expect(writer["queue"][TableName.Traces][0].data.id).toBe("2"); }); it("should handle concurrent writes during high load", async () => { const mockInsert = vi .spyOn(clickhouseClientMock, "insert") .mockResolvedValue(); const concurrentWrites = 1000; const writes = Array.from({ length: concurrentWrites }, (_, i) => writer.addToQueue(TableName.Traces, { id: `${i}`, name: `test${i}` }), ); await Promise.all(writes); await vi.advanceTimersByTimeAsync(writer.writeInterval); expect(mockInsert).toHaveBeenCalledTimes( Math.ceil(concurrentWrites / writer.batchSize), ); expect(writer["queue"][TableName.Traces].length).toBeLessThan( writer.batchSize, ); }); it("should report wait time and processing time metrics correctly", async () => { const metricsDistributionSpy = vi.spyOn(serverExports, "recordHistogram"); const mockInsert = vi .spyOn(clickhouseClientMock, "insert") .mockResolvedValue(); writer.addToQueue(TableName.Traces, { id: "1", name: "test" }); await vi.advanceTimersByTimeAsync(writer.writeInterval); expect(metricsDistributionSpy).toHaveBeenCalledWith( "langfuse.queue.clickhouse_writer.wait_time", expect.any(Number), { unit: "milliseconds" }, ); expect(metricsDistributionSpy).toHaveBeenCalledWith( "langfuse.queue.clickhouse_writer.processing_time", expect.any(Number), { unit: "milliseconds" }, ); }); it("should handle different types of Clickhouse client errors", async () => { const mockInsert = vi .spyOn(clickhouseClientMock, "insert") .mockRejectedValueOnce(new Error("Network error")) .mockRejectedValueOnce(new Error("Timeout")) .mockResolvedValueOnce(); writer.addToQueue(TableName.Traces, { id: "1", name: "test" }); await vi.advanceTimersByTimeAsync(writer.writeInterval); expect(logger.error).toHaveBeenCalledWith( expect.stringContaining("Network error"), ); await vi.advanceTimersByTimeAsync(writer.writeInterval); expect(logger.error).toHaveBeenCalledWith( expect.stringContaining("Timeout"), ); await vi.advanceTimersByTimeAsync(writer.writeInterval); expect(writer["queue"][TableName.Traces]).toHaveLength(0); }); it("should handle partial queue flush correctly", async () => { const mockInsert = vi .spyOn(clickhouseClientMock, "insert") .mockResolvedValue(); const partialQueueSize = Math.floor(writer.batchSize / 2); for (let i = 0; i < partialQueueSize; i++) { writer.addToQueue(TableName.Traces, { id: `${i}`, name: "test" } as any); } await vi.advanceTimersByTimeAsync(writer.writeInterval); expect(mockInsert).toHaveBeenCalledTimes(1); expect(mockInsert).toHaveBeenCalledWith( expect.objectContaining({ values: expect.arrayContaining( new Array(partialQueueSize).fill(expect.any(Object)), ), }), ); expect(writer["queue"][TableName.Traces]).toHaveLength(0); }); it("should continue functioning after encountering an error", async () => { const mockInsert = vi .spyOn(clickhouseClientMock, "insert") .mockRejectedValueOnce(new Error("DB Error")) .mockResolvedValue(); writer.addToQueue(TableName.Traces, { id: "1", name: "test1" }); await vi.advanceTimersByTimeAsync(writer.writeInterval); writer.addToQueue(TableName.Traces, { id: "2", name: "test2" }); await vi.advanceTimersByTimeAsync(writer.writeInterval); expect(mockInsert).toHaveBeenCalledTimes(2); expect(writer["queue"][TableName.Traces]).toHaveLength(0); }); describe("truncation logic", () => { it("should truncate oversized input field", () => { const largeInput = "a".repeat(2 * 1024 * 1024); // 2MB string const record = { id: "1", input: largeInput, output: "normal output", metadata: { key: "value" }, } as any; const truncatedRecord = writer["truncateOversizedRecord"]( TableName.Traces, record, ); expect(truncatedRecord.id).toBe("1"); expect((truncatedRecord as any).output).toBe("normal output"); expect((truncatedRecord as any).metadata).toEqual({ key: "value" }); expect((truncatedRecord as any).input).toContain( "[TRUNCATED: Field exceeded size limit]", ); expect((truncatedRecord as any).input.length).toBeLessThan( largeInput.length, ); expect((truncatedRecord as any).input).toMatch( /^a+\[TRUNCATED: Field exceeded size limit]$/, ); }); it("should truncate oversized output field", () => { const largeOutput = "b".repeat(2 * 1024 * 1024); // 2MB string const record = { id: "1", input: "normal input", output: largeOutput, metadata: { key: "value" }, }; const truncatedRecord = writer["truncateOversizedRecord"]( TableName.Traces, record, ); expect(truncatedRecord.id).toBe("1"); expect(truncatedRecord.input).toBe("normal input"); expect(truncatedRecord.metadata).toEqual({ key: "value" }); expect(truncatedRecord.output).toContain( "[TRUNCATED: Field exceeded size limit]", ); expect(truncatedRecord.output.length).toBeLessThan(largeOutput.length); expect(truncatedRecord.output).toMatch( /^b+\[TRUNCATED: Field exceeded size limit\]$/, ); }); it("should truncate oversized metadata values", () => { const largeMetadataValue = "c".repeat(2 * 1024 * 1024); // 2MB string const record = { id: "1", input: "normal input", output: "normal output", metadata: { normalKey: "normal value", largeKey: largeMetadataValue, anotherNormalKey: "another normal value", }, }; const truncatedRecord = writer["truncateOversizedRecord"]( TableName.Traces, record, ); expect(truncatedRecord.id).toBe("1"); expect(truncatedRecord.input).toBe("normal input"); expect(truncatedRecord.output).toBe("normal output"); expect(truncatedRecord.metadata.normalKey).toBe("normal value"); expect(truncatedRecord.metadata.anotherNormalKey).toBe( "another normal value", ); expect(truncatedRecord.metadata.largeKey).toContain( "[TRUNCATED: Field exceeded size limit]", ); expect(truncatedRecord.metadata.largeKey.length).toBeLessThan( largeMetadataValue.length, ); expect(truncatedRecord.metadata.largeKey).toMatch( /^c+\[TRUNCATED: Field exceeded size limit\]$/, ); }); it("should not truncate normal-sized fields", () => { const normalRecord = { id: "1", input: "normal input", output: "normal output", metadata: { key: "value" }, }; const truncatedRecord = writer["truncateOversizedRecord"]( TableName.Traces, normalRecord, ); expect(truncatedRecord).toEqual(normalRecord); }); it("should handle size errors with truncation in retry logic", async () => { const largeInput = "a".repeat(2 * 1024 * 1024); // 2MB string const record = { id: "1", input: largeInput, output: "normal output", } as any; const mockInsert = vi .spyOn(clickhouseClientMock, "insert") .mockRejectedValueOnce( new Error( "size of json object is extremely large and expected not greater than 1MB", ), ) .mockResolvedValueOnce(); writer.addToQueue(TableName.Traces, record); await vi.advanceTimersByTimeAsync(writer.writeInterval); expect(mockInsert).toHaveBeenCalledTimes(1); expect(logger.error).toHaveBeenCalledWith( expect.stringContaining("size of json object is extremely large"), ); // Second attempt with truncated data await vi.advanceTimersByTimeAsync(writer.writeInterval); expect(mockInsert).toHaveBeenCalledTimes(2); expect(logger.warn).toHaveBeenCalledWith( expect.stringContaining("Truncating oversized records"), expect.objectContaining({ attemptNumber: 1, error: "size of json object is extremely large and expected not greater than 1MB", }), ); expect(writer["queue"][TableName.Traces]).toHaveLength(0); // Verify that the second call used truncated data const secondCallArgs = mockInsert.mock.calls[1][0]; expect(secondCallArgs.values[0].input).toContain( "[TRUNCATED: Field exceeded size limit]", ); }); it("should handle string length errors with batch splitting", async () => { const mockInsert = vi .spyOn(clickhouseClientMock, "insert") .mockRejectedValueOnce(new Error("invalid string length")) .mockResolvedValue(); // Add 4 records to test splitting const records = Array.from({ length: 4 }, (_, i) => ({ id: `${i}`, name: `test${i}`, })); records.forEach((record) => { writer.addToQueue(TableName.Traces, record as any); }); await vi.advanceTimersByTimeAsync(writer.writeInterval); // After first interval: should have done initial call + retry with first half expect(mockInsert).toHaveBeenCalled(); expect(logger.warn).toHaveBeenCalledWith( expect.stringContaining("Splitting batch and retrying"), expect.objectContaining({ error: "invalid string length", batchSize: 4, }), ); // Check that queue now has the second half (2 records) at the front expect(writer["queue"][TableName.Traces]).toHaveLength(2); expect(writer["queue"][TableName.Traces][0].data.id).toBe("2"); expect(writer["queue"][TableName.Traces][1].data.id).toBe("3"); // Advance timer again to process the requeued items await vi.advanceTimersByTimeAsync(writer.writeInterval); expect(writer["queue"][TableName.Traces]).toHaveLength(0); }); }); });