import type { ImageBlockParam, TextBlockParam, } from '@anthropic-ai/sdk/resources/index.mjs' import { existsSync, readFileSync } from 'fs' import { Text } from 'ink' import { extname, isAbsolute, relative, resolve } from 'path' import * as React from 'react' import { z } from 'zod' import { FallbackToolUseRejectedMessage } from '../../components/FallbackToolUseRejectedMessage' import { Tool } from '../../Tool' import { NotebookCellSource, NotebookContent, NotebookCell, NotebookOutputImage, NotebookCellSourceOutput, NotebookCellOutput, NotebookCellType, } from '../../types/notebook' import { formatOutput } from '../BashTool/utils' import { getCwd } from '../../utils/state' import { findSimilarFile } from '../../utils/file' import { DESCRIPTION, PROMPT } from './prompt' import { hasReadPermission } from '../../utils/permissions/filesystem' const inputSchema = z.strictObject({ notebook_path: z .string() .describe( 'The absolute path to the Jupyter notebook file to read (must be absolute, not relative)', ), }) type In = typeof inputSchema type Out = NotebookCellSource[] export const NotebookReadTool = { name: 'ReadNotebook', async description() { return DESCRIPTION }, async prompt() { return PROMPT }, isReadOnly() { return true }, isConcurrencySafe() { return true // NotebookReadTool is read-only, safe for concurrent execution }, inputSchema, userFacingName() { return 'Read Notebook' }, async isEnabled() { return true }, needsPermissions({ notebook_path }) { return !hasReadPermission(notebook_path) }, async validateInput({ notebook_path }) { const fullFilePath = isAbsolute(notebook_path) ? notebook_path : resolve(getCwd(), notebook_path) if (!existsSync(fullFilePath)) { // Try to find a similar file with a different extension const similarFilename = findSimilarFile(fullFilePath) let message = 'File does not exist.' // If we found a similar file, suggest it to the assistant if (similarFilename) { message += ` Did you mean ${similarFilename}?` } return { result: false, message, } } if (extname(fullFilePath) !== '.ipynb') { return { result: false, message: 'File must be a Jupyter notebook (.ipynb file).', } } return { result: true } }, renderToolUseMessage(input, { verbose }) { return `notebook_path: ${verbose ? input.notebook_path : relative(getCwd(), input.notebook_path)}` }, renderToolUseRejectedMessage() { return }, renderToolResultMessage(content) { if (!content) { return No cells found in notebook } if (content.length < 1 || !content[0]) { return No cells found in notebook } return Read {content.length} cells }, async *call({ notebook_path }) { const fullPath = isAbsolute(notebook_path) ? notebook_path : resolve(getCwd(), notebook_path) const content = readFileSync(fullPath, 'utf-8') const notebook = JSON.parse(content) as NotebookContent const language = notebook.metadata.language_info?.name ?? 'python' const cells = notebook.cells.map((cell, index) => processCell(cell, index, language), ) yield { type: 'result', resultForAssistant: this.renderResultForAssistant(cells), data: cells, } }, renderResultForAssistant(data: NotebookCellSource[]) { // Convert the complex structure to a string representation for the assistant return data.map((cell, index) => { let content = `Cell ${index + 1} (${cell.cellType}):\n${cell.source}` if (cell.outputs && cell.outputs.length > 0) { const outputText = cell.outputs.map(output => output.text).filter(Boolean).join('\n') if (outputText) { content += `\nOutput:\n${outputText}` } } return content }).join('\n\n') }, } satisfies Tool function processOutputText(text: string | string[] | undefined): string { if (!text) return '' const rawText = Array.isArray(text) ? text.join('') : text const { truncatedContent } = formatOutput(rawText) return truncatedContent } function extractImage( data: Record, ): NotebookOutputImage | undefined { if (typeof data['image/png'] === 'string') { return { image_data: data['image/png'] as string, media_type: 'image/png', } } if (typeof data['image/jpeg'] === 'string') { return { image_data: data['image/jpeg'] as string, media_type: 'image/jpeg', } } return undefined } function processOutput(output: NotebookCellOutput) { switch (output.output_type) { case 'stream': return { output_type: output.output_type, text: processOutputText(output.text), } case 'execute_result': case 'display_data': return { output_type: output.output_type, text: processOutputText(output.data?.['text/plain']), image: output.data && extractImage(output.data), } case 'error': return { output_type: output.output_type, text: processOutputText( `${output.ename}: ${output.evalue}\n${output.traceback.join('\n')}`, ), } } } function processCell( cell: NotebookCell, index: number, language: string, ): NotebookCellSource { const cellData: NotebookCellSource = { cell: index, cellType: cell.cell_type, source: Array.isArray(cell.source) ? cell.source.join('') : cell.source, language, execution_count: cell.execution_count, } if (cell.outputs?.length) { cellData.outputs = cell.outputs.map(processOutput) } return cellData } function cellContentToToolResult(cell: NotebookCellSource): TextBlockParam { const metadata = [] if (cell.cellType !== 'code') { metadata.push(`${cell.cellType}`) } if (cell.language !== 'python' && cell.cellType === 'code') { metadata.push(`${cell.language}`) } const cellContent = `${metadata.join('')}${cell.source}` return { text: cellContent, type: 'text', } } function cellOutputToToolResult(output: NotebookCellSourceOutput) { const outputs: (TextBlockParam | ImageBlockParam)[] = [] if (output.text) { outputs.push({ text: `\n${output.text}`, type: 'text', }) } if (output.image) { outputs.push({ type: 'image', source: { data: output.image.image_data, media_type: output.image.media_type, type: 'base64', }, }) } return outputs } function getToolResultFromCell(cell: NotebookCellSource) { const contentResult = cellContentToToolResult(cell) const outputResults = cell.outputs?.flatMap(cellOutputToToolResult) return [contentResult, ...(outputResults ?? [])] } export function isNotebookCellType( value: string | null, ): value is NotebookCellType { return value === 'code' || value === 'markdown' }