/* Copyright 2026 Marimo. All rights reserved. */

import {
  DefaultChatTransport,
  type HttpChatTransportInitOptions,
  type UIMessage,
  type UIMessageChunk,
} from "ai";

/**
 * Thin wrapper around the DefaultChatTransport that calls a callback when a
 * chunk is received.
 *
 * The callback observes each chunk produced by the base transport's response
 * processing; every chunk is then forwarded downstream unchanged.
 *
 * @typeParam UI_MESSAGE - The UI message type handled by this transport. It is
 * passed through to the base transport and to the constructor options so that
 * both are typed consistently.
 */
export class StreamingChunkTransport<
  UI_MESSAGE extends UIMessage,
> extends DefaultChatTransport<UI_MESSAGE> {
  /** Invoked once per chunk, before the chunk is forwarded downstream. */
  private readonly onChunkReceived: (chunk: UIMessageChunk) => void;

  /**
   * @param options - Options forwarded as-is to the DefaultChatTransport
   * constructor.
   * @param onChunkReceived - Callback called with every chunk of the processed
   * response stream.
   */
  constructor(
    options: HttpChatTransportInitOptions<UI_MESSAGE>,
    onChunkReceived: (chunk: UIMessageChunk) => void,
  ) {
    super(options);
    this.onChunkReceived = onChunkReceived;
  }

  /**
   * Delegates to the base implementation, then pipes its output through a
   * pass-through transform that reports each chunk to `onChunkReceived`.
   *
   * @param stream - The raw response stream handed to the base implementation.
   * @returns The processed chunk stream, with the same chunks in the same
   * order as produced by the base implementation.
   */
  protected override processResponseStream(
    stream: ReadableStream<Uint8Array>,
  ): ReadableStream<UIMessageChunk> {
    // Capture the callback locally: inside the transformer's `transform`
    // method, `this` does not refer to this transport instance.
    const onChunkReceived = this.onChunkReceived;

    return super.processResponseStream(stream).pipeThrough(
      new TransformStream<UIMessageChunk, UIMessageChunk>({
        transform(chunk, controller) {
          // Notify first, then forward the very same chunk object.
          onChunkReceived(chunk);
          controller.enqueue(chunk);
        },
      }),
    );
  }
}