import fs from "node:fs/promises";
import http from "node:http";
import https from "node:https";
import net from "node:net";
import path from "node:path";
import readline from "node:readline";
import { createHash } from "node:crypto";
import { spawn, type ChildProcess } from "node:child_process";
import { fileURLToPath } from "node:url";
import JSON5 from "json5";
import type { PluginLogger } from "../../api.js";
import { generateId } from "../protocol.js";
import {
  resolveDefaultOpenClawConfigPath,
  resolveDefaultOpenClawStateDir,
  resolveDefaultTeamClawRuntimeRootDir,
  resolveDefaultAgentDir,
  resolveTeamClawAgentDir,
  resolveTeamClawAgentWorkspaceRootDir,
  resolveTeamClawWorkspaceDir,
  TEAMCLAW_AGENT_ID,
} from "../openclaw-workspace.js";
import { ROLES } from "../roles.js";
import { inferTaskRole } from "./role-inference.js";
import type {
  PluginConfig,
  ProvisionedWorkerRecord,
  ProvisionedWorkerStatus,
  RoleId,
  StartupProvisioningReadiness,
  TaskInfo,
  TeamProvisioningState,
  TeamState,
  WorkerProvisioningType,
  WorkerStatus,
} from "../types.js";

const DEFAULT_CONTAINER_WORKER_PORT = 9527;
const DEFAULT_CONTAINER_GATEWAY_PORT = 18789;
const DEFAULT_DOCKER_BUNDLED_TEAMCLAW_PLUGIN_DIR = "/app/extensions/teamclaw";
/** Failed/terminated records older than this are dropped by refreshProvisioningState. */
const PROVISIONING_RECORD_RETENTION_MS = 6 * 60 * 60 * 1000;
/** A failed launch for a role suppresses further launches of that role for this long. */
const PROVISIONING_FAILURE_COOLDOWN_MS = 30_000;
const PROCESS_TERMINATION_TIMEOUT_MS = 10_000;
const STARTUP_READINESS_POLL_INTERVAL_MS = 1_000;
const STARTUP_READINESS_RETRY_INTERVAL_MS = 5_000;
const DOCKER_API_VERSION = resolveDockerApiVersion();

// NOTE(review): generic type arguments in this file were lost in extraction
// (e.g. `Promise;`, `Record;`). They are restored below by inference from usage —
// confirm against the helper signatures defined elsewhere in the file.

export type WorkerProvisioningManagerDeps = {
  config: PluginConfig;
  logger: PluginLogger;
  getTeamState: () => TeamState | null;
  updateTeamState: (updater: (state: TeamState) => void) => TeamState;
  /** Actual HTTP port once the controller server has bound (may differ from config.port). */
  actualControllerPort?: number;
};

/** Everything a backend needs to start one worker instance. */
type LaunchSpec = {
  workerId: string;
  role: RoleId;
  launchToken: string;
  controllerUrl: string;
  workerPort: number;
  gatewayPort: number;
  publishedHostPort?: number;
  workspaceDir?: string;
  env: Record<string, string>;
  configJson: string;
};

/** Backend-specific identifiers persisted on the provisioning record after launch. */
type LaunchResult = {
  instanceId?: string;
  instanceName?: string;
  runtimeHomeDir?: string;
};

interface WorkerProvisionerBackend {
  readonly type: WorkerProvisioningType;
  launch(spec: LaunchSpec): Promise<LaunchResult>;
  terminate(record: ProvisionedWorkerRecord): Promise<void>;
  stop?(): Promise<void>;
}

/**
 * Launches, tracks and terminates controller-managed workers through a
 * pluggable backend (process / docker / kubernetes), keeping the
 * `provisioning` section of the team state in sync.
 *
 * When no backend is configured (`backend === null`) every public method is a no-op.
 */
export class WorkerProvisioningManager {
  private deps: WorkerProvisioningManagerDeps;
  private readonly backend: WorkerProvisionerBackend | null;
  private baseConfigPromise: Promise<Record<string, unknown>> | null = null;
  private reconcilePromise: Promise<void> | null = null;
  private reconcileQueued = false;
  private stopped = false;

  constructor(deps: WorkerProvisioningManagerDeps) {
    this.deps = deps;
    this.backend = createProvisionerBackend(deps.config, deps.logger);
  }

  /** Update the actual controller port after the HTTP server binds. */
  setActualPort(port: number): void {
    this.deps = { ...this.deps, actualControllerPort: port };
  }

  isEnabled(): boolean {
    return this.backend !== null;
  }

  hasManagedWorker(workerId: string): boolean {
    return Boolean(this.deps.getTeamState()?.provisioning?.workers?.[workerId]);
  }

  /** Returns true when the worker was provisioned as a local process (shared filesystem). */
  isSharedWorkspaceWorker(workerId: string): boolean {
    const record = this.deps.getTeamState()?.provisioning?.workers?.[workerId];
    return Boolean(record && record.provider === "process");
  }

  /** Refreshes provisioning records in `state`; returns true when anything changed. */
  syncState(state: TeamState): boolean {
    if (!this.backend) {
      return false;
    }
    return this.refreshProvisioningState(state, Date.now());
  }

  /** Marks startup readiness as "checking" without bumping the attempt counter. */
  primeStartupReadiness(): void {
    if (!this.backend || this.stopped) {
      return;
    }
    const now = Date.now();
    const previousAttempts = this.deps.getTeamState()?.provisioning?.startupReadiness?.attempts ?? 0;
    this.updateStartupReadiness({
      status: "checking",
      startedAt: now,
      checkedAt: now,
      attempts: previousAttempts,
      requiredRoles: this.determineStartupReadinessRoles(),
      readyWorkerIds: [],
      message: "Controller started; waiting for startup provisioning warm-up.",
    });
  }

  /**
   * Checks a registration attempt against the provisioning record.
   * Workers without a record are unmanaged and always accepted.
   */
  validateRegistration(
    workerId: string,
    role: RoleId,
    launchToken: string | undefined,
  ): { ok: boolean; managed: boolean; reason?: string } {
    const record = this.deps.getTeamState()?.provisioning?.workers?.[workerId];
    if (!record) {
      return { ok: true, managed: false };
    }
    if (record.role !== role) {
      return {
        ok: false,
        managed: true,
        reason: `Provisioned worker ${workerId} expected role ${record.role}, got ${role}`,
      };
    }
    if (!launchToken || launchToken !== record.launchToken) {
      return {
        ok: false,
        managed: true,
        reason: `Provisioned worker ${workerId} is missing a valid launch token`,
      };
    }
    if (record.status === "failed" || record.status === "terminated" || record.status === "terminating") {
      return {
        ok: false,
        managed: true,
        reason: `Provisioned worker ${workerId} is no longer allowed to register (${record.status})`,
      };
    }
    return { ok: true, managed: true };
  }

  onWorkerRegistered(workerId: string): void {
    if (!this.backend) {
      return;
    }
    this.deps.updateTeamState((state) => {
      const record = ensureProvisioningState(state).workers[workerId];
      if (!record) {
        return;
      }
      const now = Date.now();
      record.status = "registered";
      record.registeredAt = record.registeredAt ?? now;
      record.updatedAt = now;
      record.idleSince = now;
      delete record.lastError;
    });
  }

  onWorkerHeartbeat(workerId: string, status: WorkerStatus): void {
    if (!this.backend) {
      return;
    }
    this.deps.updateTeamState((state) => {
      const record = ensureProvisioningState(state).workers[workerId];
      if (!record) {
        return;
      }
      const now = Date.now();
      // A heartbeat from a still-"launching" worker implies it has come up.
      if (record.status === "launching") {
        record.status = "registered";
        record.registeredAt = record.registeredAt ?? now;
      }
      record.updatedAt = now;
      if (status === "idle") {
        record.idleSince = record.idleSince ?? now;
      } else {
        delete record.idleSince;
      }
    });
  }

  async onWorkerRemoved(workerId: string, reason: string): Promise<void> {
    if (!this.backend) {
      return;
    }
    const record = this.deps.getTeamState()?.provisioning?.workers?.[workerId];
    if (!record || record.status === "terminated" || record.status === "failed") {
      return;
    }
    this.deps.logger.info(`Provisioner: terminating managed worker ${workerId} (${reason})`);
    await this.terminateManagedWorker(workerId, reason, "terminated");
  }

  /**
   * Runs a reconcile pass. If one is already in flight, a follow-up pass is
   * queued and the in-flight promise is returned. Never rejects.
   */
  async requestReconcile(reason: string): Promise<void> {
    if (!this.backend || this.stopped) {
      return;
    }
    if (this.reconcilePromise) {
      this.reconcileQueued = true;
      return this.reconcilePromise;
    }
    this.reconcilePromise = this.runReconcileLoop(reason)
      .catch((err) => {
        this.deps.logger.warn(
          `Provisioner: reconcile failed: ${err instanceof Error ? err.message : String(err)}`,
        );
      })
      .finally(() => {
        this.reconcilePromise = null;
      });
    return this.reconcilePromise;
  }

  /**
   * Warms one worker per required role and polls until each role has an
   * idle/busy worker or `workerProvisioningStartupTimeoutMs` elapses, recording
   * the outcome ("ready" / "degraded") in `startupReadiness`.
   */
  async runStartupReadinessCheck(): Promise<void> {
    if (!this.backend || this.stopped) {
      return;
    }
    const requiredRoles = this.determineStartupReadinessRoles();
    const previousAttempts = this.deps.getTeamState()?.provisioning?.startupReadiness?.attempts ?? 0;
    const startedAt = Date.now();
    this.updateStartupReadiness({
      status: "checking",
      startedAt,
      checkedAt: startedAt,
      attempts: previousAttempts + 1,
      requiredRoles,
      readyWorkerIds: [],
      message: `Warming startup workers for ${requiredRoles.join(", ")}.`,
    });
    try {
      await this.ensureStartupProvisioningPrerequisites();
      await this.ensureWarmWorkersForRoles(requiredRoles, "startup readiness warmup");
    } catch (err) {
      const message = err instanceof Error ? err.message : String(err);
      this.updateStartupReadiness({
        status: "degraded",
        startedAt,
        checkedAt: Date.now(),
        attempts: previousAttempts + 1,
        requiredRoles,
        readyWorkerIds: [],
        message,
      });
      this.deps.logger.warn(`Provisioner: startup readiness prerequisites failed: ${message}`);
      return;
    }
    const deadline = Date.now() + this.deps.config.workerProvisioningStartupTimeoutMs;
    let lastRetryAt = 0;
    while (!this.stopped && Date.now() < deadline) {
      const readiness = this.collectRoleReadiness(requiredRoles);
      if (readiness.missingRoles.length === 0) {
        this.updateStartupReadiness({
          status: "ready",
          startedAt,
          checkedAt: Date.now(),
          attempts: previousAttempts + 1,
          requiredRoles,
          readyWorkerIds: readiness.readyWorkerIds,
          message: `Startup provisioning ready with ${readiness.readyWorkerIds.length} warm worker(s).`,
        });
        return;
      }
      if (Date.now() - lastRetryAt >= STARTUP_READINESS_RETRY_INTERVAL_MS) {
        lastRetryAt = Date.now();
        await this.ensureWarmWorkersForRoles(readiness.missingRoles, "startup readiness retry");
        await this.requestReconcile("startup readiness poll");
      }
      await sleep(STARTUP_READINESS_POLL_INTERVAL_MS);
    }
    const finalReadiness = this.collectRoleReadiness(requiredRoles);
    const failureDetails = this.describeStartupReadinessFailure(finalReadiness.missingRoles);
    this.updateStartupReadiness({
      status: "degraded",
      startedAt,
      checkedAt: Date.now(),
      attempts: previousAttempts + 1,
      requiredRoles,
      readyWorkerIds: finalReadiness.readyWorkerIds,
      message: failureDetails,
    });
    this.deps.logger.warn(`Provisioner: startup readiness degraded: ${failureDetails}`);
  }

  /** Terminates every non-terminated worker owned by the active backend, then stops the backend. */
  async stop(): Promise<void> {
    if (!this.backend) {
      return;
    }
    this.stopped = true;
    const state = this.deps.getTeamState();
    const managedWorkerIds = state?.provisioning
      ? Object.entries(state.provisioning.workers)
          .filter(([, record]) => record.provider === this.backend?.type && record.status !== "terminated")
          .map(([workerId]) => workerId)
      : [];
    for (const workerId of managedWorkerIds) {
      try {
        await this.terminateManagedWorker(workerId, "controller shutdown", "terminated");
      } catch (err) {
        this.deps.logger.warn(
          `Provisioner: failed to stop worker ${workerId}: ${err instanceof Error ? err.message : String(err)}`,
        );
      }
    }
    await this.backend.stop?.();
  }

  /** Runs reconcile passes until no further pass has been queued (or the manager is stopped). */
  private async runReconcileLoop(initialReason: string): Promise<void> {
    let reason = initialReason;
    do {
      this.reconcileQueued = false;
      await this.reconcileOnce(reason);
      reason = "queued reconcile";
    } while (this.reconcileQueued && !this.stopped);
  }

  /** One pass: refresh records, expire stalled launches, scale up per role demand, scale down idle workers. */
  private async reconcileOnce(reason: string): Promise<void> {
    if (!this.backend) {
      return;
    }
    const now = Date.now();
    this.deps.updateTeamState((state) => {
      this.refreshProvisioningState(state, now);
    });
    await this.expireStalledLaunches(now);
    const state = this.deps.getTeamState();
    if (!state) {
      return;
    }
    const roles = this.getProvisionableRoles(state);
    for (const role of roles) {
      const demand = this.computeRoleDemand(state, role);
      if (demand > 0) {
        if (
          this.deps.config.workerProvisioningRoles.length > 0 &&
          !this.deps.config.workerProvisioningRoles.includes(role)
        ) {
          this.deps.logger.info(
            `Provisioner: allowing role ${role} because pending task demand exists outside configured workerProvisioningRoles`,
          );
        }
        this.deps.logger.info(`Provisioner: role ${role} needs ${demand} additional worker(s) (${reason})`);
      }
      for (let i = 0; i < demand; i += 1) {
        if (this.hasRecentProvisioningFailure(role)) {
          this.deps.logger.warn(`Provisioner: recent ${role} launch failure detected; cooling down`);
          break;
        }
        await this.launchWorker(role);
      }
    }
    await this.scaleDownIdleWorkers(now);
  }

  /** Configured provisioning roles (de-duplicated), or `["developer"]` when none are configured. */
  private determineStartupReadinessRoles(): RoleId[] {
    const configured = this.deps.config.workerProvisioningRoles;
    if (configured.length > 0) {
      return [...new Set(configured)];
    }
    return ["developer"];
  }

  private async ensureStartupProvisioningPrerequisites(): Promise<void> {
    await ensureWritableDirectory(resolveTeamClawAgentWorkspaceRootDir());
    await ensureWritableDirectory(resolveTeamClawWorkspaceDir());
    await ensureWritableDirectory(resolveDefaultTeamClawRuntimeRootDir());
    if (this.deps.config.workerProvisioningType === "process") {
      await ensureWritableDirectory(resolveTeamClawAgentDir());
    }
  }

  /** Launches one worker for each role that has no active/launching worker and no recent failure. */
  private async ensureWarmWorkersForRoles(roles: RoleId[], reason: string): Promise<void> {
    const state = this.deps.getTeamState();
    for (const role of roles) {
      if (this.hasActiveOrLaunchingWorkerForRole(state, role)) {
        continue;
      }
      if (this.hasRecentProvisioningFailure(role)) {
        this.deps.logger.warn(`Provisioner: startup warmup skipped ${role} due to recent failure`);
        continue;
      }
      this.deps.logger.info(`Provisioner: warming ${role} worker (${reason})`);
      await this.launchWorker(role);
    }
  }

  /** Fails every record that has been "launching" for longer than the startup timeout. */
  private async expireStalledLaunches(now: number): Promise<void> {
    const state = this.deps.getTeamState();
    if (!state?.provisioning) {
      return;
    }
    const timedOut = Object.values(state.provisioning.workers)
      .filter((record) => record.status === "launching")
      .filter((record) => now - record.requestedAt > this.deps.config.workerProvisioningStartupTimeoutMs);
    for (const record of timedOut) {
      await this.terminateManagedWorker(
        record.workerId,
        `startup timeout exceeded (${this.deps.config.workerProvisioningStartupTimeoutMs}ms)`,
        "failed",
      );
    }
  }

  /**
   * For roles with no pending demand, terminates managed workers that have been
   * idle longer than the idle TTL, longest-idle first, never dropping below
   * `workerProvisioningMinPerRole` active workers.
   */
  private async scaleDownIdleWorkers(now: number): Promise<void> {
    const state = this.deps.getTeamState();
    if (!state?.provisioning) {
      return;
    }
    const roles = this.getProvisionableRoles(state);
    for (const role of roles) {
      const activeWorkers = Object.values(state.workers).filter(
        (worker) => worker.role === role && worker.status !== "offline",
      );
      const pendingDemand = this.countPendingTasksForRole(state, role);
      if (pendingDemand > 0) {
        continue;
      }
      let remainingActive = activeWorkers.length;
      const managedIdleWorkers = activeWorkers
        .filter((worker) => worker.status === "idle")
        .map((worker) => ({
          worker,
          record: state.provisioning?.workers[worker.id],
        }))
        .filter((entry): entry is { worker: typeof activeWorkers[number]; record: ProvisionedWorkerRecord } => Boolean(entry.record))
        .filter(({ record }) => record.status === "registered")
        .sort((a, b) => (a.record.idleSince ?? Number.MAX_SAFE_INTEGER) - (b.record.idleSince ?? Number.MAX_SAFE_INTEGER));
      for (const entry of managedIdleWorkers) {
        if (remainingActive <= this.deps.config.workerProvisioningMinPerRole) {
          break;
        }
        if (!entry.record.idleSince || now - entry.record.idleSince < this.deps.config.workerProvisioningIdleTtlMs) {
          continue;
        }
        await this.terminateManagedWorker(entry.worker.id, "idle TTL exceeded", "terminated");
        remainingActive -= 1;
      }
    }
  }

  /** True when a record for `role` failed within PROVISIONING_FAILURE_COOLDOWN_MS. */
  private hasRecentProvisioningFailure(role: RoleId): boolean {
    const now = Date.now();
    const records = this.deps.getTeamState()?.provisioning?.workers ?? {};
    return Object.values(records).some((record) =>
      record.role === role &&
      record.status === "failed" &&
      now - record.updatedAt < PROVISIONING_FAILURE_COOLDOWN_MS
    );
  }

  private hasActiveOrLaunchingWorkerForRole(state: TeamState | null, role: RoleId): boolean {
    if (!state) {
      return false;
    }
    const activeWorker = Object.values(state.workers).some((worker) =>
      worker.role === role && worker.status !== "offline"
    );
    if (activeWorker) {
      return true;
    }
    return Object.values(state.provisioning?.workers ?? {}).some((record) =>
      record.role === role && (record.status === "launching" || record.status === "registered")
    );
  }

  /** For each role, picks the first idle/busy worker; roles without one are reported as missing. */
  private collectRoleReadiness(roles: RoleId[]): { readyWorkerIds: string[]; missingRoles: RoleId[] } {
    const state = this.deps.getTeamState();
    const readyWorkerIds: string[] = [];
    const missingRoles: RoleId[] = [];
    for (const role of roles) {
      const worker = Object.values(state?.workers ?? {}).find((candidate) =>
        candidate.role === role && (candidate.status === "idle" || candidate.status === "busy")
      );
      if (worker) {
        readyWorkerIds.push(worker.id);
      } else {
        missingRoles.push(role);
      }
    }
    return { readyWorkerIds, missingRoles };
  }

  private describeStartupReadinessFailure(missingRoles: RoleId[]): string {
    const records = Object.values(this.deps.getTeamState()?.provisioning?.workers ?? {});
    const relevant = records.filter((record) => missingRoles.includes(record.role));
    const detail = relevant
      .map((record) => `${record.role}:${record.status}${record.lastError ? ` (${record.lastError})` : ""}`)
      .join(", ");
    const suffix = detail ? ` Latest records: ${detail}` : "";
    return `Startup worker readiness failed for roles: ${missingRoles.join(", ")}.${suffix}`;
  }

  private updateStartupReadiness(readiness: StartupProvisioningReadiness): void {
    this.deps.updateTeamState((state) => {
      ensureProvisioningState(state).startupReadiness = readiness;
    });
  }

  /**
   * Launches one worker for `role` and records the outcome. Never throws: every
   * failure (including controller-URL resolution and port reservation) marks
   * the record "failed" with `lastError`, so the failure cooldown applies and
   * callers such as the reconcile loop are not aborted midway.
   */
  private async launchWorker(role: RoleId): Promise<void> {
    const backend = this.backend;
    if (!backend) {
      return;
    }
    const workerId = `provisioned-${role}-${generateId()}`;
    const launchToken = `${generateId()}-${generateId()}`;
    const now = Date.now();
    // Create the record before anything that can fail so the failure is visible in state.
    this.deps.updateTeamState((state) => {
      ensureProvisioningState(state).workers[workerId] = {
        workerId,
        role,
        provider: backend.type,
        status: "launching",
        launchToken,
        requestedAt: now,
        updatedAt: now,
      };
    });
    try {
      const controllerUrl = this.resolveControllerUrl();
      const requiresDedicatedHostPorts = requiresDedicatedHostPortsForProvisioner(
        backend.type,
        this.deps.config,
      );
      const workerPort = requiresDedicatedHostPorts
        ? await reserveEphemeralPort()
        : DEFAULT_CONTAINER_WORKER_PORT;
      const gatewayPort = requiresDedicatedHostPorts
        ? await reserveEphemeralPort()
        : DEFAULT_CONTAINER_GATEWAY_PORT;
      // Docker bridge networks need a host port to publish so the controller
      // (running on the host) can reach the worker container.
      const needsPublishedPort = backend.type === "docker" &&
        !isDockerHostNetwork(this.deps.config.workerProvisioningDockerNetwork);
      const publishedHostPort = needsPublishedPort ? await reserveEphemeralPort() : undefined;
      const baseConfig = await this.loadBaseOpenClawConfig();
      const workerConfig = buildProvisionedWorkerConfig(baseConfig, this.deps.config, {
        role,
        controllerUrl,
        workerPort,
        gatewayPort,
        workspaceDir: buildProvisionedWorkspaceDir(backend.type, this.deps.config, role, workerId),
      });
      const launchResult = await backend.launch({
        workerId,
        role,
        launchToken,
        controllerUrl,
        workerPort,
        gatewayPort,
        publishedHostPort,
        workspaceDir: getConfiguredWorkerWorkspaceDir(workerConfig),
        env: this.buildForwardedEnv(controllerUrl),
        configJson: `${JSON.stringify(workerConfig, null, 2)}\n`,
      });
      this.deps.updateTeamState((state) => {
        const record = ensureProvisioningState(state).workers[workerId];
        if (!record) {
          return;
        }
        record.instanceId = launchResult.instanceId ?? record.instanceId;
        record.instanceName = launchResult.instanceName ?? record.instanceName;
        record.runtimeHomeDir = launchResult.runtimeHomeDir ?? record.runtimeHomeDir;
        record.updatedAt = Date.now();
      });
      this.deps.logger.info(`Provisioner: launched ${role} worker ${workerId} via ${backend.type}`);
    } catch (err) {
      const message = err instanceof Error ? err.message : String(err);
      this.deps.updateTeamState((state) => {
        const record = ensureProvisioningState(state).workers[workerId];
        if (!record) {
          return;
        }
        record.status = "failed";
        record.updatedAt = Date.now();
        record.lastError = message;
      });
      this.deps.logger.warn(`Provisioner: failed to launch ${role} worker ${workerId}: ${message}`);
    }
  }

  /**
   * Marks the worker "terminating" (and offline), asks the backend to tear it
   * down, then records `terminalStatus`. Backend errors are logged, not thrown;
   * the record still reaches its terminal status.
   */
  private async terminateManagedWorker(
    workerId: string,
    reason: string,
    terminalStatus: Extract<ProvisionedWorkerStatus, "failed" | "terminated">,
  ): Promise<void> {
    if (!this.backend) {
      return;
    }
    const state = this.deps.getTeamState();
    const record = state?.provisioning?.workers?.[workerId];
    if (!record) {
      return;
    }
    this.deps.updateTeamState((draft) => {
      const current = ensureProvisioningState(draft).workers[workerId];
      if (!current) {
        return;
      }
      current.status = "terminating";
      current.updatedAt = Date.now();
      if (draft.workers[workerId]) {
        draft.workers[workerId].status = "offline";
        draft.workers[workerId].currentTaskId = undefined;
      }
    });
    try {
      await this.backend.terminate(record);
    } catch (err) {
      this.deps.logger.warn(
        `Provisioner: backend terminate failed for ${workerId}: ${err instanceof Error ? err.message : String(err)}`,
      );
    }
    this.deps.updateTeamState((draft) => {
      const current = ensureProvisioningState(draft).workers[workerId];
      if (!current) {
        return;
      }
      current.status = terminalStatus;
      current.updatedAt = Date.now();
      delete current.idleSince;
      if (terminalStatus === "failed") {
        current.lastError = reason;
      } else {
        delete current.lastError;
      }
    });
  }

  /**
   * Configured roles (or all ROLES when none are configured), plus any role
   * seen on open tasks, known workers or provisioning records.
   */
  private getProvisionableRoles(state: TeamState | null): RoleId[] {
    const roleIds = new Set<RoleId>(
      this.deps.config.workerProvisioningRoles.length > 0
        ? this.deps.config.workerProvisioningRoles
        : ROLES.map((role) => role.id),
    );
    if (!state) {
      return [...roleIds];
    }
    for (const task of Object.values(state.tasks)) {
      if (task.status !== "pending" && task.status !== "assigned") {
        continue;
      }
      const taskRole = this.inferTaskRole(task);
      if (taskRole) {
        roleIds.add(taskRole);
      }
    }
    for (const worker of Object.values(state.workers)) {
      roleIds.add(worker.role);
    }
    for (const record of Object.values(state.provisioning?.workers ?? {})) {
      roleIds.add(record.role);
    }
    return [...roleIds];
  }

  private countPendingTasksForRole(state: TeamState, role: RoleId): number {
    return Object.values(state.tasks).filter((task) => this.doesTaskNeedRole(task, state, role)).length;
  }

  /** A pending/assigned task needs `role` unless it is already held by a non-offline worker. */
  private doesTaskNeedRole(task: TaskInfo, state: TeamState, role: RoleId): boolean {
    if (task.status !== "pending" && task.status !== "assigned") {
      return false;
    }
    if (task.assignedWorkerId) {
      const assignedWorker = state.workers[task.assignedWorkerId];
      if (assignedWorker && assignedWorker.status !== "offline") {
        return false;
      }
    }
    if (task.assignedRole) {
      return task.assignedRole === role;
    }
    return this.inferTaskRole(task) === role;
  }

  private inferTaskRole(task: TaskInfo): RoleId | null {
    return inferTaskRole(task);
  }

  /**
   * Number of additional workers to launch for `role`: the larger of the warm
   * shortfall (min-per-role) and the queue-driven need, capped by max-per-role.
   * Launching records count towards both capacity and supply.
   */
  private computeRoleDemand(state: TeamState, role: RoleId): number {
    const pendingDemand = this.countPendingTasksForRole(state, role);
    const activeWorkers = Object.values(state.workers).filter(
      (worker) => worker.role === role && worker.status !== "offline",
    );
    const idleWorkers = activeWorkers.filter((worker) => worker.status === "idle").length;
    const launchingWorkers = Object.values(state.provisioning?.workers ?? {}).filter(
      (record) => record.role === role && record.status === "launching",
    ).length;
    const warmShortfall = Math.max(
      0,
      this.deps.config.workerProvisioningMinPerRole - (activeWorkers.length + launchingWorkers),
    );
    const queueDrivenNeed = Math.max(0, pendingDemand - idleWorkers - launchingWorkers);
    const cap = Math.max(
      0,
      this.deps.config.workerProvisioningMaxPerRole - activeWorkers.length - launchingWorkers,
    );
    return Math.min(cap, Math.max(warmShortfall, queueDrivenNeed));
  }

  /** @throws Error when a non-process backend has no `workerProvisioningControllerUrl` configured. */
  private resolveControllerUrl(): string {
    if (this.backend?.type === "process") {
      // Process workers run on the same host — always use loopback regardless
      // of workerProvisioningControllerUrl (which may target Docker DNS).
      const port = this.deps.actualControllerPort ?? this.deps.config.port;
      return `http://127.0.0.1:${port}`;
    }
    if (this.deps.config.workerProvisioningControllerUrl) {
      return this.deps.config.workerProvisioningControllerUrl;
    }
    throw new Error(
      `workerProvisioningControllerUrl is required when workerProvisioningType=${this.backend?.type}`,
    );
  }

  /** Extra env from config, overlaid with non-empty pass-through variables from this process. */
  private buildForwardedEnv(controllerUrl: string): Record<string, string> {
    const env: Record<string, string> = {
      ...this.deps.config.workerProvisioningExtraEnv,
    };
    for (const name of this.deps.config.workerProvisioningPassEnv) {
      const value = process.env[name];
      if (typeof value === "string" && value.length > 0) {
        env[name] = value;
      }
    }
    return appendNoProxyEntries(env, controllerUrl);
  }

  /** Returns a fresh clone of the (cached) base OpenClaw config. */
  private async loadBaseOpenClawConfig(): Promise<Record<string, unknown>> {
    if (!this.baseConfigPromise) {
      this.baseConfigPromise = loadOpenClawConfig(resolveDefaultOpenClawConfigPath());
    }
    try {
      return cloneJson(await this.baseConfigPromise);
    } catch (err) {
      // Do not cache a failed load: a transient read error must not break every later launch.
      this.baseConfigPromise = null;
      throw err;
    }
  }

  /**
   * Aligns provisioning records with live worker state and prunes
   * failed/terminated records past the retention window.
   * @returns true when `state` was modified.
   */
  private refreshProvisioningState(state: TeamState, now: number): boolean {
    let changed = !state.provisioning || typeof state.provisioning !== "object";
    const provisioning = ensureProvisioningState(state);
    for (const [workerId, record] of Object.entries(provisioning.workers)) {
      const worker = state.workers[workerId];
      if (worker && worker.status !== "offline") {
        if (record.status === "launching") {
          record.status = "registered";
          record.registeredAt = record.registeredAt ?? worker.registeredAt ?? now;
          changed = true;
        }
        if (record.status === "registered") {
          if (worker.status === "idle") {
            if (!record.idleSince) {
              record.idleSince = now;
              changed = true;
            }
          } else if (record.idleSince) {
            delete record.idleSince;
            changed = true;
          }
          if (record.updatedAt < worker.lastHeartbeat) {
            record.updatedAt = worker.lastHeartbeat;
            changed = true;
          }
        }
      }
      if ((record.status === "failed" || record.status === "terminated") && now - record.updatedAt > PROVISIONING_RECORD_RETENTION_MS) {
        delete provisioning.workers[workerId];
        changed = true;
      }
    }
    return changed;
  }
}

/** Runs each worker as a child gateway process with its own temporary home directory. */
class ProcessProvisioner implements WorkerProvisionerBackend {
  readonly type = "process" as const;
  private readonly logger: PluginLogger;
  private readonly processByWorkerId = new Map<string, ChildProcess>();
  private readonly baseDirPromise: Promise<string>;

  constructor(logger: PluginLogger) {
    this.logger = logger;
    const provisionedRoot = path.join(resolveDefaultTeamClawRuntimeRootDir(), "provisioned-workers");
    this.baseDirPromise = fs.mkdir(provisionedRoot, { recursive: true })
      .then(() => fs.mkdtemp(path.join(provisionedRoot, "session-")));
    // Mark the promise as handled so a failure before the first launch() is not
    // reported as an unhandled rejection; launch() still observes the rejection.
    this.baseDirPromise.catch(() => undefined);
  }

  async launch(spec: LaunchSpec): Promise<LaunchResult> {
    const baseDir = await this.baseDirPromise;
    const runtimeHomeDir = await fs.mkdtemp(path.join(baseDir, `${sanitizePathSegment(spec.role)}-`));
    const stateDir = path.join(runtimeHomeDir, ".openclaw");
    const configPath = path.join(stateDir, "openclaw.json");
    await fs.mkdir(stateDir, { recursive: true });
    await prepareProcessRuntimeExtensions(stateDir);
    await fs.writeFile(configPath, spec.configJson, "utf8");
    const gatewayEntrypoint = resolveGatewayEntrypoint();
    const child = spawn(process.execPath, [
      gatewayEntrypoint,
      "gateway",
      "--allow-unconfigured",
      "--bind",
      "loopback",
      "--port",
      String(spec.gatewayPort),
    ], {
      cwd: path.dirname(gatewayEntrypoint),
      env: {
        ...process.env,
        ...spec.env,
        HOME: runtimeHomeDir,
        OPENCLAW_HOME: runtimeHomeDir,
        OPENCLAW_STATE_DIR: stateDir,
        OPENCLAW_CONFIG_PATH: configPath,
        OPENCLAW_SKIP_CANVAS_HOST: "1",
        TEAMCLAW_WORKER_ID: spec.workerId,
        TEAMCLAW_LAUNCH_TOKEN: spec.launchToken,
        ...(spec.workspaceDir ? { TEAMCLAW_WORKSPACE_DIR: spec.workspaceDir } : {}),
      },
      stdio: ["ignore", "pipe", "pipe"],
    });
    this.processByWorkerId.set(spec.workerId, child);
    attachChildLogs(child, this.logger, `ProvisionedWorker[${spec.role}]`);
    // Without an 'error' listener a spawn failure is thrown as an unhandled
    // 'error' event and can take down the controller process.
    child.on("error", (err: Error) => {
      if (child.pid === undefined) {
        // Never spawned: there is nothing to stop later.
        this.processByWorkerId.delete(spec.workerId);
      }
      this.logger.warn(`Provisioner: process worker ${spec.workerId} error: ${err.message}`);
    });
    child.on("exit", (code: number | null, signal: string | null) => {
      this.processByWorkerId.delete(spec.workerId);
      this.logger.info(
        `Provisioner: process worker ${spec.workerId} exited (code=${String(code)}, signal=${String(signal)})`,
      );
    });
    return {
      instanceId: child.pid ? `pid:${child.pid}` : undefined,
      instanceName: spec.workerId,
      runtimeHomeDir,
    };
  }

  async terminate(record: ProvisionedWorkerRecord): Promise<void> {
    const child = this.processByWorkerId.get(record.workerId);
    if (child) {
      await stopChildProcess(child);
      this.processByWorkerId.delete(record.workerId);
    } else if (record.instanceId?.startsWith("pid:")) {
      // No live handle (e.g. record from an earlier controller run): best-effort signal by pid.
      // NOTE(review): the pid may have been reused by an unrelated process — confirm this is acceptable.
      const pid = Number(record.instanceId.slice("pid:".length));
      if (Number.isFinite(pid) && pid > 0) {
        try {
          process.kill(pid, "SIGTERM");
        } catch {
          // ignore
        }
      }
    }
    if (record.runtimeHomeDir) {
      await fs.rm(record.runtimeHomeDir, { recursive: true, force: true }).catch(() => {
        // ignore
      });
    }
  }

  async stop(): Promise<void> {
    for (const child of this.processByWorkerId.values()) {
      await stopChildProcess(child).catch(() => {
        // ignore
      });
    }
    this.processByWorkerId.clear();
  }
}

/** Runs each worker as a Docker container created through the Docker Engine API. */
class DockerProvisioner implements WorkerProvisionerBackend {
  readonly type = "docker" as const;
  private readonly config: PluginConfig;
  private readonly logger: PluginLogger;
  private readonly client: DockerApiClient;

  constructor(config: PluginConfig, logger: PluginLogger) {
    this.config = config;
    this.logger = logger;
    this.client = new DockerApiClient();
  }

  /** @throws Error when no image is configured or the Docker API rejects create/start. */
  async launch(spec: LaunchSpec): Promise<LaunchResult> {
    if (!this.config.workerProvisioningImage) {
      throw new Error("workerProvisioningImage is required for docker provisioning");
    }
    const instanceName = buildManagedInstanceName(this.config.teamName, spec.role, spec.workerId);
    const env: Record<string, string> = {
      ...spec.env,
      HOME: "/home/node",
      OPENCLAW_HOME: "/home/node",
      OPENCLAW_STATE_DIR: "/home/node/.openclaw",
      OPENCLAW_CONFIG_PATH: "/home/node/.openclaw/openclaw.json",
      OPENCLAW_SKIP_CANVAS_HOST: "1",
      TEAMCLAW_BOOTSTRAP_CONFIG_B64: Buffer.from(spec.configJson, "utf8").toString("base64"),
      TEAMCLAW_WORKER_ID: spec.workerId,
      TEAMCLAW_LAUNCH_TOKEN: spec.launchToken,
      ...(spec.workspaceDir ? { TEAMCLAW_WORKSPACE_DIR: spec.workspaceDir } : {}),
    };
    // When workers share a Docker network with the controller container, advertise
    // the container name and internal worker port so sibling containers can reach it
    // directly. When the controller runs on the host, fall back to publishing a host
    // port and advertising localhost for host-side callbacks.
    const usePortPublishing = spec.publishedHostPort !== undefined;
    if (this.config.workerProvisioningDockerNetwork) {
      env.TEAMCLAW_ADVERTISE_HOST = instanceName;
      env.TEAMCLAW_ADVERTISE_PORT = String(spec.workerPort);
    } else if (usePortPublishing) {
      env.TEAMCLAW_ADVERTISE_HOST = "localhost";
      env.TEAMCLAW_ADVERTISE_PORT = String(spec.publishedHostPort);
    }
    const hostConfig: Record<string, unknown> = {
      Binds: buildDockerBinds(this.config),
      NetworkMode: this.config.workerProvisioningDockerNetwork || undefined,
    };
    if (usePortPublishing) {
      hostConfig.PortBindings = {
        [`${spec.workerPort}/tcp`]: [{ HostPort: String(spec.publishedHostPort) }],
      };
    }
    const containerConfig: Record<string, unknown> = {
      Image: this.config.workerProvisioningImage,
      Cmd: ["sh", "-lc", buildContainerBootstrapScript()],
      Env: Object.entries(env).map(([key, value]) => `${key}=${value}`),
      User: "root",
      Labels: {
        "teamclaw.managed": "true",
        "teamclaw.team": this.config.teamName,
        "teamclaw.role": spec.role,
        "teamclaw.worker_id": spec.workerId,
      },
      HostConfig: hostConfig,
    };
    if (usePortPublishing) {
      containerConfig.ExposedPorts = { [`${spec.workerPort}/tcp`]: {} };
    }
    const response = await this.client.requestJson<{ Id?: string }>(
      "POST",
      `/containers/create?name=${encodeURIComponent(instanceName)}`,
      containerConfig,
      [201],
    );
    const instanceId = typeof response.Id === "string" ? response.Id : undefined;
    if (!instanceId) {
      throw new Error("Docker create did not return a container ID");
    }
    try {
      await this.client.requestVoid("POST", `/containers/${instanceId}/start`, undefined, [204]);
    } catch (err) {
      // The caller never learns the container id when launch() throws, so remove
      // the created-but-not-started container here or it would leak.
      try {
        await this.client.requestVoid("DELETE", `/containers/${instanceId}?force=1`, undefined, [204, 404]);
      } catch (cleanupErr) {
        this.logger.warn(
          `Provisioner: failed to remove docker container ${instanceName} after start failure: ${cleanupErr instanceof Error ? cleanupErr.message : String(cleanupErr)}`,
        );
      }
      throw err;
    }
    this.logger.info(`Provisioner: started docker worker container ${instanceName} (${instanceId})`);
    return {
      instanceId,
      instanceName,
    };
  }

  /** Force-removes the container; a 404 (already gone) counts as success. */
  async terminate(record: ProvisionedWorkerRecord): Promise<void> {
    const target = record.instanceId || record.instanceName;
    if (!target) {
      return;
    }
    await this.client.requestVoid("DELETE", `/containers/${encodeURIComponent(target)}?force=1`, undefined, [204, 404]);
  }
}

/**
 * Minimal pod create/delete client.
 *
 * In-cluster (KUBERNETES_SERVICE_HOST/PORT set and the service-account token
 * and CA readable) it talks to the Kubernetes API directly over HTTPS, so the
 * image does not need a kubectl binary. Otherwise it shells out to `kubectl`
 * with the configured context.
 */
class K8sApiClient {
  private readonly apiServer: string;
  private readonly contextArgs: string;
  /** Resolves to the in-cluster credentials, or null when kubectl must be used. Never rejects. */
  private readonly credentialsPromise: Promise<{ token: string; caCert: Buffer } | null>;

  constructor(context: string, private readonly logger: PluginLogger) {
    this.contextArgs = context;
    // In-cluster detection: K8s injects these env vars and mounts SA token
    const host = process.env.KUBERNETES_SERVICE_HOST;
    const port = process.env.KUBERNETES_SERVICE_PORT;
    if (host && port) {
      // IPv6 literals must be bracketed to form a valid URL authority.
      const authorityHost = host.includes(":") && !host.startsWith("[") ? `[${host}]` : host;
      this.apiServer = `https://${authorityHost}:${port}`;
      this.credentialsPromise = this.loadServiceAccountCredentials();
    } else {
      // Out-of-cluster: delegate to kubectl
      this.apiServer = "";
      this.credentialsPromise = Promise.resolve(null);
    }
  }

  /**
   * Reads the mounted service-account token and CA bundle. Both are required:
   * without the CA the API server cannot be verified, so the client falls back
   * to kubectl instead of sending the bearer token over unverified TLS.
   *
   * NOTE(review): the token is read once and cached; if the cluster rotates
   * projected tokens, consider re-reading it per request.
   */
  private async loadServiceAccountCredentials(): Promise<{ token: string; caCert: Buffer } | null> {
    try {
      const [rawToken, caCert] = await Promise.all([
        fs.readFile("/var/run/secrets/kubernetes.io/serviceaccount/token", "utf8"),
        fs.readFile("/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"),
      ]);
      const token = rawToken.trim();
      if (!token) {
        throw new Error("service account token is empty");
      }
      return { token, caCert };
    } catch {
      this.logger.warn("K8sApiClient: in-cluster but cannot read service account token — falling back to kubectl");
      return null;
    }
  }

  async createPod(namespace: string, manifest: unknown): Promise<void> {
    const credentials = await this.credentialsPromise;
    if (credentials) {
      await this.apiRequest(
        credentials,
        "POST",
        `/api/v1/namespaces/${encodeURIComponent(namespace)}/pods`,
        manifest,
        [200, 201],
      );
    } else {
      await runCommand("kubectl", [
        ...buildKubectlContextArgs(this.contextArgs),
        "apply",
        "-f",
        "-",
      ], JSON.stringify(manifest));
    }
  }

  /** Deletes the pod immediately (grace period 0); a missing pod counts as success. */
  async deletePod(namespace: string, podName: string): Promise<void> {
    const credentials = await this.credentialsPromise;
    if (credentials) {
      await this.apiRequest(
        credentials,
        "DELETE",
        `/api/v1/namespaces/${encodeURIComponent(namespace)}/pods/${encodeURIComponent(podName)}?gracePeriodSeconds=0`,
        undefined,
        [200, 202, 404],
      );
    } else {
      await runCommand("kubectl", [
        ...buildKubectlContextArgs(this.contextArgs),
        "-n",
        namespace,
        "delete",
        "pod",
        podName,
        "--ignore-not-found=true",
        "--grace-period=0",
        "--force",
      ]);
    }
  }

  /**
   * Sends one authenticated request to the API server.
   * @returns the response body as text.
   * @throws Error when the status is not in `okStatuses` or the request fails.
   */
  private apiRequest(
    credentials: { token: string; caCert: Buffer },
    method: string,
    apiPath: string,
    body: unknown,
    okStatuses: number[],
  ): Promise<string> {
    const payload = body !== undefined ? JSON.stringify(body) : undefined;
    return new Promise<string>((resolve, reject) => {
      const req = https.request(
        new URL(apiPath, this.apiServer),
        {
          method,
          ca: credentials.caCert,
          headers: {
            "Authorization": `Bearer ${credentials.token}`,
            ...(payload !== undefined
              ? { "Content-Type": "application/json", "Content-Length": Buffer.byteLength(payload) }
              : {}),
          },
        },
        (res) => {
          // Decode as UTF-8 across chunk boundaries instead of stringifying each Buffer separately.
          res.setEncoding("utf8");
          let data = "";
          res.on("data", (chunk: string) => {
            data += chunk;
          });
          res.on("error", reject);
          res.on("end", () => {
            if (okStatuses.includes(res.statusCode ?? 0)) {
              resolve(data);
            } else {
              reject(new Error(`K8s API ${method} ${apiPath} returned ${res.statusCode}: ${data.slice(0, 500)}`));
            }
          });
        },
      );
      req.on("error", reject);
      if (payload !== undefined) {
        req.write(payload);
      }
      req.end();
    });
  }
}

class KubernetesProvisioner implements WorkerProvisionerBackend {
  readonly type = "kubernetes" as const;
  private readonly config: PluginConfig;
  private readonly logger: PluginLogger;
  private readonly k8sApi: K8sApiClient;

  constructor(config: PluginConfig, logger: PluginLogger) {
    this.config = config;
    this.logger = logger;
    this.k8sApi = new K8sApiClient(config.workerProvisioningKubernetesContext, logger);
  }

  async launch(spec: LaunchSpec): Promise<LaunchResult> {
    if (!this.config.workerProvisioningImage) {
      throw new Error("workerProvisioningImage is required for kubernetes provisioning");
    }
    const instanceName = buildManagedInstanceName(this.config.teamName, spec.role, spec.workerId);
    const ns = this.config.workerProvisioningKubernetesNamespace;
    const env = {
      ...spec.env,
      HOME: "/home/node",
      OPENCLAW_HOME: "/home/node",
      OPENCLAW_STATE_DIR: "/home/node/.openclaw",
      OPENCLAW_CONFIG_PATH: "/home/node/.openclaw/openclaw.json",
      OPENCLAW_SKIP_CANVAS_HOST: "1",
      TEAMCLAW_BOOTSTRAP_CONFIG_B64: Buffer.from(spec.configJson, "utf8").toString("base64"),
      TEAMCLAW_WORKER_ID: spec.workerId,
      TEAMCLAW_LAUNCH_TOKEN: spec.launchToken,
      ...(spec.workspaceDir ?
{ TEAMCLAW_WORKSPACE_DIR: spec.workspaceDir } : {}), }; const workspaceRoot = this.config.workerProvisioningWorkspaceRoot; const hasPersistentWorkspace = Boolean( workspaceRoot && this.config.workerProvisioningKubernetesWorkspacePersistentVolumeClaim, ); const imagePullSecrets = this.config.workerProvisioningKubernetesImagePullSecrets .map((name) => name.trim()) .filter((name) => name.length > 0) .map((name) => ({ name })); const manifest = { apiVersion: "v1", kind: "Pod", metadata: { name: instanceName, namespace: ns, labels: { app: "teamclaw-worker", "teamclaw.managed": "true", "teamclaw.team": sanitizeName(this.config.teamName, 40), "teamclaw.role": sanitizeName(spec.role, 40), ...this.config.workerProvisioningKubernetesLabels, }, annotations: { ...this.config.workerProvisioningKubernetesAnnotations, }, }, spec: { restartPolicy: "Never", hostname: buildManagedHostname(this.config.teamName, spec.role, spec.workerId), serviceAccountName: this.config.workerProvisioningKubernetesServiceAccount || undefined, imagePullSecrets: imagePullSecrets.length > 0 ? imagePullSecrets : undefined, securityContext: hasPersistentWorkspace ? { runAsUser: 1000, runAsGroup: 1000, fsGroup: 1000, } : undefined, volumes: hasPersistentWorkspace ? [ { name: "workspace", persistentVolumeClaim: { claimName: this.config.workerProvisioningKubernetesWorkspacePersistentVolumeClaim, }, }, ] : undefined, containers: [ { name: "worker", image: this.config.workerProvisioningImage, command: ["sh", "-lc"], args: [buildContainerBootstrapScript()], env: Object.entries(env).map(([name, value]) => ({ name, value })), volumeMounts: hasPersistentWorkspace ? 
[ { name: "workspace", mountPath: workspaceRoot, }, ] : undefined, }, ], }, }; await this.k8sApi.createPod(ns, manifest); this.logger.info(`Provisioner: applied kubernetes pod ${instanceName}`); return { instanceId: instanceName, instanceName, }; } async terminate(record: ProvisionedWorkerRecord): Promise { const podName = record.instanceName || record.instanceId; if (!podName) { return; } const ns = this.config.workerProvisioningKubernetesNamespace; await this.k8sApi.deletePod(ns, podName); } } class DockerApiClient { private readonly endpoint: DockerEndpoint; constructor() { this.endpoint = resolveDockerEndpoint(); } async requestJson(method: string, requestPath: string, body?: unknown, okStatuses: number[] = [200]): Promise { const response = await this.request(method, requestPath, body, okStatuses); if (!response.body) { return {} as T; } return JSON.parse(response.body) as T; } async requestVoid(method: string, requestPath: string, body?: unknown, okStatuses: number[] = [200]): Promise { await this.request(method, requestPath, body, okStatuses); } private async request( method: string, requestPath: string, body: unknown, okStatuses: number[], ): Promise<{ status: number; body: string }> { const payload = body === undefined ? undefined : JSON.stringify(body); // Prefer the daemon's negotiated default unless the operator pins a version explicitly. const finalPath = DOCKER_API_VERSION ? `/${DOCKER_API_VERSION}${requestPath}` : requestPath; return await new Promise<{ status: number; body: string }>((resolve, reject) => { const transport = this.endpoint.protocol === "https:" ? https : http; const req = transport.request({ method, socketPath: this.endpoint.socketPath, hostname: this.endpoint.hostname, port: this.endpoint.port, path: finalPath, headers: payload ? 
{ "Content-Type": "application/json", "Content-Length": Buffer.byteLength(payload), } : undefined, }, (res: any) => { const chunks: Buffer[] = []; res.on("data", (chunk: Buffer) => chunks.push(chunk)); res.on("end", () => { const text = Buffer.concat(chunks).toString("utf8"); const status = res.statusCode ?? 500; if (!okStatuses.includes(status)) { const message = extractDockerErrorMessage(text) || `Docker API ${method} ${requestPath} failed with ${status}`; reject(new Error(message)); return; } resolve({ status, body: text }); }); }); req.on("error", reject); if (payload) { req.write(payload); } req.end(); }); } } type DockerEndpoint = { protocol: "http:" | "https:"; socketPath?: string; hostname?: string; port?: number; }; function resolveDockerApiVersion(): string | null { const configured = process.env.DOCKER_API_VERSION?.trim(); if (!configured) { return null; } return configured.startsWith("v") ? configured : `v${configured}`; } function createProvisionerBackend( config: PluginConfig, logger: PluginLogger, ): WorkerProvisionerBackend | null { switch (config.workerProvisioningType) { case "process": return new ProcessProvisioner(logger); case "docker": return new DockerProvisioner(config, logger); case "kubernetes": return new KubernetesProvisioner(config, logger); case "none": default: return null; } } function buildProvisionedWorkerConfig( baseConfig: Record, controllerConfig: PluginConfig, spec: { role: RoleId; controllerUrl: string; workerPort: number; gatewayPort: number; workspaceDir?: string; }, ): Record { const config = cloneJson(baseConfig); delete config.channels; const agents = ensureRecord(config.agents); const agentDefaults = ensureRecord(agents.defaults); delete agentDefaults.repoRoot; if (spec.workspaceDir) { agentDefaults.workspace = spec.workspaceDir; } else { delete agentDefaults.workspace; } agents.defaults = agentDefaults; config.agents = agents; const gateway = ensureRecord(config.gateway); gateway.mode = "local"; // Workers running 
inside Docker/K8s containers must bind on all // interfaces so the controller (in a sibling container) can reach them. gateway.bind = controllerConfig.workerProvisioningType === "docker" || controllerConfig.workerProvisioningType === "kubernetes" ? "lan" : "loopback"; gateway.port = spec.gatewayPort; delete gateway.remote; config.gateway = gateway; const plugins = ensureRecord(config.plugins); plugins.enabled = true; if (controllerConfig.workerProvisioningType === "docker" || controllerConfig.workerProvisioningType === "kubernetes") { delete plugins.load; } const entries = ensureRecord(plugins.entries); const teamclawEntry = ensureRecord(entries.teamclaw); teamclawEntry.enabled = true; const teamclawConfig = ensureRecord(teamclawEntry.config); teamclawConfig.mode = "worker"; teamclawConfig.processModel = "multi"; teamclawConfig.role = spec.role; teamclawConfig.port = spec.workerPort; teamclawConfig.controllerUrl = spec.controllerUrl; teamclawConfig.teamName = controllerConfig.teamName; teamclawConfig.heartbeatIntervalMs = controllerConfig.heartbeatIntervalMs; teamclawConfig.taskTimeoutMs = controllerConfig.taskTimeoutMs; teamclawConfig.gitEnabled = controllerConfig.gitEnabled; teamclawConfig.gitRemoteUrl = controllerConfig.gitRemoteUrl; teamclawConfig.gitDefaultBranch = controllerConfig.gitDefaultBranch; teamclawConfig.gitAuthorName = controllerConfig.gitAuthorName; teamclawConfig.gitAuthorEmail = controllerConfig.gitAuthorEmail; teamclawConfig.workerProvisioningType = "none"; teamclawConfig.workerProvisioningControllerUrl = ""; teamclawConfig.workerProvisioningRoles = []; teamclawConfig.workerProvisioningMinPerRole = 0; teamclawConfig.workerProvisioningMaxPerRole = 10; teamclawConfig.workerProvisioningIdleTtlMs = controllerConfig.workerProvisioningIdleTtlMs; teamclawConfig.workerProvisioningStartupTimeoutMs = controllerConfig.workerProvisioningStartupTimeoutMs; teamclawConfig.workerProvisioningImage = ""; teamclawConfig.workerProvisioningPassEnv = []; 
teamclawConfig.workerProvisioningExtraEnv = {}; teamclawConfig.workerProvisioningDockerNetwork = ""; teamclawConfig.workerProvisioningDockerMounts = []; teamclawConfig.workerProvisioningWorkspaceRoot = ""; teamclawConfig.workerProvisioningDockerWorkspaceVolume = ""; teamclawConfig.workerProvisioningKubernetesNamespace = "default"; teamclawConfig.workerProvisioningKubernetesContext = ""; teamclawConfig.workerProvisioningKubernetesServiceAccount = ""; teamclawConfig.workerProvisioningKubernetesImagePullSecrets = []; teamclawConfig.workerProvisioningKubernetesWorkspacePersistentVolumeClaim = ""; teamclawConfig.workerProvisioningKubernetesLabels = {}; teamclawConfig.workerProvisioningKubernetesAnnotations = {}; teamclawEntry.config = teamclawConfig; entries.teamclaw = teamclawEntry; plugins.entries = entries; config.plugins = plugins; return config; } function ensureProvisioningState(state: TeamState): TeamProvisioningState { if (!state.provisioning || typeof state.provisioning !== "object") { state.provisioning = { workers: {} }; } if (!state.provisioning.workers || typeof state.provisioning.workers !== "object") { state.provisioning.workers = {}; } return state.provisioning; } async function ensureWritableDirectory(dir: string): Promise { await fs.mkdir(dir, { recursive: true }); const probePath = path.join(dir, `.teamclaw-write-probe-${process.pid}-${Date.now()}`); await fs.writeFile(probePath, "ok\n", "utf8"); await fs.rm(probePath, { force: true }); } function sleep(ms: number): Promise { return new Promise((resolve) => setTimeout(resolve, ms)); } async function loadOpenClawConfig(configPath: string): Promise> { const raw = await fs.readFile(configPath, "utf8"); return parseLooseJsonObject(raw, configPath); } async function prepareProcessRuntimeExtensions(stateDir: string): Promise { const runtimeExtensionsDir = path.join(stateDir, "extensions"); const controllerExtensionsDir = path.join(path.dirname(resolveDefaultOpenClawConfigPath()), "extensions"); if (await 
pathExists(controllerExtensionsDir)) { await fs.symlink(controllerExtensionsDir, runtimeExtensionsDir, "dir"); } else { await fs.mkdir(runtimeExtensionsDir, { recursive: true }); await fs.symlink(resolveCurrentTeamClawPluginRootDir(), path.join(runtimeExtensionsDir, "teamclaw"), "dir"); } await copyControllerAuthProfiles(stateDir); } async function copyControllerAuthProfiles(stateDir: string): Promise { const candidateControllerStateDirs = [ path.dirname(resolveDefaultOpenClawConfigPath()), resolveDefaultOpenClawStateDir(), ]; const candidateAuthProfilePaths = [ path.join(resolveTeamClawAgentDir(), "auth-profiles.json"), path.join(resolveDefaultAgentDir(), "auth-profiles.json"), ...candidateControllerStateDirs.flatMap((controllerStateDir) => [ path.join(controllerStateDir, "agents", TEAMCLAW_AGENT_ID, "agent", "auth-profiles.json"), path.join(controllerStateDir, "agents", "main", "agent", "auth-profiles.json"), ]), ].filter((value, index, values) => values.indexOf(value) === index); let controllerAuthProfilesPath: string | undefined; for (const candidatePath of candidateAuthProfilePaths) { if (await pathExists(candidatePath)) { controllerAuthProfilesPath = candidatePath; break; } } if (!controllerAuthProfilesPath) { return; } const runtimeAuthProfilePaths = [ path.join(stateDir, "agents", TEAMCLAW_AGENT_ID, "agent", "auth-profiles.json"), path.join(stateDir, "agents", "main", "agent", "auth-profiles.json"), ]; for (const runtimeAuthProfilesPath of runtimeAuthProfilePaths) { await fs.mkdir(path.dirname(runtimeAuthProfilesPath), { recursive: true }); await fs.copyFile(controllerAuthProfilesPath, runtimeAuthProfilesPath); } } function cloneJson(value: T): T { return JSON.parse(JSON.stringify(value)) as T; } function requiresDedicatedHostPortsForProvisioner( provider: WorkerProvisioningType, config: PluginConfig, ): boolean { if (provider === "process") { return true; } return provider === "docker" && isDockerHostNetwork(config.workerProvisioningDockerNetwork); } 
/** True when the Docker network mode is "host" (case-insensitive, surrounding whitespace ignored). */
function isDockerHostNetwork(networkMode: string): boolean {
  return networkMode.trim().toLowerCase() === "host";
}

/**
 * Extracts the container-side path from a Docker bind spec such as
 * `/host:/container:ro` or `volume:/container`.
 *
 * @returns The last colon-separated part that starts with "/", or `null` when there is none.
 */
function extractDockerBindTarget(bind: string): string | null {
  for (const part of bind.split(":").reverse()) {
    const trimmed = part.trim();
    if (trimmed.startsWith("/")) {
      return trimmed;
    }
  }
  return null;
}

/**
 * Adds loopback names and the controller host to `NO_PROXY` / `no_proxy`.
 *
 * Mutates and returns `env`. Each variable starts from the worker env value,
 * falling back to the current process environment.
 */
function appendNoProxyEntries(
  env: Record<string, string>,
  controllerUrl: string,
): Record<string, string> {
  const requiredEntries = resolveRequiredNoProxyEntries(controllerUrl);
  if (requiredEntries.length === 0) {
    return env;
  }
  const upperBase = env.NO_PROXY ?? process.env.NO_PROXY ?? process.env.no_proxy ?? "";
  const lowerBase = env.no_proxy ?? process.env.no_proxy ?? env.NO_PROXY ?? process.env.NO_PROXY ?? "";
  env.NO_PROXY = mergeNoProxyEntries(upperBase, requiredEntries);
  env.no_proxy = mergeNoProxyEntries(lowerBase, requiredEntries);
  return env;
}

/** Lists hosts that must bypass any proxy: loopback names plus the controller URL's hostname. */
function resolveRequiredNoProxyEntries(controllerUrl: string): string[] {
  const required = new Set<string>(["localhost", "127.0.0.1", "::1", "[::1]"]);
  try {
    const host = new URL(controllerUrl).hostname.trim();
    if (host) {
      required.add(host);
    }
  } catch {
    // Ignore malformed controller URLs; worker registration will fail separately.
  }
  return Array.from(required);
}

/**
 * Merges a comma-separated NO_PROXY value with additional entries.
 * Entries are de-duplicated case-insensitively; the first occurrence keeps its
 * position and the last occurrence supplies the spelling.
 */
function mergeNoProxyEntries(existing: string, requiredEntries: string[]): string {
  const tokens = new Map<string, string>();
  for (const entry of [...existing.split(","), ...requiredEntries]) {
    const trimmed = entry.trim();
    if (!trimmed) {
      continue;
    }
    tokens.set(trimmed.toLowerCase(), trimmed);
  }
  return Array.from(tokens.values()).join(",");
}

/**
 * Checks whether `targetPath` is accessible.
 *
 * @returns `false` only for ENOENT.
 * @throws Any other access error (for example EACCES) is rethrown.
 */
async function pathExists(targetPath: string): Promise<boolean> {
  try {
    await fs.access(targetPath);
    return true;
  } catch (err) {
    if ((err as NodeJS.ErrnoException).code === "ENOENT") {
      return false;
    }
    throw err;
  }
}

/** Returns a shallow copy of `value` when it is a plain (non-array) object, otherwise `{}`. */
function ensureRecord(value: unknown): Record<string, unknown> {
  return value && typeof value === "object" && !Array.isArray(value)
    ? { ...(value as Record<string, unknown>) }
    : {};
}

/**
 * Parses config text with JSON5.
 *
 * @throws Error naming `configPath` when the text cannot be parsed.
 */
function parseLooseJsonObject(raw: string, configPath: string): Record<string, unknown> {
  try {
    return JSON5.parse(raw) as Record<string, unknown>;
  } catch (err) {
    throw new Error(
      `Failed to parse OpenClaw config at ${configPath}: ${err instanceof Error ? err.message : String(err)}`,
    );
  }
}

/**
 * Asks the OS for a free loopback TCP port by binding port 0 and closing the
 * listener again. The port is released before it is returned, so another
 * process may still claim it first.
 */
async function reserveEphemeralPort(): Promise<number> {
  return await new Promise<number>((resolve, reject) => {
    const server = net.createServer();
    // Register before listen() so a bind failure can never go unobserved.
    server.on("error", reject);
    server.listen(0, "127.0.0.1", () => {
      const address = server.address();
      const port = address && typeof address === "object" ? address.port : 0;
      server.close((err: Error | undefined | null) => {
        if (err) {
          reject(err);
          return;
        }
        if (!port) {
          reject(new Error("Failed to reserve ephemeral port"));
          return;
        }
        resolve(port);
      });
    });
  });
}

/**
 * Resolves the script the current process was started with (`process.argv[1]`).
 *
 * @throws Error when the process has no script argument.
 */
function resolveGatewayEntrypoint(): string {
  const scriptPath = process.argv[1];
  if (!scriptPath) {
    throw new Error("Unable to resolve OpenClaw gateway entrypoint");
  }
  return path.resolve(scriptPath);
}

/**
 * Returns the directory two levels above this module, or "" when
 * `TEAMCLAW_BAKED_IN` is "true". Callers must treat "" as "nothing to link or mount".
 */
function resolveCurrentTeamClawPluginRootDir(): string {
  if (process.env.TEAMCLAW_BAKED_IN === "true") {
    return "";
  }
  return path.resolve(fileURLToPath(new URL("../../", import.meta.url)));
}

/** Forwards a child's stdout lines to `logger.info` and stderr lines to `logger.warn`, each prefixed. */
function attachChildLogs(child: ChildProcess, logger: PluginLogger, prefix: string): void {
  if (child.stdout) {
    const stdoutReader = readline.createInterface({ input: child.stdout });
    stdoutReader.on("line", (line: string) => {
      logger.info(`${prefix}: ${line}`);
    });
  }
  if (child.stderr) {
    const stderrReader = readline.createInterface({ input: child.stderr });
    stderrReader.on("line", (line: string) => {
      logger.warn(`${prefix}: ${line}`);
    });
  }
}

/**
 * Stops a child with SIGTERM and escalates to SIGKILL after
 * PROCESS_TERMINATION_TIMEOUT_MS. Resolves once the child exits or the
 * timeout fires; returns immediately when the child has already exited.
 */
async function stopChildProcess(child: ChildProcess): Promise<void> {
  if (child.exitCode !== null || child.signalCode !== null) {
    return;
  }
  await new Promise<void>((resolve) => {
    const timeout = setTimeout(() => {
      if (child.exitCode === null && child.signalCode === null && child.pid) {
        try {
          process.kill(child.pid, "SIGKILL");
        } catch {
          // ignore
        }
      }
      resolve();
    }, PROCESS_TERMINATION_TIMEOUT_MS);
    // Do not let the escalation timer keep the event loop alive.
    const timer = timeout as unknown as { unref?: () => void };
    timer.unref?.();
    child.once("exit", () => {
      clearTimeout(timeout);
      resolve();
    });
    child.kill("SIGTERM");
  });
}

/**
 * Lower-cases `value`, collapses every run of characters outside `[a-z0-9-]`
 * into "-", and trims leading/trailing dashes. Falls back to "worker" when
 * nothing remains.
 */
function sanitizePathSegment(value: string): string {
  const normalized = value.toLowerCase().replace(/[^a-z0-9-]+/g, "-").replace(/^-+|-+$/g, "");
  return normalized || "worker";
}

/**
 * Sanitises `value` like `sanitizePathSegment`, truncates it to `maxLength`
 * and re-trims dashes exposed by the cut. Falls back to "teamclaw" when empty.
 */
function sanitizeName(value: string, maxLength = 63): string {
  const normalized = sanitizePathSegment(value).slice(0, maxLength).replace(/^-+|-+$/g, "");
  return normalized || "teamclaw";
}

/** Builds the container/pod name for a worker (at most 63 characters, "teamclaw-" prefix). */
function buildManagedInstanceName(teamName: string, role: RoleId, workerId: string): string {
  return buildManagedName("teamclaw", teamName, role, workerId, {
    teamBudget: 18,
    roleBudget: 12,
    workerBudget: 12,
    hashLength: 8,
    maxLength: 63,
  });
}

/** Builds the hostname for a worker (at most 40 characters, "tc-" prefix). */
function buildManagedHostname(teamName: string, role: RoleId, workerId: string): string {
  return buildManagedName("tc", teamName, role, workerId, {
    teamBudget: 10,
    roleBudget: 8,
    workerBudget: 6,
    hashLength: 6,
    maxLength: 40,
  });
}

/**
 * Composes `<prefix>-<team>-<role>-<worker>-<hash>`.
 *
 * Team and role keep their leading characters, the worker id keeps its
 * trailing characters, and the hash is taken from the unsanitised triple so
 * names stay distinct after truncation.
 */
function buildManagedName(
  prefix: string,
  teamName: string,
  role: RoleId,
  workerId: string,
  options: {
    teamBudget: number;
    roleBudget: number;
    workerBudget: number;
    hashLength: number;
    maxLength: number;
  },
): string {
  const teamPart = sanitizeLeadingSegment(teamName, options.teamBudget, "team");
  const rolePart = sanitizeLeadingSegment(role, options.roleBudget, "worker");
  const workerPart = sanitizeTrailingSegment(workerId, options.workerBudget, "worker");
  const hash = shortStableHash(`${teamName}:${role}:${workerId}`).slice(0, options.hashLength);
  return sanitizeName(`${prefix}-${teamPart}-${rolePart}-${workerPart}-${hash}`, options.maxLength);
}

/** Hex SHA-1 digest of `value`; used only to disambiguate names, not for security. */
function shortStableHash(value: string): string {
  return createHash("sha1").update(value).digest("hex");
}

/** Sanitises `value` and keeps its first `maxLength` characters; `fallback` when nothing remains. */
function sanitizeLeadingSegment(value: string, maxLength: number, fallback: string): string {
  const normalized = sanitizePathSegment(value).slice(0, maxLength).replace(/^-+|-+$/g, "");
  return normalized || fallback;
}

/** Sanitises `value` and keeps its last `maxLength` characters; `fallback` when nothing remains. */
function sanitizeTrailingSegment(value: string, maxLength: number, fallback: string): string {
  const normalized = sanitizePathSegment(value).slice(-maxLength).replace(/^-+|-+$/g, "");
  return normalized || fallback;
}

/**
 * Shell script run as the container entrypoint: creates the state (and
 * optional workspace) directory, writes the base64-encoded bootstrap config to
 * `OPENCLAW_CONFIG_PATH`, then execs the gateway.
 */
function buildContainerBootstrapScript(): string {
  return [
    "set -eu",
    "mkdir -p \"$OPENCLAW_STATE_DIR\"",
    "if [ -n \"${TEAMCLAW_WORKSPACE_DIR:-}\" ]; then mkdir -p \"$TEAMCLAW_WORKSPACE_DIR\"; fi",
    "node -e 'const fs=require(\"fs\"); const configPath=process.env.OPENCLAW_CONFIG_PATH; const raw=Buffer.from(process.env.TEAMCLAW_BOOTSTRAP_CONFIG_B64||\"\", \"base64\").toString(\"utf8\"); fs.mkdirSync(require(\"path\").dirname(configPath), { recursive: true }); fs.writeFileSync(configPath, raw);'",
    "exec node dist/index.js gateway --allow-unconfigured",
  ].join("\n");
}

/**
 * Assembles the Docker bind list: configured mounts, a read-only mount of the
 * current plugin directory (unless a mount already targets that path or the
 * plugin is baked in), and the workspace volume when configured.
 * Exact duplicates are removed.
 */
function buildDockerBinds(config: PluginConfig): string[] {
  const binds = [...config.workerProvisioningDockerMounts];
  const pluginAlreadyMounted = binds.some(
    (bind) => extractDockerBindTarget(bind) === DEFAULT_DOCKER_BUNDLED_TEAMCLAW_PLUGIN_DIR,
  );
  if (!pluginAlreadyMounted) {
    const pluginRootDir = resolveCurrentTeamClawPluginRootDir();
    if (pluginRootDir) {
      binds.unshift(`${pluginRootDir}:${DEFAULT_DOCKER_BUNDLED_TEAMCLAW_PLUGIN_DIR}:ro`);
    }
  }
  if (config.workerProvisioningDockerWorkspaceVolume && config.workerProvisioningWorkspaceRoot) {
    binds.unshift(`${config.workerProvisioningDockerWorkspaceVolume}:${config.workerProvisioningWorkspaceRoot}`);
  }
  return [...new Set(binds)];
}

/**
 * Picks the workspace directory for a provisioned worker.
 *
 * @returns The controller's workspace root for process workers;
 *          `<root>/<team>/<role>/<workerId>` (POSIX separators) for
 *          docker/kubernetes workers with a configured workspace root;
 *          otherwise "".
 */
function buildProvisionedWorkspaceDir(
  provider: WorkerProvisioningType,
  config: PluginConfig,
  role: RoleId,
  workerId: string,
): string {
  // Process-type workers run on the same host — share the controller's workspace
  // so that file artifacts (previews, deliverables) are immediately visible to
  // the controller without requiring git sync or file transfer.
  if (provider === "process") {
    return resolveTeamClawAgentWorkspaceRootDir();
  }
  if (
    (provider !== "docker" && provider !== "kubernetes") ||
    !config.workerProvisioningWorkspaceRoot
  ) {
    return "";
  }
  return path.posix.join(
    config.workerProvisioningWorkspaceRoot,
    sanitizePathSegment(config.teamName),
    sanitizePathSegment(role),
    sanitizePathSegment(workerId),
  );
}

/** Reads `agents.defaults.workspace` from a config object; "" when absent or not a string. */
function getConfiguredWorkerWorkspaceDir(config: Record<string, unknown>): string {
  const agents = ensureRecord(config.agents);
  const defaults = ensureRecord(agents.defaults);
  return typeof defaults.workspace === "string" ? defaults.workspace : "";
}

/**
 * Resolves how to reach the Docker daemon from `DOCKER_HOST`.
 *
 * Unset → the default unix socket; `unix://` → that socket; `tcp://` is
 * treated as plain HTTP; any other URL keeps its own scheme (https only when
 * explicitly given). Default ports: 443 for https, 2375 otherwise.
 */
function resolveDockerEndpoint(): DockerEndpoint {
  const dockerHost = process.env.DOCKER_HOST?.trim();
  if (!dockerHost) {
    return {
      protocol: "http:",
      socketPath: "/var/run/docker.sock",
    };
  }
  if (dockerHost.startsWith("unix://")) {
    return {
      protocol: "http:",
      socketPath: dockerHost.slice("unix://".length),
    };
  }
  const normalized = dockerHost.startsWith("tcp://")
    ? dockerHost.replace(/^tcp:\/\//, "http://")
    : dockerHost;
  const url = new URL(normalized);
  const isHttps = url.protocol === "https:";
  return {
    protocol: isHttps ? "https:" : "http:",
    hostname: url.hostname,
    port: url.port ? Number(url.port) : (isHttps ? 443 : 2375),
  };
}

/**
 * Pulls a human-readable message out of a Docker error response.
 *
 * @returns `null` for a blank body, the JSON `message` field when it is a
 *          string, and the raw body otherwise.
 */
function extractDockerErrorMessage(body: string): string | null {
  if (!body.trim()) {
    return null;
  }
  try {
    const parsed = JSON.parse(body) as { message?: unknown };
    return typeof parsed.message === "string" ? parsed.message : body;
  } catch {
    return body;
  }
}

/**
 * Runs a command to completion, optionally feeding `stdin`.
 *
 * Resolves on exit code 0.
 *
 * @throws Error when the process cannot be spawned, exits non-zero, or is
 *         killed by a signal. The message contains the command line, the exit
 *         code (or signal name) and the trimmed stderr (stdout when stderr is
 *         empty).
 */
async function runCommand(command: string, args: string[], stdin?: string): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    const child = spawn(command, args, {
      stdio: ["pipe", "pipe", "pipe"],
      env: process.env,
    });
    let stderr = "";
    let stdout = "";

    // A child that exits (or never starts) before consuming its input makes
    // the pipe emit an error; without a listener that would be an uncaught
    // exception. The real failure is reported through "error"/"close" below.
    child.stdin?.on("error", () => {});
    if (stdin) {
      child.stdin?.end(stdin);
    } else {
      child.stdin?.end();
    }

    // Let the stream decode so multi-byte characters split across chunks survive.
    child.stdout?.setEncoding("utf8");
    child.stderr?.setEncoding("utf8");
    child.stdout?.on("data", (chunk: string) => {
      stdout += chunk;
    });
    child.stderr?.on("data", (chunk: string) => {
      stderr += chunk;
    });

    child.on("error", reject);
    // "close" fires after the stdio streams have ended, whereas "exit" can fire
    // while output is still buffered, which would truncate the error message.
    child.on("close", (code: number | null, signal: NodeJS.Signals | null) => {
      if (code === 0) {
        resolve();
        return;
      }
      reject(new Error(`${command} ${args.join(" ")} failed (${code ?? signal}): ${(stderr || stdout).trim()}`));
    });
  });
}

/** kubectl arguments selecting `context`; empty when no context is configured. */
function buildKubectlContextArgs(context: string): string[] {
  return context ? ["--context", context] : [];
}