#!/usr/bin/env node import { Command } from "commander"; import { createServer } from "node:http"; import { pathToFileURL } from "node:url"; import { resolve } from "node:path"; import { RPCHandler } from "@orpc/server/node"; import { onError } from "@orpc/server"; import * as v from "valibot"; import { router } from "./api/server.ts"; import { Manager, ManagerConfigSchema } from "./manager.ts"; import { logger } from "./logger.ts"; import { tsImport } from "tsx/esm/api"; import Table from "cli-table3"; import { createClient } from "./api/client.ts"; import { ProcessDefinitionSchema } from "./api/contract.ts"; const program = new Command(); program .name("pid1") .description("Process manager with init system capabilities") .version("0.0.0-dev.1"); program .command("init") .description("Initialize and run the process manager with config file") .option("-c, --config ", "Path to config file", "pid1.config.ts") .action(async (options) => { const initLogger = logger({ name: "pid1" }); try { // Resolve config file path const configPath = resolve(process.cwd(), options.config); const configUrl = pathToFileURL(configPath).href; // Import the config file const configModule = await tsImport(configUrl, { parentURL: import.meta.url }); const rawConfig = configModule.default.default || configModule.default || configModule.config || {}; // Parse and validate the config with Valibot const config = v.parse(ManagerConfigSchema, rawConfig); // Extract HTTP config with defaults const host = config.http?.host ?? "localhost"; const port = config.http?.port ?? 3000; const authToken = config.http?.authToken; // Create manager with config const logDir = config.logDir ?? resolve(process.cwd(), "logs"); const managerLogger = logger({ name: "pid1", logFile: resolve(logDir, "pid1.log") }); const manager = new Manager(config, managerLogger); // Setup ORPC server with optional auth token middleware const handler = new RPCHandler(router, { interceptors: [ onError((error) => { managerLogger.error(error); }), ], }); const server = createServer(async (req, res) => { // Check auth token if configured if (authToken) { const providedToken = req.headers["authorization"]?.replace("Bearer ", ""); if (providedToken !== authToken) { res.statusCode = 401; res.end("Unauthorized"); return; } } const { matched } = await handler.handle(req, res, { prefix: "/rpc", context: { manager }, }); if (matched) return; res.statusCode = 404; res.end("Not found"); }); server.listen(port, host, async () => { managerLogger.info(`pid1 RPC server running on http://${host}:${port}`); if (authToken) { managerLogger.info("Auth token required for API access"); } try { await manager.start(); } catch (err) { managerLogger.error("Failed to start manager:", err); server.close(); process.exit(1); } }); // Wait for shutdown await manager.waitForShutdown(); // Close server on shutdown server.close(); } catch (error) { initLogger.error("Failed to start pid1:", error); process.exit(1); } }); program .command("status") .description("Show manager status") .option("-u, --url ", "RPC server URL", "http://localhost:3000/rpc") .action(async (options) => { try { const client = createClient(options.url); const status = await client.manager.status(); const table = new Table({ head: ["State", "Processes", "Crons", "Tasks"] }); table.push([status.state, status.processCount, status.cronCount, status.taskCount]); console.log(table.toString()); } catch (error) { console.error("Failed to fetch status:", error); process.exit(1); } }); const processes = program.command("processes").description("Manage restarting processes"); processes .command("list") .description("List restarting processes") .option("-u, --url ", "RPC server URL", "http://localhost:3000/rpc") .action(async (options) => { try { const client = createClient(options.url); const processes = await client.processes.list(); const table = new Table({ head: ["Name", "State", "Restarts"] }); for (const proc of processes) { table.push([proc.name, proc.state, proc.restarts]); } console.log(table.toString()); } catch (error) { console.error("Failed to list processes:", error); process.exit(1); } }); processes .command("get") .description("Get a restarting process by name or index") .argument("", "Process name or index") .option("-u, --url ", "RPC server URL", "http://localhost:3000/rpc") .action(async (target, options) => { try { const client = createClient(options.url); const proc = await client.processes.get({ target: parseTarget(target) }); const table = new Table({ head: ["Name", "State", "Restarts"] }); table.push([proc.name, proc.state, proc.restarts]); console.log(table.toString()); } catch (error) { console.error("Failed to get process:", error); process.exit(1); } }); processes .command("add") .description("Add a restarting process") .requiredOption("-n, --name ", "Process name") .requiredOption("-d, --definition ", "Process definition JSON") .option("-u, --url ", "RPC server URL", "http://localhost:3000/rpc") .action(async (options) => { try { const client = createClient(options.url); const definition = parseDefinition(options.definition); const proc = await client.processes.add({ name: options.name, definition }); const table = new Table({ head: ["Name", "State", "Restarts"] }); table.push([proc.name, proc.state, proc.restarts]); console.log(table.toString()); } catch (error) { console.error("Failed to add process:", error); process.exit(1); } }); processes .command("start") .description("Start a restarting process") .argument("", "Process name or index") .option("-u, --url ", "RPC server URL", "http://localhost:3000/rpc") .action(async (target, options) => { try { const client = createClient(options.url); const proc = await client.processes.start({ target: parseTarget(target) }); const table = new Table({ head: ["Name", "State", "Restarts"] }); table.push([proc.name, proc.state, proc.restarts]); console.log(table.toString()); } catch (error) { console.error("Failed to start process:", error); process.exit(1); } }); processes .command("stop") .description("Stop a restarting process") .argument("", "Process name or index") .option("-u, --url ", "RPC server URL", "http://localhost:3000/rpc") .action(async (target, options) => { try { const client = createClient(options.url); const proc = await client.processes.stop({ target: parseTarget(target) }); const table = new Table({ head: ["Name", "State", "Restarts"] }); table.push([proc.name, proc.state, proc.restarts]); console.log(table.toString()); } catch (error) { console.error("Failed to stop process:", error); process.exit(1); } }); processes .command("restart") .description("Restart a restarting process") .argument("", "Process name or index") .option("-f, --force", "Force restart") .option("-u, --url ", "RPC server URL", "http://localhost:3000/rpc") .action(async (target, options) => { try { const client = createClient(options.url); const proc = await client.processes.restart({ target: parseTarget(target), force: options.force, }); const table = new Table({ head: ["Name", "State", "Restarts"] }); table.push([proc.name, proc.state, proc.restarts]); console.log(table.toString()); } catch (error) { console.error("Failed to restart process:", error); process.exit(1); } }); processes .command("remove") .description("Remove a restarting process") .argument("", "Process name or index") .option("-u, --url ", "RPC server URL", "http://localhost:3000/rpc") .action(async (target, options) => { try { const client = createClient(options.url); await client.processes.remove({ target: parseTarget(target) }); console.log("Process removed"); } catch (error) { console.error("Failed to remove process:", error); process.exit(1); } }); const crons = program.command("crons").description("Manage cron processes"); crons .command("list") .description("List cron processes") .option("-u, --url ", "RPC server URL", "http://localhost:3000/rpc") .action(async (options) => { try { const client = createClient(options.url); const crons = await client.crons.list(); const table = new Table({ head: ["Name", "State", "Runs", "Fails", "Next Run"] }); for (const cron of crons) { table.push([cron.name, cron.state, cron.runCount, cron.failCount, cron.nextRun ?? "-"]); } console.log(table.toString()); } catch (error) { console.error("Failed to list crons:", error); process.exit(1); } }); crons .command("get") .description("Get a cron process by name or index") .argument("", "Cron name or index") .option("-u, --url ", "RPC server URL", "http://localhost:3000/rpc") .action(async (target, options) => { try { const client = createClient(options.url); const cron = await client.crons.get({ target: parseTarget(target) }); const table = new Table({ head: ["Name", "State", "Runs", "Fails", "Next Run"] }); table.push([cron.name, cron.state, cron.runCount, cron.failCount, cron.nextRun ?? "-"]); console.log(table.toString()); } catch (error) { console.error("Failed to get cron:", error); process.exit(1); } }); crons .command("trigger") .description("Trigger a cron process") .argument("", "Cron name or index") .option("-u, --url ", "RPC server URL", "http://localhost:3000/rpc") .action(async (target, options) => { try { const client = createClient(options.url); const cron = await client.crons.trigger({ target: parseTarget(target) }); const table = new Table({ head: ["Name", "State", "Runs", "Fails", "Next Run"] }); table.push([cron.name, cron.state, cron.runCount, cron.failCount, cron.nextRun ?? "-"]); console.log(table.toString()); } catch (error) { console.error("Failed to trigger cron:", error); process.exit(1); } }); crons .command("start") .description("Start a cron process") .argument("", "Cron name or index") .option("-u, --url ", "RPC server URL", "http://localhost:3000/rpc") .action(async (target, options) => { try { const client = createClient(options.url); const cron = await client.crons.start({ target: parseTarget(target) }); const table = new Table({ head: ["Name", "State", "Runs", "Fails", "Next Run"] }); table.push([cron.name, cron.state, cron.runCount, cron.failCount, cron.nextRun ?? "-"]); console.log(table.toString()); } catch (error) { console.error("Failed to start cron:", error); process.exit(1); } }); crons .command("stop") .description("Stop a cron process") .argument("", "Cron name or index") .option("-u, --url ", "RPC server URL", "http://localhost:3000/rpc") .action(async (target, options) => { try { const client = createClient(options.url); const cron = await client.crons.stop({ target: parseTarget(target) }); const table = new Table({ head: ["Name", "State", "Runs", "Fails", "Next Run"] }); table.push([cron.name, cron.state, cron.runCount, cron.failCount, cron.nextRun ?? "-"]); console.log(table.toString()); } catch (error) { console.error("Failed to stop cron:", error); process.exit(1); } }); const tasks = program.command("tasks").description("Manage tasks"); tasks .command("list") .description("List tasks") .option("-u, --url ", "RPC server URL", "http://localhost:3000/rpc") .action(async (options) => { try { const client = createClient(options.url); const tasks = await client.tasks.list(); const table = new Table({ head: ["Id", "State", "Processes"] }); for (const task of tasks) { table.push([task.id, task.state, task.processNames.join(", ")]); } console.log(table.toString()); } catch (error) { console.error("Failed to list tasks:", error); process.exit(1); } }); tasks .command("get") .description("Get a task by id or index") .argument("", "Task id or index") .option("-u, --url ", "RPC server URL", "http://localhost:3000/rpc") .action(async (target, options) => { try { const client = createClient(options.url); const task = await client.tasks.get({ target: parseTarget(target) }); const table = new Table({ head: ["Id", "State", "Processes"] }); table.push([task.id, task.state, task.processNames.join(", ")]); console.log(table.toString()); } catch (error) { console.error("Failed to get task:", error); process.exit(1); } }); tasks .command("add") .description("Add a task") .requiredOption("-n, --name ", "Task name") .requiredOption("-d, --definition ", "Process definition JSON") .option("-u, --url ", "RPC server URL", "http://localhost:3000/rpc") .action(async (options) => { try { const client = createClient(options.url); const definition = parseDefinition(options.definition); const task = await client.tasks.add({ name: options.name, definition }); const table = new Table({ head: ["Id", "State", "Processes"] }); table.push([task.id, task.state, task.processNames.join(", ")]); console.log(table.toString()); } catch (error) { console.error("Failed to add task:", error); process.exit(1); } }); tasks .command("remove") .description("Remove a task by id or index") .argument("", "Task id or index") .option("-u, --url ", "RPC server URL", "http://localhost:3000/rpc") .action(async (target, options) => { try { const client = createClient(options.url); const task = await client.tasks.remove({ target: parseTarget(target) }); const table = new Table({ head: ["Id", "State", "Processes"] }); table.push([task.id, task.state, task.processNames.join(", ")]); console.log(table.toString()); } catch (error) { console.error("Failed to remove task:", error); process.exit(1); } }); const TargetSchema = v.union([v.string(), v.number()]); type ProcessDefinition = v.InferOutput; function parseTarget(value: string): string | number { const asNumber = Number(value); return v.parse(TargetSchema, Number.isNaN(asNumber) ? value : asNumber); } function parseDefinition(raw: string): ProcessDefinition { try { const parsed = JSON.parse(raw); return v.parse(ProcessDefinitionSchema, parsed); } catch (error) { console.error("Invalid --definition JSON. Expected a ProcessDefinition."); throw error; } } program.parse();