///
import { afterEach, beforeEach, describe, expect, test, vi } from "vitest";
import { api, internal } from "./_generated/api.js";
import { DEFAULT_CONFIG } from "./config.js";
import { initConvexTest } from "./setup.test.js";
import { canTransitionWorkerStatus } from "./workerLifecycle.js";
const TEST_PROVIDER_CONFIG = {
kind: "fly" as const,
appName: "agent-factory-workers-test",
organizationSlug: "personal",
image: "registry.fly.io/agent-factory-workers-test:test-image",
region: "iad",
volumeName: "openclaw_data_test",
volumePath: "/data",
volumeSizeGb: 10,
};
function stableHashBase36(input: string): string {
let hash = 2166136261;
for (let index = 0; index < input.length; index += 1) {
hash ^= input.charCodeAt(index);
hash = Math.imul(hash, 16777619);
}
return (hash >>> 0).toString(36);
}
function buildDedicatedVolumeName(prefix: string, workerId: string) {
const sanitize = (value: string) =>
value
.toLowerCase()
.replace(/[^a-z0-9_]/g, "_")
.replace(/_+/g, "_")
.replace(/^_+|_+$/g, "");
const normalizedPrefix = sanitize(prefix) || "openclaw";
const normalizedWorker = sanitize(workerId) || "worker";
const workerHash = stableHashBase36(normalizedWorker).slice(0, 8);
const maxPrefixLen = 30 - 1 - workerHash.length;
return `${normalizedPrefix.slice(0, Math.max(1, maxPrefixLen))}_${workerHash}`;
}
function jsonResponse(body: unknown, status = 200) {
return new Response(JSON.stringify(body), {
status,
headers: { "Content-Type": "application/json" },
});
}
function emptyResponse(status = 204) {
return new Response(null, { status });
}
describe("component lib", () => {
beforeEach(async () => {
vi.useFakeTimers();
});
afterEach(() => {
vi.useRealTimers();
vi.restoreAllMocks();
vi.unstubAllGlobals();
});
test("worker status transitions should enforce draining before stopping", () => {
expect(canTransitionWorkerStatus("active", "active")).toBe(true);
expect(canTransitionWorkerStatus("active", "draining")).toBe(true);
expect(canTransitionWorkerStatus("active", "stopping")).toBe(false);
expect(canTransitionWorkerStatus("active", "stopped")).toBe(false);
expect(canTransitionWorkerStatus("draining", "stopped")).toBe(false);
});
test("enqueue and claim should respect queue flow", async () => {
const t = initConvexTest();
await t.mutation(api.queue.upsertAgentProfile, {
agentKey: "support-agent",
version: "1.0.0",
secretsRef: ["telegram.botToken"],
enabled: true,
});
const messageId = await t.mutation(api.lib.enqueue, {
conversationId: "telegram:chat:1",
agentKey: "support-agent",
payload: {
provider: "telegram",
providerUserId: "u1",
messageText: "Ciao",
},
});
expect(messageId).toBeDefined();
const claimed = await t.mutation(api.lib.claim, {
workerId: "worker-1",
});
expect(claimed).not.toBeNull();
expect(claimed?.conversationId).toBe("telegram:chat:1");
});
test("minimal agent profile should work when payload provides providerUserId", async () => {
const t = initConvexTest();
await t.mutation(api.queue.upsertAgentProfile, {
agentKey: "minimal-agent",
version: "1.0.0",
secretsRef: [],
enabled: true,
});
const messageId = await t.mutation(api.lib.enqueue, {
conversationId: "telegram:chat:minimal",
agentKey: "minimal-agent",
payload: {
provider: "telegram",
providerUserId: "u-minimal-1",
messageText: "hello",
},
});
const claim = await t.mutation(api.lib.claim, {
workerId: "worker-minimal-1",
});
expect(claim?.messageId).toBe(messageId);
expect(claim?.payload.providerUserId).toBe("u-minimal-1");
const bundle = await t.query(api.lib.getHydrationBundle, {
messageId,
workspaceId: "default",
});
expect(bundle).not.toBeNull();
expect(bundle?.payload.providerUserId).toBe("u-minimal-1");
expect(bundle?.bridgeRuntimeConfig).toBeNull();
});
test("enqueue should append global system prompt to queued message", async () => {
const t = initConvexTest();
await t.mutation(api.queue.upsertAgentProfile, {
agentKey: "system-prompt-agent",
version: "1.0.0",
secretsRef: [],
enabled: true,
});
await t.mutation(api.lib.setMessageRuntimeConfig, {
messageConfig: {
systemPrompt: " Rispondi sempre con un breve riassunto finale. ",
},
});
const storedMessageConfig = await t.query(api.lib.messageRuntimeConfig, {});
expect(storedMessageConfig).toEqual({
systemPrompt: "Rispondi sempre con un breve riassunto finale.",
});
const messageId = await t.mutation(api.lib.enqueue, {
conversationId: "telegram:chat:system-prompt",
agentKey: "system-prompt-agent",
payload: {
provider: "telegram",
providerUserId: "u-system-prompt-1",
messageText: "Come va?",
},
});
const claim = await t.mutation(api.lib.claim, {
workerId: "worker-system-prompt-1",
});
expect(claim?.messageId).toBe(messageId);
expect(claim?.payload.messageText).toBe(
"Come va?\n\nRispondi sempre con un breve riassunto finale.",
);
});
test("blank global system prompt should not modify queued messages", async () => {
const t = initConvexTest();
await t.mutation(api.queue.upsertAgentProfile, {
agentKey: "blank-system-prompt-agent",
version: "1.0.0",
secretsRef: [],
enabled: true,
});
await t.mutation(api.lib.setMessageRuntimeConfig, {
messageConfig: {
systemPrompt: " ",
},
});
const storedMessageConfig = await t.query(api.lib.messageRuntimeConfig, {});
expect(storedMessageConfig).toBeNull();
const messageId = await t.mutation(api.lib.enqueue, {
conversationId: "telegram:chat:blank-system-prompt",
agentKey: "blank-system-prompt-agent",
payload: {
provider: "telegram",
providerUserId: "u-blank-system-prompt-1",
messageText: "hello",
},
});
const claim = await t.mutation(api.lib.claim, {
workerId: "worker-blank-system-prompt-1",
});
expect(claim?.messageId).toBe(messageId);
expect(claim?.payload.messageText).toBe("hello");
});
test("message templates should normalize tags and auto-generate key", async () => {
const t = initConvexTest();
const templateId = await t.mutation(api.lib.createMessageTemplate, {
title: " Status Update ",
text: " Share your latest progress. ",
tags: [" Urgent ", "follow-up", "urgent", " Team "],
actorUserId: "user-admin-1",
});
expect(templateId).toBeDefined();
const templates = await t.query(api.lib.listMessageTemplatesByCompany, {
includeDisabled: true,
});
expect(templates).toHaveLength(1);
expect(templates[0]).toMatchObject({
templateKey: "status-update",
title: "Status Update",
text: "Share your latest progress.",
tags: ["follow-up", "team", "urgent"],
usageCount: 0,
enabled: true,
});
});
test("message templates should generate a unique key globally", async () => {
const t = initConvexTest();
const firstTemplateId = await t.mutation(api.lib.createMessageTemplate, {
title: "Daily check-in",
text: "Share your update.",
tags: [],
actorUserId: "user-admin-1",
});
const secondTemplateId = await t.mutation(api.lib.createMessageTemplate, {
title: "Daily check-in",
text: "Duplicate title",
tags: [],
actorUserId: "user-admin-2",
});
const templates = await t.query(api.lib.listMessageTemplatesByCompany, {
includeDisabled: true,
});
expect(firstTemplateId).toBeDefined();
expect(secondTemplateId).toBeDefined();
expect(templates.map((template) => template.templateKey).sort()).toEqual([
"daily-check-in",
"daily-check-in-2",
]);
});
test("message templates should track usage and surface most used templates first", async () => {
const t = initConvexTest();
await t.mutation(api.queue.upsertAgentProfile, {
agentKey: "template-agent",
version: "1.0.0",
secretsRef: [],
enabled: true,
});
await t.mutation(api.lib.bindUserAgent, {
consumerUserId: "user-template-1",
agentKey: "template-agent",
source: "manual",
metadata: { companyId: "co-templates" },
});
const firstTemplateId = await t.mutation(api.lib.createMessageTemplate, {
title: "Weekly recap",
text: "Condividi il recap della settimana.",
tags: ["weekly", "recap"],
actorUserId: "user-admin-1",
});
const secondTemplateId = await t.mutation(api.lib.createMessageTemplate, {
title: "Quick nudge",
text: "Mandami un aggiornamento rapido.",
tags: ["follow-up"],
actorUserId: "user-admin-1",
});
await t.mutation((api.lib as any).sendMessageTemplateToUserAgent, {
consumerUserId: "user-template-1",
agentKey: "template-agent",
templateId: firstTemplateId,
});
await t.mutation((api.lib as any).sendMessageTemplateToUserAgent, {
consumerUserId: "user-template-1",
agentKey: "template-agent",
templateId: firstTemplateId,
});
await t.mutation((api.lib as any).sendMessageTemplateToUserAgent, {
consumerUserId: "user-template-1",
agentKey: "template-agent",
templateId: secondTemplateId,
});
const templates = await t.query(api.lib.listMessageTemplatesByCompany, {
includeDisabled: true,
});
expect(templates.map((template) => ({
title: template.title,
usageCount: template.usageCount,
}))).toEqual([
{ title: "Weekly recap", usageCount: 2 },
{ title: "Quick nudge", usageCount: 1 },
]);
const queuedItems = await t.query(api.lib.listQueueItemsForUserAgent, {
consumerUserId: "user-template-1",
agentKey: "template-agent",
limit: 10,
});
expect(queuedItems).toHaveLength(3);
expect(queuedItems.map((item) => item.payload.messageText).sort()).toEqual([
"Condividi il recap della settimana.",
"Condividi il recap della settimana.",
"Mandami un aggiornamento rapido.",
].sort());
});
test("message runtime config should store telegram attachment retention", async () => {
const t = initConvexTest();
await t.mutation(api.lib.setMessageRuntimeConfig, {
messageConfig: {
telegramAttachmentRetentionMs: 60_000,
},
});
const storedMessageConfig = await t.query(api.lib.messageRuntimeConfig, {});
expect(storedMessageConfig).toEqual({
telegramAttachmentRetentionMs: 60_000,
});
});
test("enqueue should persist telegram attachments for claimed jobs", async () => {
const t = initConvexTest();
const nowMs = Date.UTC(2026, 0, 1, 9, 0, 0);
vi.setSystemTime(nowMs);
await t.mutation(api.queue.upsertAgentProfile, {
agentKey: "attachment-agent",
version: "1.0.0",
secretsRef: [],
enabled: true,
});
const storageId = await t.run(async (ctx) => {
return await ctx.storage.store(new Blob(["photo-binary"], { type: "image/jpeg" }));
});
const messageId = await t.mutation(api.lib.enqueue, {
conversationId: "telegram:chat:attachment",
agentKey: "attachment-agent",
payload: {
provider: "telegram",
providerUserId: "u-attachment-1",
messageText: "[telegram media] photo message",
attachments: [
{
kind: "photo",
status: "ready",
storageId,
telegramFileId: "telegram-photo-1",
mimeType: "image/jpeg",
sizeBytes: 12,
expiresAt: nowMs + 60_000,
},
],
},
nowMs,
});
const claim = await t.mutation(api.lib.claim, {
workerId: "worker-attachment-1",
nowMs,
});
expect(claim?.messageId).toBe(messageId);
expect(claim?.payload.attachments).toHaveLength(1);
expect(claim?.payload.attachments?.[0]?.telegramFileId).toBe("telegram-photo-1");
const bundle = await t.query(api.lib.getHydrationBundle, {
messageId,
workspaceId: "default",
});
expect(bundle?.payload.attachments).toHaveLength(1);
expect(bundle?.payload.attachments?.[0]?.storageId).toBe(storageId);
expect(bundle?.payload.attachments?.[0]?.downloadUrl).toEqual(expect.any(String));
const attachmentRows = await t.run(async (ctx) => {
return await ctx.db
.query("messageAttachments")
.withIndex("by_messageId", (q) => q.eq("messageId", messageId))
.collect();
});
expect(attachmentRows).toHaveLength(1);
expect(attachmentRows[0]?.status).toBe("ready");
});
test("expireOldTelegramAttachments should expire stored attachments and queued payloads", async () => {
const t = initConvexTest();
const nowMs = Date.UTC(2026, 0, 1, 10, 0, 0);
vi.setSystemTime(nowMs);
await t.mutation(api.queue.upsertAgentProfile, {
agentKey: "attachment-expiry-agent",
version: "1.0.0",
secretsRef: [],
enabled: true,
});
const storageId = await t.run(async (ctx) => {
return await ctx.storage.store(new Blob(["document-binary"], { type: "application/pdf" }));
});
const messageId = await t.mutation(api.queue.enqueueMessage, {
conversationId: "telegram:chat:attachment-expiry",
agentKey: "attachment-expiry-agent",
payload: {
provider: "telegram",
providerUserId: "u-attachment-expiry-1",
messageText: "[telegram media] document message",
attachments: [
{
kind: "document",
status: "ready",
storageId,
telegramFileId: "telegram-document-1",
fileName: "brief.pdf",
mimeType: "application/pdf",
sizeBytes: 15,
expiresAt: nowMs + 10,
},
],
},
nowMs,
});
const expired = await t.mutation((internal.queue as any).expireOldTelegramAttachments, {
nowMs: nowMs + 11,
limit: 10,
});
expect(expired).toBe(1);
const attachmentRows = await t.run(async (ctx) => {
return await ctx.db
.query("messageAttachments")
.withIndex("by_messageId", (q) => q.eq("messageId", messageId))
.collect();
});
expect(attachmentRows[0]?.status).toBe("expired");
const messageRow = await t.run(async (ctx) => {
return await ctx.db.get(messageId);
});
expect(messageRow?.payload.attachments?.[0]?.status).toBe("expired");
});
test("enqueue should fail when providerUserId is blank in payload", async () => {
const t = initConvexTest();
await t.mutation(api.queue.upsertAgentProfile, {
agentKey: "missing-provider-user-agent",
version: "1.0.0",
secretsRef: [],
enabled: true,
});
await expect(
t.mutation(api.queue.enqueueMessage, {
conversationId: "telegram:chat:missing-provider-user",
agentKey: "missing-provider-user-agent",
payload: {
provider: "telegram",
providerUserId: " ",
messageText: "hello",
},
}),
).rejects.toThrow("providerUserId is required but missing in payload");
});
test("identity binding should resolve, rebind and revoke", async () => {
const t = initConvexTest();
await t.mutation(api.queue.upsertAgentProfile, {
agentKey: "agent-a",
version: "1.0.0",
secretsRef: [],
botIdentity: "bot-agent-a",
enabled: true,
});
await t.mutation(api.queue.upsertAgentProfile, {
agentKey: "agent-b",
version: "1.0.0",
secretsRef: [],
botIdentity: "bot-agent-b",
enabled: true,
});
const first = await t.mutation(api.lib.bindUserAgent, {
consumerUserId: "u-1",
agentKey: "agent-a",
botIdentity: "bot-agent-a",
source: "telegram_pairing",
telegramUserId: "tg-user-1",
telegramChatId: "tg-chat-1",
});
expect(first.agentKey).toBe("agent-a");
const byUser = await t.query(api.lib.resolveAgentForUser, {
consumerUserId: "u-1",
});
expect(byUser.agentKey).toBe("agent-a");
const byTelegram = await t.query(api.lib.resolveAgentForTelegram, {
telegramUserId: "tg-user-1",
});
expect(byTelegram.agentKey).toBe("agent-a");
expect(byTelegram.conversationId).toBe("user-agent:agent-a:u-1");
await t.mutation(api.lib.bindUserAgent, {
consumerUserId: "u-1",
agentKey: "agent-b",
source: "manual",
telegramUserId: "tg-user-1",
telegramChatId: "tg-chat-1",
});
const rebound = await t.query(api.lib.resolveAgentForUser, {
consumerUserId: "u-1",
});
expect(rebound.agentKey).toBe("agent-b");
const revokeResult = await t.mutation(api.lib.revokeUserAgentBinding, {
consumerUserId: "u-1",
});
expect(revokeResult.revoked).toBe(1);
const afterRevoke = await t.query(api.lib.resolveAgentForTelegram, {
telegramChatId: "tg-chat-1",
});
expect(afterRevoke.agentKey).toBeNull();
expect(afterRevoke.conversationId).toBeNull();
});
test("worker scheduling should set idle shutdown from last claim time", async () => {
const t = initConvexTest();
const now = Date.now();
vi.setSystemTime(now);
await t.mutation(api.queue.upsertAgentProfile, {
agentKey: "support-agent",
version: "1.0.0",
secretsRef: [],
enabled: true,
});
const messageId = await t.mutation(api.lib.enqueue, {
conversationId: "telegram:chat:2",
agentKey: "support-agent",
payload: {
provider: "telegram",
providerUserId: "u2",
messageText: "ciao",
},
});
const claim = await t.mutation(api.lib.claim, { workerId: "worker-2" });
expect(claim).not.toBeNull();
expect(claim?.messageId).toBe(messageId);
const completionTime = now + 60_000;
vi.setSystemTime(completionTime);
const completed = await t.mutation(api.lib.complete, {
workerId: "worker-2",
messageId,
leaseId: claim?.leaseId ?? "",
});
expect(completed).toBe(true);
const workers = await t.query((internal.queue as any).listWorkersForScheduler, {});
const worker = workers.find((row: { workerId: string }) => row.workerId === "worker-2");
expect(worker?.status).toBe("active");
expect(worker?.load).toBe(0);
expect(worker?.scheduledShutdownAt).toBe(now + DEFAULT_CONFIG.scaling.idleTimeoutMs);
});
test("idle shutdown should move worker to draining and prevent reactivation", async () => {
const t = initConvexTest();
const claimTime = Date.UTC(2026, 0, 1, 12, 0, 0);
vi.stubGlobal(
"fetch",
vi.fn(async (input: RequestInfo | URL, init?: RequestInit) => {
const url = String(input);
const method = init?.method ?? "GET";
if (url.endsWith(`/apps/${TEST_PROVIDER_CONFIG.appName}/machines`) && method === "GET") {
return jsonResponse([]);
}
if (url.endsWith(`/apps/${TEST_PROVIDER_CONFIG.appName}/volumes`) && method === "GET") {
return jsonResponse([]);
}
throw new Error(`Unexpected fetch ${method} ${url}`);
}),
);
vi.setSystemTime(claimTime);
await t.mutation(api.queue.upsertAgentProfile, {
agentKey: "support-agent",
version: "1.0.0",
secretsRef: [],
enabled: true,
});
const messageId = await t.mutation(api.lib.enqueue, {
conversationId: "telegram:chat:forced-stop",
agentKey: "support-agent",
payload: {
provider: "telegram",
providerUserId: "u-stop",
messageText: "stop me",
},
});
const claim = await t.mutation(api.lib.claim, { workerId: "worker-stop-force-1" });
expect(claim?.messageId).toBe(messageId);
const completionTime = claimTime + 60_000;
vi.setSystemTime(completionTime);
await t.mutation(api.lib.complete, {
workerId: "worker-stop-force-1",
messageId,
leaseId: claim?.leaseId ?? "",
providerConfig: TEST_PROVIDER_CONFIG,
} as any);
const dueTime = claimTime + DEFAULT_CONFIG.scaling.idleTimeoutMs + 1;
vi.setSystemTime(dueTime);
const shutdown = await t.action(api.scheduler.checkIdleShutdowns, {
nowMs: dueTime,
flyApiToken: "fly-token",
providerConfig: TEST_PROVIDER_CONFIG,
});
expect(shutdown.stopped).toBe(1);
const workers = await t.query((internal.queue as any).listWorkersForScheduler, {});
const worker = workers.find((row: { workerId: string }) => row.workerId === "worker-stop-force-1");
expect(worker?.status).toBe("draining");
expect(worker?.scheduledShutdownAt).toBe(claimTime + DEFAULT_CONFIG.scaling.idleTimeoutMs);
expect(worker?.stoppedAt).toBeNull();
const control = await t.query(api.queue.getWorkerControlState as any, {
workerId: "worker-stop-force-1",
});
expect(control.shouldStop).toBe(true);
const newMessageId = await t.mutation(api.lib.enqueue, {
conversationId: "telegram:chat:forced-stop:2",
agentKey: "support-agent",
payload: {
provider: "telegram",
providerUserId: "u-stop",
messageText: "new message",
},
});
expect(newMessageId).toBeDefined();
const reactivatedClaim = await t.mutation(api.lib.claim, {
workerId: "worker-stop-force-1",
});
expect(reactivatedClaim).toBeNull();
});
test("hydration bundle should include resolved agent-bridge runtime config", async () => {
const t = initConvexTest();
await t.mutation(api.queue.upsertAgentProfile, {
agentKey: "bridge-agent",
version: "1.0.0",
secretsRef: [],
bridgeConfig: {
enabled: true,
baseUrl: "https://consumer.example.com",
serviceId: "openclaw-prod",
appKey: "crm",
},
enabled: true,
});
await t.mutation(api.queue.importPlaintextSecret, {
secretRef: "agent-bridge.serviceKey.bridge-agent",
plaintextValue: "abs_live_bridge_key",
});
await t.mutation(api.queue.importPlaintextSecret, {
secretRef: "agent-bridge.baseUrlMapJson.bridge-agent",
plaintextValue: '{"crm":"https://consumer.example.com","billing":"https://billing.example.com"}',
});
const messageId = await t.mutation(api.lib.enqueue, {
conversationId: "bridge:chat:1",
agentKey: "bridge-agent",
payload: {
provider: "telegram",
providerUserId: "bridge-user",
messageText: "test",
},
});
const claim = await t.mutation(api.lib.claim, {
workerId: "worker-bridge-1",
});
expect(claim?.messageId).toBe(messageId);
const bundle = await t.query(api.lib.getHydrationBundle, {
messageId,
workspaceId: "default",
});
expect(bundle).not.toBeNull();
expect(bundle?.bridgeRuntimeConfig).toEqual({
baseUrl: "https://consumer.example.com",
appBaseUrlMapJson: '{"crm":"https://consumer.example.com","billing":"https://billing.example.com"}',
serviceId: "openclaw-prod",
appKey: "crm",
serviceKey: "abs_live_bridge_key",
serviceKeySecretRef: "agent-bridge.serviceKey.bridge-agent",
botIdentity: null,
});
});
test("worker control state should signal stop for stopped worker", async () => {
const t = initConvexTest();
await t.mutation(internal.queue.upsertWorkerState, {
workerId: "worker-stop-1",
provider: "fly",
status: "stopped",
load: 0,
});
const control = await t.query(api.queue.getWorkerControlState as any, {
workerId: "worker-stop-1",
});
expect(control.shouldStop).toBe(true);
const controlUnknown = await t.query(api.queue.getWorkerControlState as any, {
workerId: "worker-nonexistent",
});
expect(controlUnknown.shouldStop).toBe(true);
});
test("worker control state should stop active workers past scheduled shutdown", async () => {
const t = initConvexTest();
const nowMs = Date.UTC(2026, 0, 1, 14, 0, 0);
vi.setSystemTime(nowMs);
await t.mutation(internal.queue.upsertWorkerState, {
workerId: "worker-overdue-1",
provider: "fly",
status: "active",
load: 0,
nowMs: nowMs - 60_000,
scheduledShutdownAt: nowMs - 1,
});
const control = await t.query(api.queue.getWorkerControlState as any, {
workerId: "worker-overdue-1",
});
expect(control.shouldStop).toBe(true);
});
test("shutdown teardown should wait for final snapshot before deleting worker volume", async () => {
const t = initConvexTest();
const workerId = "worker-cleanup-1";
const machineId = "machine-cleanup-1";
const volumeId = "vol-cleanup-1";
const volumeName = buildDedicatedVolumeName(TEST_PROVIDER_CONFIG.volumeName, workerId);
const claimTime = Date.UTC(2026, 0, 1, 15, 0, 0);
const fetchMock = vi.fn(async (input: RequestInfo | URL, init?: RequestInit) => {
const url = String(input);
const method = init?.method ?? "GET";
if (url.endsWith(`/apps/${TEST_PROVIDER_CONFIG.appName}/machines`) && method === "GET") {
return jsonResponse([
{
id: machineId,
name: workerId,
region: TEST_PROVIDER_CONFIG.region,
state: "started",
config: { image: TEST_PROVIDER_CONFIG.image },
},
]);
}
if (url.endsWith(`/machines/${machineId}/cordon`) && method === "POST") {
return emptyResponse();
}
if (url.endsWith(`/machines/${machineId}/stop`) && method === "POST") {
return emptyResponse();
}
if (url.endsWith(`/machines/${machineId}`) && method === "DELETE") {
return emptyResponse();
}
if (url.endsWith(`/machines/${machineId}`) && method === "GET") {
return jsonResponse({
id: machineId,
config: {
mounts: [{ volume: volumeId, path: TEST_PROVIDER_CONFIG.volumePath }],
},
});
}
if (url.endsWith(`/apps/${TEST_PROVIDER_CONFIG.appName}/volumes`) && method === "GET") {
return jsonResponse([
{
id: volumeId,
name: volumeName,
region: TEST_PROVIDER_CONFIG.region,
},
]);
}
if (url.endsWith(`/volumes/${volumeId}`) && method === "DELETE") {
return emptyResponse();
}
throw new Error(`Unexpected fetch ${method} ${url}`);
});
vi.stubGlobal("fetch", fetchMock);
vi.setSystemTime(claimTime);
await t.mutation(api.queue.upsertAgentProfile, {
agentKey: "support-agent",
version: "1.0.0",
secretsRef: [],
enabled: true,
});
const messageId = await t.mutation(api.lib.enqueue, {
conversationId: "telegram:chat:cleanup",
agentKey: "support-agent",
payload: {
provider: "telegram",
providerUserId: "u-clean",
messageText: "cleanup",
},
});
const claim = await t.mutation(api.lib.claim, { workerId });
expect(claim?.messageId).toBe(messageId);
await t.mutation(internal.queue.upsertWorkerState, {
workerId,
provider: "fly",
status: "active",
load: 1,
nowMs: claimTime,
machineId,
appName: TEST_PROVIDER_CONFIG.appName,
region: TEST_PROVIDER_CONFIG.region,
volumeId,
});
const completionTime = claimTime + 60_000;
vi.setSystemTime(completionTime);
await t.mutation(api.queue.completeJob as any, {
workerId,
messageId,
leaseId: claim?.leaseId ?? "",
nowMs: completionTime,
providerConfig: TEST_PROVIDER_CONFIG,
});
const dueTime = claimTime + DEFAULT_CONFIG.scaling.idleTimeoutMs + 1;
vi.setSystemTime(dueTime);
const firstPass = await t.action(api.scheduler.checkIdleShutdowns, {
nowMs: dueTime,
flyApiToken: "fly-token",
providerConfig: TEST_PROVIDER_CONFIG,
});
expect(firstPass.stopped).toBe(1);
expect(firstPass.pending).toBe(1);
const workersAfterFirstPass = await t.query((internal.queue as any).listWorkersForScheduler, {});
const workerAfterFirstPass = workersAfterFirstPass.find(
(row: { workerId: string }) => row.workerId === workerId,
);
expect(workerAfterFirstPass?.status).toBe("draining");
const prematureDeleteCalls = fetchMock.mock.calls.filter((call) =>
String(call[0]).includes(`/volumes/${volumeId}`),
);
expect(prematureDeleteCalls).toHaveLength(0);
const snapshot = await t.mutation(api.queue.prepareDataSnapshotUpload as any, {
workerId,
workspaceId: "default",
agentKey: "support-agent",
conversationId: "telegram:chat:cleanup",
reason: "drain",
nowMs: dueTime + 1,
});
const storageId = await t.run(async (ctx) => {
return await ctx.storage.store(new Blob(["snapshot-ready"]));
});
const finalized = await t.mutation(api.queue.finalizeDataSnapshotUpload as any, {
workerId,
snapshotId: snapshot.snapshotId,
storageId,
sha256: "deadbeef",
sizeBytes: 14,
nowMs: dueTime + 2,
});
expect(finalized).toBe(true);
const secondPass = await t.action(api.scheduler.checkIdleShutdowns, {
nowMs: dueTime + 10_000,
flyApiToken: "fly-token",
providerConfig: TEST_PROVIDER_CONFIG,
});
expect(secondPass.pending).toBe(0);
const workersAfterSecondPass = await t.query((internal.queue as any).listWorkersForScheduler, {});
const workerAfterSecondPass = workersAfterSecondPass.find(
(row: { workerId: string }) => row.workerId === workerId,
);
expect(workerAfterSecondPass?.status).toBe("stopped");
const deleteMachineCalls = fetchMock.mock.calls.filter(
(call) =>
String(call[0]).endsWith(`/machines/${machineId}`) &&
((call[1] as RequestInit | undefined)?.method ?? "GET") === "DELETE",
);
const deleteVolumeCalls = fetchMock.mock.calls.filter(
(call) =>
String(call[0]).endsWith(`/volumes/${volumeId}`) &&
((call[1] as RequestInit | undefined)?.method ?? "GET") === "DELETE",
);
expect(deleteMachineCalls).toHaveLength(1);
expect(deleteVolumeCalls).toHaveLength(1);
});
test("cleanup should remove orphan worker volumes when the machine is already gone", async () => {
const t = initConvexTest();
const workerId = "worker-orphan-1";
const volumeId = "vol-orphan-1";
const volumeName = buildDedicatedVolumeName(TEST_PROVIDER_CONFIG.volumeName, workerId);
const nowMs = Date.UTC(2026, 0, 1, 16, 0, 0);
const fetchMock = vi.fn(async (input: RequestInfo | URL, init?: RequestInit) => {
const url = String(input);
const method = init?.method ?? "GET";
if (url.endsWith(`/apps/${TEST_PROVIDER_CONFIG.appName}/machines`) && method === "GET") {
return jsonResponse([]);
}
if (url.endsWith(`/apps/${TEST_PROVIDER_CONFIG.appName}/volumes`) && method === "GET") {
return jsonResponse([
{
id: volumeId,
name: volumeName,
region: TEST_PROVIDER_CONFIG.region,
},
]);
}
if (url.endsWith(`/volumes/${volumeId}`) && method === "DELETE") {
return emptyResponse();
}
throw new Error(`Unexpected fetch ${method} ${url}`);
});
vi.stubGlobal("fetch", fetchMock);
await t.mutation(internal.queue.upsertWorkerState, {
workerId,
provider: "fly",
status: "active",
load: 0,
nowMs,
scheduledShutdownAt: nowMs - 1,
machineId: "machine-orphan-1",
appName: TEST_PROVIDER_CONFIG.appName,
region: TEST_PROVIDER_CONFIG.region,
volumeId,
});
const result = await t.action(api.scheduler.checkIdleShutdowns, {
nowMs,
flyApiToken: "fly-token",
providerConfig: TEST_PROVIDER_CONFIG,
});
expect(result.stopped).toBe(1);
expect(result.pending).toBe(0);
const deleteVolumeCalls = fetchMock.mock.calls.filter(
(call) =>
String(call[0]).endsWith(`/volumes/${volumeId}`) &&
((call[1] as RequestInit | undefined)?.method ?? "GET") === "DELETE",
);
expect(deleteVolumeCalls).toHaveLength(1);
const machineDetailCalls = fetchMock.mock.calls.filter(
(call) =>
String(call[0]).endsWith(`/machines/machine-orphan-1`) &&
((call[1] as RequestInit | undefined)?.method ?? "GET") === "GET",
);
expect(machineDetailCalls).toHaveLength(0);
const workers = await t.query((internal.queue as any).listWorkersForScheduler, {});
const worker = workers.find((row: { workerId: string }) => row.workerId === workerId);
expect(worker?.status).toBe("stopped");
});
test("reconcile should persist volumeId on the worker row after spawn", async () => {
const t = initConvexTest();
const nowMs = Date.UTC(2026, 0, 2, 9, 0, 0);
const volumeId = "vol-persist-1";
const machineId = "machine-persist-1";
const fetchMock = vi.fn(async (input: RequestInfo | URL, init?: RequestInit) => {
const url = String(input);
const method = init?.method ?? "GET";
if (url.endsWith(`/apps/${TEST_PROVIDER_CONFIG.appName}/machines`) && method === "GET") {
return jsonResponse(
fetchMock.mock.calls.some(
(call) =>
String(call[0]).endsWith(`/apps/${TEST_PROVIDER_CONFIG.appName}/machines`) &&
((call[1] as RequestInit | undefined)?.method ?? "GET") === "POST",
)
? [
{
id: machineId,
name: `afw-${nowMs}-0`,
region: TEST_PROVIDER_CONFIG.region,
state: "started",
config: { image: TEST_PROVIDER_CONFIG.image },
},
]
: [],
);
}
if (url.endsWith(`/apps/${TEST_PROVIDER_CONFIG.appName}/volumes`) && method === "GET") {
return jsonResponse([]);
}
if (url.endsWith(`/apps/${TEST_PROVIDER_CONFIG.appName}/volumes`) && method === "POST") {
return jsonResponse({
id: volumeId,
name: buildDedicatedVolumeName(TEST_PROVIDER_CONFIG.volumeName, `afw-${nowMs}-0`),
region: TEST_PROVIDER_CONFIG.region,
});
}
if (url.endsWith(`/apps/${TEST_PROVIDER_CONFIG.appName}/machines`) && method === "POST") {
return jsonResponse({
id: machineId,
region: TEST_PROVIDER_CONFIG.region,
state: "started",
config: { image: TEST_PROVIDER_CONFIG.image },
});
}
throw new Error(`Unexpected fetch ${method} ${url}`);
});
vi.stubGlobal("fetch", fetchMock);
await t.mutation(api.queue.upsertAgentProfile, {
agentKey: "support-agent",
version: "1.0.0",
secretsRef: [],
enabled: true,
});
await t.mutation(api.queue.importPlaintextSecret, {
secretRef: "fly.apiToken",
plaintextValue: "fly-token",
});
await t.mutation(api.queue.importPlaintextSecret, {
secretRef: "convex.url",
plaintextValue: "https://example.convex.cloud",
});
await t.mutation(api.queue.enqueueMessage, {
conversationId: "telegram:chat:persist-volume",
agentKey: "support-agent",
payload: {
provider: "telegram",
providerUserId: "u-persist-volume",
messageText: "hello",
},
nowMs,
});
await t.action(api.scheduler.reconcileWorkerPool, {
nowMs,
flyApiToken: "fly-token",
convexUrl: "https://example.convex.cloud",
scalingPolicy: {
maxWorkers: 5,
queuePerWorkerTarget: 1,
spawnStep: 1,
idleTimeoutMs: 300_000,
reconcileIntervalMs: 15_000,
},
providerConfig: TEST_PROVIDER_CONFIG,
});
const workers = await t.query((internal.queue as any).listWorkersForScheduler, {});
const worker = workers.find((row: { workerId: string }) => row.workerId === `afw-${nowMs}-0`);
expect(worker?.machineId).toBe(machineId);
expect(worker?.volumeId).toBe(volumeId);
});
test("runFlyCleanup should destroy machines and volumes with final verification", async () => {
const t = initConvexTest();
await t.mutation(api.queue.importPlaintextSecret, {
secretRef: "fly.apiToken",
plaintextValue: "fly-token",
});
await t.mutation(api.queue.setProviderRuntimeConfig, {
providerConfig: TEST_PROVIDER_CONFIG,
});
let machineListCalls = 0;
let volumeListCalls = 0;
const deletedMachines: Array = [];
const deletedVolumes: Array = [];
vi.stubGlobal(
"fetch",
vi.fn(async (input: RequestInfo | URL, init?: RequestInit) => {
const url = String(input);
const method = init?.method ?? "GET";
const appBase = "https://api.machines.dev/v1/apps/agent-factory-workers-test";
if (url === `${appBase}` && method === "GET") {
return jsonResponse({ name: "agent-factory-workers-test" });
}
if (url === `${appBase}/machines` && method === "GET") {
machineListCalls += 1;
return jsonResponse(machineListCalls === 1 ? [{ id: "machine-1" }, { id: "machine-2" }] : []);
}
if (url === `${appBase}/volumes` && method === "GET") {
volumeListCalls += 1;
return jsonResponse(
volumeListCalls === 1
? [{ id: "volume-1" }, { id: "volume-2" }, { id: "volume-3" }]
: [],
);
}
if (url.includes("/machines/") && url.endsWith("/cordon") && method === "POST") {
return jsonResponse({});
}
if (url.includes("/machines/") && url.endsWith("/stop") && method === "POST") {
return jsonResponse({});
}
if (url.includes("/machines/") && method === "DELETE") {
const segments = url.split("/");
deletedMachines.push(segments[segments.length - 1]!);
return new Response(null, { status: 204 });
}
if (url.includes("/volumes/") && method === "DELETE") {
const segments = url.split("/");
deletedVolumes.push(segments[segments.length - 1]!);
return new Response(null, { status: 204 });
}
throw new Error(`Unexpected fetch ${method} ${url}`);
}),
);
const report = await t.action((api.lib as any).runFlyCleanup, {});
expect(report.appName).toBe("agent-factory-workers-test");
expect(report.machinesFound).toBe(2);
expect(report.machinesDeleted).toBe(2);
expect(report.machinesRemaining).toBe(0);
expect(report.volumesFound).toBe(3);
expect(report.volumesDeleted).toBe(3);
expect(report.volumesRemaining).toBe(0);
expect(report.errors).toEqual([]);
expect(report.warnings).toEqual([]);
expect(deletedMachines.sort()).toEqual(["machine-1", "machine-2"]);
expect(deletedVolumes.sort()).toEqual(["volume-1", "volume-2", "volume-3"]);
});
test("runFlyCleanup should require an active fly secret", async () => {
const t = initConvexTest();
await t.mutation(api.queue.setProviderRuntimeConfig, {
providerConfig: TEST_PROVIDER_CONFIG,
});
await expect(t.action((api.lib as any).runFlyCleanup, {})).rejects.toThrow(
"Missing active 'fly.apiToken' secret.",
);
});
test("scheduler count includes queued and in-progress conversations", async () => {
const t = initConvexTest();
await t.mutation(api.queue.upsertAgentProfile, {
agentKey: "support-agent",
version: "1.0.0",
secretsRef: [],
enabled: true,
});
await t.mutation(api.queue.enqueueMessage, {
conversationId: "telegram:chat:active-1",
agentKey: "support-agent",
payload: {
provider: "telegram",
providerUserId: "u-active-1",
messageText: "uno",
},
});
await t.mutation(api.queue.enqueueMessage, {
conversationId: "telegram:chat:active-2",
agentKey: "support-agent",
payload: {
provider: "telegram",
providerUserId: "u-active-2",
messageText: "due",
},
});
const claimed = await t.mutation(api.lib.claim, { workerId: "worker-active-1" });
expect(claimed).not.toBeNull();
const readyCount = await t.query((internal.queue as any).getReadyConversationCountForScheduler, {
nowMs: Date.now(),
limit: 1000,
});
const activeCount = await t.query((internal.queue as any).getActiveConversationCountForScheduler, {
nowMs: Date.now(),
limit: 1000,
});
expect(readyCount).toBe(1);
expect(activeCount).toBe(2);
});
test("upsertWorkerState should preserve heartbeat for stopped workers", async () => {
const t = initConvexTest();
const firstHeartbeat = 1_700_000_000_000;
const drainingAt = firstHeartbeat + 60_000;
const stoppingAt = firstHeartbeat + 120_000;
const stoppedAt = firstHeartbeat + 180_000;
await t.mutation(internal.queue.upsertWorkerState, {
workerId: "worker-heartbeat-1",
provider: "fly",
status: "active",
load: 0,
nowMs: firstHeartbeat,
});
await t.mutation(internal.queue.upsertWorkerState, {
workerId: "worker-heartbeat-1",
provider: "fly",
status: "draining",
load: 0,
nowMs: drainingAt,
scheduledShutdownAt: stoppingAt,
});
await t.mutation(internal.queue.upsertWorkerState, {
workerId: "worker-heartbeat-1",
provider: "fly",
status: "stopping",
load: 0,
nowMs: stoppingAt,
scheduledShutdownAt: stoppingAt,
stoppedAt: stoppingAt,
});
await t.mutation(internal.queue.upsertWorkerState, {
workerId: "worker-heartbeat-1",
provider: "fly",
status: "stopped",
load: 0,
nowMs: stoppedAt,
scheduledShutdownAt: stoppingAt,
stoppedAt,
});
const workers = await t.query((internal.queue as any).listWorkersForScheduler, {});
const worker = workers.find(
(row: { workerId: string }) => row.workerId === "worker-heartbeat-1",
);
expect(worker?.status).toBe("stopped");
expect(worker?.heartbeatAt).toBe(stoppingAt);
expect(worker?.scheduledShutdownAt).toBe(stoppingAt);
expect(worker?.stoppedAt).toBe(stoppedAt);
});
test("scheduler caps desired workers by distinct ready conversations", async () => {
const t = initConvexTest();
vi.stubGlobal(
"fetch",
vi.fn(async (input: RequestInfo | URL, init?: RequestInit) => {
const url = String(input);
const method = init?.method ?? "GET";
if (url.endsWith(`/apps/${TEST_PROVIDER_CONFIG.appName}/machines`) && method === "GET") {
return jsonResponse([]);
}
if (url.endsWith(`/apps/${TEST_PROVIDER_CONFIG.appName}/volumes`) && method === "GET") {
return jsonResponse([]);
}
throw new Error(`Unexpected fetch ${method} ${url}`);
}),
);
await t.mutation(api.queue.upsertAgentProfile, {
agentKey: "support-agent",
version: "1.0.0",
secretsRef: [],
enabled: true,
});
await t.mutation(api.queue.importPlaintextSecret, {
secretRef: "fly.apiToken",
plaintextValue: "fly-token",
});
await t.mutation(api.queue.importPlaintextSecret, {
secretRef: "convex.url",
plaintextValue: "https://example.convex.cloud",
});
await t.mutation(api.queue.enqueueMessage, {
conversationId: "telegram:chat:cap-1",
agentKey: "support-agent",
payload: {
provider: "telegram",
providerUserId: "u-cap",
messageText: "first",
},
});
await t.mutation(api.queue.enqueueMessage, {
conversationId: "telegram:chat:cap-1",
agentKey: "support-agent",
payload: {
provider: "telegram",
providerUserId: "u-cap",
messageText: "second",
},
});
await t.mutation(internal.queue.upsertWorkerState, {
workerId: "worker-cap-1",
provider: "fly",
status: "active",
load: 0,
});
await t.mutation(internal.queue.upsertWorkerState, {
workerId: "worker-cap-2",
provider: "fly",
status: "active",
load: 0,
});
await t.mutation(internal.queue.upsertWorkerState, {
workerId: "worker-cap-3",
provider: "fly",
status: "active",
load: 0,
});
const reconcile = await t.action(api.scheduler.reconcileWorkerPool, {
scalingPolicy: {
maxWorkers: 5,
queuePerWorkerTarget: 1,
spawnStep: 1,
idleTimeoutMs: 300_000,
reconcileIntervalMs: 15_000,
},
providerConfig: TEST_PROVIDER_CONFIG,
});
expect(reconcile.activeWorkers).toBe(3);
expect(reconcile.spawned).toBe(0);
expect(reconcile.terminated).toBe(0);
});
test("scheduler desired workers increases with distinct ready conversations", async () => {
const t = initConvexTest();
vi.stubGlobal(
"fetch",
vi.fn(async (input: RequestInfo | URL, init?: RequestInit) => {
const url = String(input);
const method = init?.method ?? "GET";
if (url.endsWith(`/apps/${TEST_PROVIDER_CONFIG.appName}/machines`) && method === "GET") {
return jsonResponse([]);
}
if (url.endsWith(`/apps/${TEST_PROVIDER_CONFIG.appName}/volumes`) && method === "GET") {
return jsonResponse([]);
}
throw new Error(`Unexpected fetch ${method} ${url}`);
}),
);
await t.mutation(api.queue.upsertAgentProfile, {
agentKey: "support-agent",
version: "1.0.0",
secretsRef: [],
enabled: true,
});
await t.mutation(api.queue.importPlaintextSecret, {
secretRef: "fly.apiToken",
plaintextValue: "fly-token",
});
await t.mutation(api.queue.importPlaintextSecret, {
secretRef: "convex.url",
plaintextValue: "https://example.convex.cloud",
});
await t.mutation(api.queue.enqueueMessage, {
conversationId: "telegram:chat:cap-a",
agentKey: "support-agent",
payload: {
provider: "telegram",
providerUserId: "u-cap-a",
messageText: "hello",
},
});
await t.mutation(api.queue.enqueueMessage, {
conversationId: "telegram:chat:cap-b",
agentKey: "support-agent",
payload: {
provider: "telegram",
providerUserId: "u-cap-b",
messageText: "hello",
},
});
await t.mutation(internal.queue.upsertWorkerState, {
workerId: "worker-cap-4",
provider: "fly",
status: "active",
load: 0,
});
await t.mutation(internal.queue.upsertWorkerState, {
workerId: "worker-cap-5",
provider: "fly",
status: "active",
load: 0,
});
await t.mutation(internal.queue.upsertWorkerState, {
workerId: "worker-cap-6",
provider: "fly",
status: "active",
load: 0,
});
const reconcile = await t.action(api.scheduler.reconcileWorkerPool, {
scalingPolicy: {
maxWorkers: 5,
queuePerWorkerTarget: 1,
spawnStep: 1,
idleTimeoutMs: 300_000,
reconcileIntervalMs: 15_000,
},
providerConfig: TEST_PROVIDER_CONFIG,
});
expect(reconcile.activeWorkers).toBe(3);
expect(reconcile.spawned).toBe(0);
expect(reconcile.terminated).toBe(0);
});
test("scheduler should spawn a new worker when another conversation is already processing", async () => {
const t = initConvexTest();
const nowMs = Date.UTC(2026, 0, 1, 17, 0, 0);
vi.setSystemTime(nowMs);
let machineCreateBody:
| {
name?: string;
config?: {
env?: Record;
};
}
| null = null;
const fetchMock = vi.fn(async (input: RequestInfo | URL, init?: RequestInit) => {
const url = String(input);
const method = init?.method ?? "GET";
if (url.endsWith(`/apps/${TEST_PROVIDER_CONFIG.appName}/machines`) && method === "GET") {
return jsonResponse([
{
id: "machine-busy-1",
name: "worker-busy-1",
region: TEST_PROVIDER_CONFIG.region,
state: "started",
config: { image: TEST_PROVIDER_CONFIG.image },
},
]);
}
if (url.endsWith(`/apps/${TEST_PROVIDER_CONFIG.appName}/volumes`) && method === "GET") {
return jsonResponse([]);
}
if (url.endsWith(`/apps/${TEST_PROVIDER_CONFIG.appName}/volumes`) && method === "POST") {
return jsonResponse({
id: "vol-new-worker",
name: buildDedicatedVolumeName(TEST_PROVIDER_CONFIG.volumeName, "afw-176"),
region: TEST_PROVIDER_CONFIG.region,
});
}
if (url.endsWith(`/apps/${TEST_PROVIDER_CONFIG.appName}/machines`) && method === "POST") {
const body = JSON.parse(String(init?.body ?? "{}")) as {
name?: string;
config?: {
env?: Record;
};
};
machineCreateBody = body;
return jsonResponse({
id: "machine-new-worker",
name: body.name,
region: TEST_PROVIDER_CONFIG.region,
state: "started",
config: { image: TEST_PROVIDER_CONFIG.image },
});
}
throw new Error(`Unexpected fetch ${method} ${url}`);
});
vi.stubGlobal("fetch", fetchMock);
await t.mutation(api.queue.upsertAgentProfile, {
agentKey: "support-agent",
version: "1.0.0",
secretsRef: [],
enabled: true,
});
await t.mutation(api.queue.importPlaintextSecret, {
secretRef: "fly.apiToken",
plaintextValue: "fly-token",
});
await t.mutation(api.queue.importPlaintextSecret, {
secretRef: "convex.url",
plaintextValue: "https://example.convex.cloud",
});
await t.mutation(api.queue.enqueueMessage, {
conversationId: "telegram:chat:spawn-a",
agentKey: "support-agent",
payload: {
provider: "telegram",
providerUserId: "u-spawn-a",
messageText: "first",
},
});
await t.mutation(api.queue.enqueueMessage, {
conversationId: "telegram:chat:spawn-b",
agentKey: "support-agent",
payload: {
provider: "telegram",
providerUserId: "u-spawn-b",
messageText: "second",
},
});
const claim = await t.mutation(api.lib.claim, { workerId: "worker-busy-1", nowMs });
expect(claim?.conversationId).toBe("telegram:chat:spawn-a");
await t.mutation(internal.queue.upsertWorkerState, {
workerId: "worker-busy-1",
provider: "fly",
status: "active",
load: 1,
nowMs,
machineId: "machine-busy-1",
appName: TEST_PROVIDER_CONFIG.appName,
region: TEST_PROVIDER_CONFIG.region,
});
const reconcile = await t.action(api.scheduler.reconcileWorkerPool, {
nowMs,
flyApiToken: "fly-token",
convexUrl: "https://example.convex.cloud",
scalingPolicy: {
maxWorkers: 5,
queuePerWorkerTarget: 1,
spawnStep: 2,
idleTimeoutMs: 300_000,
reconcileIntervalMs: 15_000,
},
providerConfig: TEST_PROVIDER_CONFIG,
});
expect(reconcile.spawned).toBe(1);
expect(reconcile.activeWorkers).toBe(2);
const machineCreateEnv = (
machineCreateBody as
| {
config?: {
env?: Record;
};
}
| null
)?.config?.env;
expect(machineCreateEnv?.OPENCLAW_CONVERSATION_ID).toBe("telegram:chat:spawn-b");
expect(machineCreateEnv?.OPENCLAW_AGENT_KEY).toBe("support-agent");
const workers = await t.query((internal.queue as any).listWorkersForScheduler, {});
const spawnedWorker = workers.find(
(row: { workerId: string }) => row.workerId === `afw-${nowMs}-0`,
);
expect(spawnedWorker?.assignment).toEqual({
conversationId: "telegram:chat:spawn-b",
agentKey: "support-agent",
leaseId: `spawn:afw-${nowMs}-0`,
assignedAt: nowMs,
});
});
test("scheduler should forward OpenClaw bridge env to spawned machines", async () => {
const t = initConvexTest();
const nowMs = Date.UTC(2026, 0, 1, 17, 5, 0);
vi.setSystemTime(nowMs);
let machineCreateBody:
| {
config?: {
env?: Record;
};
}
| null = null;
const fetchMock = vi.fn(async (input: RequestInfo | URL, init?: RequestInit) => {
const url = String(input);
const method = init?.method ?? "GET";
if (url.endsWith(`/apps/${TEST_PROVIDER_CONFIG.appName}/machines`) && method === "GET") {
return jsonResponse([]);
}
if (url.endsWith(`/apps/${TEST_PROVIDER_CONFIG.appName}/volumes`) && method === "GET") {
return jsonResponse([]);
}
if (url.endsWith(`/apps/${TEST_PROVIDER_CONFIG.appName}/volumes`) && method === "POST") {
return jsonResponse({
id: "vol-forwarded-env",
name: buildDedicatedVolumeName(TEST_PROVIDER_CONFIG.volumeName, `afw-${nowMs}-0`),
region: TEST_PROVIDER_CONFIG.region,
});
}
if (url.endsWith(`/apps/${TEST_PROVIDER_CONFIG.appName}/machines`) && method === "POST") {
machineCreateBody = JSON.parse(String(init?.body ?? "{}")) as {
config?: {
env?: Record;
};
};
return jsonResponse({
id: "machine-forwarded-env",
name: "afw-forwarded-env",
region: TEST_PROVIDER_CONFIG.region,
state: "started",
config: { image: TEST_PROVIDER_CONFIG.image },
});
}
throw new Error(`Unexpected fetch ${method} ${url}`);
});
vi.stubGlobal("fetch", fetchMock);
await t.mutation(api.queue.upsertAgentProfile, {
agentKey: "support-agent",
version: "1.0.0",
secretsRef: [],
bridgeConfig: {
enabled: true,
serviceId: "openclaw-prod",
},
enabled: true,
});
await t.mutation(api.queue.importPlaintextSecret, {
secretRef: "fly.apiToken",
plaintextValue: "fly-token",
});
await t.mutation(api.queue.importPlaintextSecret, {
secretRef: "convex.url",
plaintextValue: "https://example.convex.cloud",
});
await t.mutation(api.queue.importPlaintextSecret, {
secretRef: "agent-bridge.serviceKey",
plaintextValue: "abs_live_bridge_key",
});
await t.mutation(api.queue.importPlaintextSecret, {
secretRef: "agent-bridge.linkingSharedSecret",
plaintextValue: "abs_linking_secret",
});
await t.mutation(api.queue.enqueueMessage, {
conversationId: "telegram:chat:spawn-env",
agentKey: "support-agent",
payload: {
provider: "telegram",
providerUserId: "u-spawn-env",
messageText: "hello",
},
});
const reconcile = await t.action(api.scheduler.reconcileWorkerPool, {
nowMs,
flyApiToken: "fly-token",
convexUrl: "https://example.convex.cloud",
scalingPolicy: {
maxWorkers: 5,
queuePerWorkerTarget: 1,
spawnStep: 1,
idleTimeoutMs: 300_000,
reconcileIntervalMs: 15_000,
},
providerConfig: TEST_PROVIDER_CONFIG,
});
expect(reconcile.spawned).toBe(1);
expect(machineCreateBody).not.toBeNull();
expect(machineCreateBody!.config?.env?.OPENCLAW_SERVICE_ID).toBe("openclaw-prod");
expect(machineCreateBody!.config?.env?.OPENCLAW_SERVICE_KEY).toBe("abs_live_bridge_key");
expect(machineCreateBody!.config?.env?.OPENCLAW_LINKING_SHARED_SECRET).toBe(
"abs_linking_secret",
);
});
test("scheduler should omit unresolved OpenClaw bridge env keys from spawned machines", async () => {
const t = initConvexTest();
const nowMs = Date.UTC(2026, 0, 1, 17, 6, 0);
vi.setSystemTime(nowMs);
let machineCreateBody:
| {
config?: {
env?: Record;
};
}
| null = null;
const fetchMock = vi.fn(async (input: RequestInfo | URL, init?: RequestInit) => {
const url = String(input);
const method = init?.method ?? "GET";
if (url.endsWith(`/apps/${TEST_PROVIDER_CONFIG.appName}/machines`) && method === "GET") {
return jsonResponse([]);
}
if (url.endsWith(`/apps/${TEST_PROVIDER_CONFIG.appName}/volumes`) && method === "GET") {
return jsonResponse([]);
}
if (url.endsWith(`/apps/${TEST_PROVIDER_CONFIG.appName}/volumes`) && method === "POST") {
return jsonResponse({
id: "vol-forwarded-env-missing",
name: buildDedicatedVolumeName(TEST_PROVIDER_CONFIG.volumeName, `afw-${nowMs}-0`),
region: TEST_PROVIDER_CONFIG.region,
});
}
if (url.endsWith(`/apps/${TEST_PROVIDER_CONFIG.appName}/machines`) && method === "POST") {
machineCreateBody = JSON.parse(String(init?.body ?? "{}")) as {
config?: {
env?: Record;
};
};
return jsonResponse({
id: "machine-forwarded-env-missing",
name: "afw-forwarded-env-missing",
region: TEST_PROVIDER_CONFIG.region,
state: "started",
config: { image: TEST_PROVIDER_CONFIG.image },
});
}
throw new Error(`Unexpected fetch ${method} ${url}`);
});
vi.stubGlobal("fetch", fetchMock);
await t.mutation(api.queue.upsertAgentProfile, {
agentKey: "support-agent",
version: "1.0.0",
secretsRef: [],
bridgeConfig: {
enabled: true,
serviceId: "openclaw-prod",
},
enabled: true,
});
await t.mutation(api.queue.importPlaintextSecret, {
secretRef: "fly.apiToken",
plaintextValue: "fly-token",
});
await t.mutation(api.queue.importPlaintextSecret, {
secretRef: "convex.url",
plaintextValue: "https://example.convex.cloud",
});
await t.mutation(api.queue.enqueueMessage, {
conversationId: "telegram:chat:spawn-env-missing",
agentKey: "support-agent",
payload: {
provider: "telegram",
providerUserId: "u-spawn-env-missing",
messageText: "hello",
},
});
const reconcile = await t.action(api.scheduler.reconcileWorkerPool, {
nowMs,
flyApiToken: "fly-token",
convexUrl: "https://example.convex.cloud",
scalingPolicy: {
maxWorkers: 5,
queuePerWorkerTarget: 1,
spawnStep: 1,
idleTimeoutMs: 300_000,
reconcileIntervalMs: 15_000,
},
providerConfig: TEST_PROVIDER_CONFIG,
});
expect(reconcile.spawned).toBe(1);
expect(machineCreateBody).not.toBeNull();
expect(machineCreateBody!.config?.env?.OPENCLAW_SERVICE_ID).toBe("openclaw-prod");
expect(machineCreateBody!.config?.env).not.toHaveProperty("OPENCLAW_SERVICE_KEY");
expect(machineCreateBody!.config?.env).not.toHaveProperty(
"OPENCLAW_LINKING_SHARED_SECRET",
);
});
test("worker assignment should prevent cross-conversation claims after completion", async () => {
const t = initConvexTest();
const nowMs = Date.UTC(2026, 0, 1, 17, 30, 0);
vi.setSystemTime(nowMs);
await t.mutation(api.queue.upsertAgentProfile, {
agentKey: "support-agent",
version: "1.0.0",
secretsRef: [],
enabled: true,
});
const conversationA = "telegram:chat:sticky-a";
const conversationB = "telegram:chat:sticky-b";
const messageA = await t.mutation(api.queue.enqueueMessage, {
conversationId: conversationA,
agentKey: "support-agent",
payload: {
provider: "telegram",
providerUserId: "u-sticky-a",
messageText: "first",
},
nowMs,
});
const firstClaim = await t.mutation(api.lib.claim, {
workerId: "worker-sticky-1",
conversationId: conversationA,
nowMs,
});
expect(firstClaim?.messageId).toBe(messageA);
const completed = await t.mutation(api.lib.complete, {
workerId: "worker-sticky-1",
messageId: messageA,
leaseId: firstClaim?.leaseId ?? "",
nowMs: nowMs + 1_000,
});
expect(completed).toBe(true);
await t.mutation(api.queue.enqueueMessage, {
conversationId: conversationB,
agentKey: "support-agent",
payload: {
provider: "telegram",
providerUserId: "u-sticky-b",
messageText: "second",
},
nowMs: nowMs + 2_000,
});
const mismatchedClaim = await t.mutation(api.lib.claim, {
workerId: "worker-sticky-1",
nowMs: nowMs + 2_000,
});
expect(mismatchedClaim).toBeNull();
const queuedJobs = await t.query(api.queue.listJobsByStatus, {
status: "queued",
limit: 10,
});
expect(queuedJobs.some((job) => job.conversationId === conversationB)).toBe(true);
const workers = await t.query((internal.queue as any).listWorkersForScheduler, {});
const worker = workers.find((row: { workerId: string }) => row.workerId === "worker-sticky-1");
expect(worker?.assignment).toEqual({
conversationId: conversationA,
agentKey: "support-agent",
leaseId: firstClaim?.leaseId ?? "",
assignedAt: nowMs,
});
});
test("preassigned workers should claim only their assigned conversation on first claim", async () => {
const t = initConvexTest();
const nowMs = Date.UTC(2026, 0, 1, 17, 35, 0);
vi.setSystemTime(nowMs);
await t.mutation(api.queue.upsertAgentProfile, {
agentKey: "support-agent",
version: "1.0.0",
secretsRef: [],
enabled: true,
});
await t.mutation(api.queue.enqueueMessage, {
conversationId: "telegram:chat:preassign-a",
agentKey: "support-agent",
payload: {
provider: "telegram",
providerUserId: "u-preassign-a",
messageText: "first",
},
nowMs,
});
const messageB = await t.mutation(api.queue.enqueueMessage, {
conversationId: "telegram:chat:preassign-b",
agentKey: "support-agent",
payload: {
provider: "telegram",
providerUserId: "u-preassign-b",
messageText: "second",
},
nowMs: nowMs + 1,
});
await t.mutation(internal.queue.upsertWorkerState, {
workerId: "worker-preassigned-1",
provider: "fly",
status: "active",
load: 0,
nowMs,
assignment: {
conversationId: "telegram:chat:preassign-b",
agentKey: "support-agent",
leaseId: "spawn:worker-preassigned-1",
assignedAt: nowMs,
},
});
const claim = await t.mutation(api.lib.claim, {
workerId: "worker-preassigned-1",
nowMs: nowMs + 10,
});
expect(claim?.messageId).toBe(messageB);
expect(claim?.conversationId).toBe("telegram:chat:preassign-b");
});
test("exclusive ownership should block another worker and let the owner reclaim", async () => {
const t = initConvexTest();
const nowMs = Date.UTC(2026, 0, 1, 17, 40, 0);
vi.setSystemTime(nowMs);
await t.mutation(api.queue.upsertAgentProfile, {
agentKey: "support-agent",
version: "1.0.0",
secretsRef: [],
enabled: true,
});
const conversationId = "telegram:chat:exclusive-owner";
const firstMessageId = await t.mutation(api.queue.enqueueMessage, {
conversationId,
agentKey: "support-agent",
payload: {
provider: "telegram",
providerUserId: "u-exclusive-1",
messageText: "first",
},
nowMs,
});
const firstClaim = await t.mutation(api.lib.claim, {
workerId: "worker-exclusive-1",
conversationId,
nowMs,
});
expect(firstClaim?.messageId).toBe(firstMessageId);
await t.mutation(api.lib.complete, {
workerId: "worker-exclusive-1",
messageId: firstMessageId,
leaseId: firstClaim?.leaseId ?? "",
nowMs: nowMs + 1_000,
});
const secondMessageId = await t.mutation(api.queue.enqueueMessage, {
conversationId,
agentKey: "support-agent",
payload: {
provider: "telegram",
providerUserId: "u-exclusive-2",
messageText: "second",
},
nowMs: nowMs + 2_000,
});
const blockedClaim = await t.mutation(api.lib.claim, {
workerId: "worker-exclusive-2",
conversationId,
nowMs: nowMs + 2_000,
});
expect(blockedClaim).toBeNull();
const ownerReclaim = await t.mutation(api.lib.claim, {
workerId: "worker-exclusive-1",
nowMs: nowMs + 2_000,
});
expect(ownerReclaim?.messageId).toBe(secondMessageId);
expect(ownerReclaim?.conversationId).toBe(conversationId);
});
test("scheduler should spawn when only an idle worker pinned to another conversation exists", async () => {
const t = initConvexTest();
const nowMs = Date.UTC(2026, 0, 1, 17, 45, 0);
vi.setSystemTime(nowMs);
const fetchMock = vi.fn(async (input: RequestInfo | URL, init?: RequestInit) => {
const url = String(input);
const method = init?.method ?? "GET";
if (url.endsWith(`/apps/${TEST_PROVIDER_CONFIG.appName}/machines`) && method === "GET") {
return jsonResponse([]);
}
if (url.endsWith(`/apps/${TEST_PROVIDER_CONFIG.appName}/volumes`) && method === "GET") {
return jsonResponse([]);
}
if (url.endsWith(`/apps/${TEST_PROVIDER_CONFIG.appName}/volumes`) && method === "POST") {
return jsonResponse({
id: "vol-pinned-new-worker",
name: buildDedicatedVolumeName(TEST_PROVIDER_CONFIG.volumeName, "afw-177"),
region: TEST_PROVIDER_CONFIG.region,
});
}
if (url.endsWith(`/apps/${TEST_PROVIDER_CONFIG.appName}/machines`) && method === "POST") {
const body = JSON.parse(String(init?.body ?? "{}")) as { name?: string };
return jsonResponse({
id: "machine-pinned-new-worker",
name: body.name,
region: TEST_PROVIDER_CONFIG.region,
state: "started",
config: { image: TEST_PROVIDER_CONFIG.image },
});
}
throw new Error(`Unexpected fetch ${method} ${url}`);
});
vi.stubGlobal("fetch", fetchMock);
await t.mutation(api.queue.upsertAgentProfile, {
agentKey: "support-agent",
version: "1.0.0",
secretsRef: [],
enabled: true,
});
await t.mutation(api.queue.importPlaintextSecret, {
secretRef: "fly.apiToken",
plaintextValue: "fly-token",
});
await t.mutation(api.queue.importPlaintextSecret, {
secretRef: "convex.url",
plaintextValue: "https://example.convex.cloud",
});
const messageA = await t.mutation(api.queue.enqueueMessage, {
conversationId: "telegram:chat:pinned-a",
agentKey: "support-agent",
payload: {
provider: "telegram",
providerUserId: "u-pinned-a",
messageText: "first",
},
nowMs,
});
const claimA = await t.mutation(api.lib.claim, {
workerId: "worker-pinned-1",
conversationId: "telegram:chat:pinned-a",
nowMs,
});
expect(claimA?.messageId).toBe(messageA);
await t.mutation(api.lib.complete, {
workerId: "worker-pinned-1",
messageId: messageA,
leaseId: claimA?.leaseId ?? "",
nowMs: nowMs + 1_000,
});
await t.mutation(api.queue.enqueueMessage, {
conversationId: "telegram:chat:pinned-b",
agentKey: "support-agent",
payload: {
provider: "telegram",
providerUserId: "u-pinned-b",
messageText: "second",
},
nowMs: nowMs + 2_000,
});
const reconcile = await t.action(api.scheduler.reconcileWorkerPool, {
nowMs: nowMs + 2_000,
flyApiToken: "fly-token",
convexUrl: "https://example.convex.cloud",
scalingPolicy: {
maxWorkers: 5,
queuePerWorkerTarget: 1,
spawnStep: 1,
idleTimeoutMs: 300_000,
reconcileIntervalMs: 15_000,
},
providerConfig: TEST_PROVIDER_CONFIG,
});
expect(reconcile.spawned).toBe(1);
expect(reconcile.activeWorkers).toBe(2);
});
test("stale owner should allow another worker to take over the conversation", async () => {
const t = initConvexTest();
const nowMs = Date.UTC(2026, 0, 1, 17, 47, 0);
vi.setSystemTime(nowMs);
await t.mutation(api.queue.upsertAgentProfile, {
agentKey: "support-agent",
version: "1.0.0",
secretsRef: [],
enabled: true,
});
const conversationId = "telegram:chat:stale-owner";
const firstMessageId = await t.mutation(api.queue.enqueueMessage, {
conversationId,
agentKey: "support-agent",
payload: {
provider: "telegram",
providerUserId: "u-stale-1",
messageText: "first",
},
nowMs,
});
const firstClaim = await t.mutation(api.lib.claim, {
workerId: "worker-stale-1",
conversationId,
nowMs,
});
expect(firstClaim?.messageId).toBe(firstMessageId);
await t.mutation(api.lib.complete, {
workerId: "worker-stale-1",
messageId: firstMessageId,
leaseId: firstClaim?.leaseId ?? "",
nowMs: nowMs + 1_000,
});
await t.run(async (ctx) => {
const worker = await ctx.db
.query("workers")
.withIndex("by_workerId", (q) => q.eq("workerId", "worker-stale-1"))
.unique();
expect(worker).not.toBeNull();
await ctx.db.patch(worker!._id, {
heartbeatAt: nowMs + 2_000 - DEFAULT_CONFIG.lease.staleAfterMs - 1,
});
});
const secondMessageId = await t.mutation(api.queue.enqueueMessage, {
conversationId,
agentKey: "support-agent",
payload: {
provider: "telegram",
providerUserId: "u-stale-2",
messageText: "second",
},
nowMs: nowMs + 2_000,
});
const takeoverClaim = await t.mutation(api.lib.claim, {
workerId: "worker-stale-2",
conversationId,
nowMs: nowMs + 2_000,
});
expect(takeoverClaim?.messageId).toBe(secondMessageId);
expect(takeoverClaim?.conversationId).toBe(conversationId);
});
test("scheduler should dedupe duplicated sticky workers for the same conversation", async () => {
const t = initConvexTest();
const nowMs = Date.UTC(2026, 0, 1, 17, 48, 0);
vi.setSystemTime(nowMs);
const fetchMock = vi.fn(async (input: RequestInfo | URL, init?: RequestInit) => {
const url = String(input);
const method = init?.method ?? "GET";
if (url.endsWith(`/apps/${TEST_PROVIDER_CONFIG.appName}/machines`) && method === "GET") {
return jsonResponse([]);
}
if (url.endsWith(`/apps/${TEST_PROVIDER_CONFIG.appName}/volumes`) && method === "GET") {
return jsonResponse([]);
}
if (url.endsWith(`/apps/${TEST_PROVIDER_CONFIG.appName}/volumes`) && method === "POST") {
return jsonResponse({
id: "vol-deduped-sticky-worker",
name: buildDedicatedVolumeName(TEST_PROVIDER_CONFIG.volumeName, "afw-deduped"),
region: TEST_PROVIDER_CONFIG.region,
});
}
if (url.endsWith(`/apps/${TEST_PROVIDER_CONFIG.appName}/machines`) && method === "POST") {
const body = JSON.parse(String(init?.body ?? "{}")) as { name?: string };
return jsonResponse({
id: "machine-deduped-sticky-worker",
name: body.name,
region: TEST_PROVIDER_CONFIG.region,
state: "started",
config: { image: TEST_PROVIDER_CONFIG.image },
});
}
throw new Error(`Unexpected fetch ${method} ${url}`);
});
vi.stubGlobal("fetch", fetchMock);
await t.mutation(api.queue.upsertAgentProfile, {
agentKey: "support-agent",
version: "1.0.0",
secretsRef: [],
enabled: true,
});
await t.mutation(api.queue.importPlaintextSecret, {
secretRef: "fly.apiToken",
plaintextValue: "fly-token",
});
await t.mutation(api.queue.importPlaintextSecret, {
secretRef: "convex.url",
plaintextValue: "https://example.convex.cloud",
});
await t.mutation(api.queue.enqueueMessage, {
conversationId: "telegram:chat:dedupe-a",
agentKey: "support-agent",
payload: {
provider: "telegram",
providerUserId: "u-dedupe-a",
messageText: "first",
},
nowMs,
});
await t.mutation(api.queue.enqueueMessage, {
conversationId: "telegram:chat:dedupe-b",
agentKey: "support-agent",
payload: {
provider: "telegram",
providerUserId: "u-dedupe-b",
messageText: "second",
},
nowMs,
});
await t.run(async (ctx) => {
for (const workerId of ["worker-dedupe-1", "worker-dedupe-2", "worker-dedupe-3"]) {
await ctx.db.insert("workers", {
workerId,
provider: "fly",
status: "active",
load: 0,
heartbeatAt: nowMs,
lastClaimAt: nowMs,
scheduledShutdownAt: nowMs + 300_000,
assignment: {
conversationId: "telegram:chat:dedupe-a",
agentKey: "support-agent",
leaseId: `${workerId}-lease`,
assignedAt: nowMs,
},
machineRef: {
appName: TEST_PROVIDER_CONFIG.appName,
machineId: `${workerId}-machine`,
region: TEST_PROVIDER_CONFIG.region,
},
capabilities: [],
});
}
});
const reconcile = await t.action(api.scheduler.reconcileWorkerPool, {
nowMs,
flyApiToken: "fly-token",
convexUrl: "https://example.convex.cloud",
scalingPolicy: {
maxWorkers: 5,
queuePerWorkerTarget: 1,
spawnStep: 1,
idleTimeoutMs: 300_000,
reconcileIntervalMs: 15_000,
},
providerConfig: TEST_PROVIDER_CONFIG,
});
expect(reconcile.spawned).toBe(1);
});
test("snapshot restore should require a matching conversation when conversationId is provided", async () => {
const t = initConvexTest();
const nowMs = Date.UTC(2026, 0, 1, 17, 50, 0);
const snapshot = await t.mutation(api.queue.prepareDataSnapshotUpload as any, {
workerId: "worker-snapshot-1",
workspaceId: "default",
agentKey: "support-agent",
conversationId: "telegram:chat:snapshot-a",
reason: "manual",
nowMs,
});
const storageId = await t.run(async (ctx) => {
return await ctx.storage.store(new Blob(["snapshot-a"]));
});
const finalized = await t.mutation(api.queue.finalizeDataSnapshotUpload as any, {
workerId: "worker-snapshot-1",
snapshotId: snapshot.snapshotId,
storageId,
sha256: "beadfeed",
sizeBytes: 10,
nowMs: nowMs + 1,
});
expect(finalized).toBe(true);
const missingConversationSnapshot = await t.query(
api.queue.getLatestDataSnapshotForRestore as any,
{
workspaceId: "default",
agentKey: "support-agent",
conversationId: "telegram:chat:snapshot-b",
nowMs: nowMs + 2,
},
);
expect(missingConversationSnapshot).toBeNull();
});
test("snapshot APIs should reject missing conversationId", async () => {
const t = initConvexTest();
const nowMs = Date.UTC(2026, 0, 1, 17, 52, 0);
await expect(
t.mutation(api.queue.prepareDataSnapshotUpload as any, {
workerId: "worker-snapshot-missing",
workspaceId: "default",
agentKey: "support-agent",
reason: "manual",
nowMs,
}),
).rejects.toThrow();
await expect(
t.query(api.queue.getLatestDataSnapshotForRestore as any, {
workspaceId: "default",
agentKey: "support-agent",
nowMs,
}),
).rejects.toThrow();
});
test("user-agent snapshot listing should stay isolated per conversation", async () => {
const t = initConvexTest();
const nowMs = Date.UTC(2026, 0, 1, 17, 54, 0);
await t.mutation(api.queue.upsertAgentProfile, {
agentKey: "snapshot-agent",
version: "1.0.0",
secretsRef: [],
enabled: true,
});
await t.mutation(api.lib.bindUserAgent, {
consumerUserId: "snapshot-user-a",
agentKey: "snapshot-agent",
source: "manual",
});
await t.mutation(api.lib.bindUserAgent, {
consumerUserId: "snapshot-user-b",
agentKey: "snapshot-agent",
source: "manual",
});
const snapshotA = await t.mutation(api.queue.prepareDataSnapshotUpload as any, {
workerId: "worker-snapshot-a",
workspaceId: "default",
agentKey: "snapshot-agent",
conversationId: "user-agent:snapshot-agent:snapshot-user-a",
reason: "manual",
nowMs,
});
const storageA = await t.run(async (ctx) => {
return await ctx.storage.store(new Blob(["snapshot-a"]));
});
await t.mutation(api.queue.finalizeDataSnapshotUpload as any, {
workerId: "worker-snapshot-a",
snapshotId: snapshotA.snapshotId,
storageId: storageA,
sha256: "snapshot-a",
sizeBytes: 10,
nowMs: nowMs + 1,
});
const snapshotB = await t.mutation(api.queue.prepareDataSnapshotUpload as any, {
workerId: "worker-snapshot-b",
workspaceId: "default",
agentKey: "snapshot-agent",
conversationId: "user-agent:snapshot-agent:snapshot-user-b",
reason: "manual",
nowMs: nowMs + 2,
});
const storageB = await t.run(async (ctx) => {
return await ctx.storage.store(new Blob(["snapshot-b"]));
});
await t.mutation(api.queue.finalizeDataSnapshotUpload as any, {
workerId: "worker-snapshot-b",
snapshotId: snapshotB.snapshotId,
storageId: storageB,
sha256: "snapshot-b",
sizeBytes: 10,
nowMs: nowMs + 3,
});
const listed = await t.query((api.lib as any).listSnapshotsForUserAgent, {
consumerUserId: "snapshot-user-a",
agentKey: "snapshot-agent",
nowMs: nowMs + 4,
});
expect(listed).toHaveLength(1);
expect(listed[0]?.conversationId).toBe("user-agent:snapshot-agent:snapshot-user-a");
const latest = await t.query((api.lib as any).getLatestSnapshotForUserAgent, {
consumerUserId: "snapshot-user-a",
agentKey: "snapshot-agent",
nowMs: nowMs + 4,
});
expect(latest?.conversationId).toBe("user-agent:snapshot-agent:snapshot-user-a");
});
test("release stuck jobs should clear worker assignment when the lease is recovered", async () => {
const t = initConvexTest();
const nowMs = Date.UTC(2026, 0, 1, 17, 55, 0);
vi.setSystemTime(nowMs);
await t.mutation(api.queue.upsertAgentProfile, {
agentKey: "support-agent",
version: "1.0.0",
secretsRef: [],
enabled: true,
});
const messageId = await t.mutation(api.queue.enqueueMessage, {
conversationId: "telegram:chat:lease-clear",
agentKey: "support-agent",
payload: {
provider: "telegram",
providerUserId: "u-lease-clear",
messageText: "recover me",
},
nowMs,
});
const claim = await t.mutation(api.lib.claim, {
workerId: "worker-lease-clear-1",
conversationId: "telegram:chat:lease-clear",
nowMs,
});
expect(claim?.messageId).toBe(messageId);
const released = await t.mutation(api.queue.releaseStuckJobs, {
nowMs: nowMs + DEFAULT_CONFIG.lease.leaseMs + 1,
limit: 10,
});
expect(released.requeued).toBe(1);
expect(released.unlocked).toBe(1);
const workers = await t.query((internal.queue as any).listWorkersForScheduler, {});
const worker = workers.find(
(row: { workerId: string }) => row.workerId === "worker-lease-clear-1",
);
expect(worker?.assignment).toBeNull();
});
test("checkIdleShutdowns should backfill missing scheduledShutdownAt for idle active workers", async () => {
const t = initConvexTest();
const nowMs = Date.UTC(2026, 0, 1, 18, 0, 0);
vi.setSystemTime(nowMs);
const fetchMock = vi.fn(async (input: RequestInfo | URL, init?: RequestInit) => {
const url = String(input);
const method = init?.method ?? "GET";
if (url.endsWith(`/apps/${TEST_PROVIDER_CONFIG.appName}/machines`) && method === "GET") {
return jsonResponse([]);
}
if (url.endsWith(`/apps/${TEST_PROVIDER_CONFIG.appName}/volumes`) && method === "GET") {
return jsonResponse([]);
}
throw new Error(`Unexpected fetch ${method} ${url}`);
});
vi.stubGlobal("fetch", fetchMock);
await t.mutation(internal.queue.upsertWorkerState, {
workerId: "worker-missing-shutdown-1",
provider: "fly",
status: "active",
load: 0,
nowMs: nowMs - 600_000,
});
await t.run(async (ctx) => {
const worker = await ctx.db
.query("workers")
.withIndex("by_workerId", (q) => q.eq("workerId", "worker-missing-shutdown-1"))
.unique();
if (!worker) {
throw new Error("Worker not found");
}
await ctx.db.patch(worker._id, {
lastClaimAt: nowMs - 600_000,
heartbeatAt: nowMs,
scheduledShutdownAt: undefined,
});
});
const result = await t.action(api.scheduler.checkIdleShutdowns, {
nowMs,
flyApiToken: "fly-token",
providerConfig: TEST_PROVIDER_CONFIG,
});
expect(result.stopped).toBe(0);
expect(result.pending).toBe(0);
const workers = await t.query((internal.queue as any).listWorkersForScheduler, {});
const worker = workers.find(
(row: { workerId: string }) => row.workerId === "worker-missing-shutdown-1",
);
expect(worker?.status).toBe("active");
expect(worker?.scheduledShutdownAt).toBe((nowMs - 600_000) + DEFAULT_CONFIG.scaling.idleTimeoutMs);
});
test("provider-stopped machines should not remain active in the worker table", async () => {
const t = initConvexTest();
const nowMs = Date.UTC(2026, 0, 1, 19, 0, 0);
vi.setSystemTime(nowMs);
const fetchMock = vi.fn(async (input: RequestInfo | URL, init?: RequestInit) => {
const url = String(input);
const method = init?.method ?? "GET";
if (url.endsWith(`/apps/${TEST_PROVIDER_CONFIG.appName}/machines`) && method === "GET") {
return jsonResponse([
{
id: "machine-provider-stopped-1",
name: "worker-provider-stopped-1",
region: TEST_PROVIDER_CONFIG.region,
state: "stopped",
config: { image: TEST_PROVIDER_CONFIG.image },
},
]);
}
if (url.endsWith(`/machines/machine-provider-stopped-1`) && method === "GET") {
return jsonResponse({
id: "machine-provider-stopped-1",
config: { mounts: [] },
});
}
if (url.endsWith(`/machines/machine-provider-stopped-1/cordon`) && method === "POST") {
return emptyResponse();
}
if (url.endsWith(`/machines/machine-provider-stopped-1/stop`) && method === "POST") {
return emptyResponse();
}
if (url.endsWith(`/machines/machine-provider-stopped-1`) && method === "DELETE") {
return emptyResponse();
}
if (url.endsWith(`/apps/${TEST_PROVIDER_CONFIG.appName}/volumes`) && method === "GET") {
return jsonResponse([]);
}
throw new Error(`Unexpected fetch ${method} ${url}`);
});
vi.stubGlobal("fetch", fetchMock);
await t.mutation(internal.queue.upsertWorkerState, {
workerId: "worker-provider-stopped-1",
provider: "fly",
status: "active",
load: 1,
nowMs: nowMs - 60_000,
machineId: "machine-provider-stopped-1",
appName: TEST_PROVIDER_CONFIG.appName,
region: TEST_PROVIDER_CONFIG.region,
});
const reconcile = await t.action(api.scheduler.checkIdleShutdowns, {
nowMs,
flyApiToken: "fly-token",
providerConfig: TEST_PROVIDER_CONFIG,
});
expect(reconcile.stopped).toBe(1);
expect(reconcile.pending).toBe(0);
const workers = await t.query((internal.queue as any).listWorkersForScheduler, {});
const worker = workers.find(
(row: { workerId: string }) => row.workerId === "worker-provider-stopped-1",
);
expect(worker?.status).toBe("stopped");
});
test("provider transient machine states should remain active", async () => {
const t = initConvexTest();
const nowMs = Date.UTC(2026, 0, 1, 19, 15, 0);
vi.setSystemTime(nowMs);
const fetchMock = vi.fn(async (input: RequestInfo | URL, init?: RequestInit) => {
const url = String(input);
const method = init?.method ?? "GET";
if (url.endsWith(`/apps/${TEST_PROVIDER_CONFIG.appName}/machines`) && method === "GET") {
return jsonResponse([
{
id: "machine-provider-creating-1",
name: "worker-provider-creating-1",
region: TEST_PROVIDER_CONFIG.region,
state: "creating",
config: { image: TEST_PROVIDER_CONFIG.image },
},
]);
}
if (url.endsWith(`/apps/${TEST_PROVIDER_CONFIG.appName}/volumes`) && method === "GET") {
return jsonResponse([]);
}
throw new Error(`Unexpected fetch ${method} ${url}`);
});
vi.stubGlobal("fetch", fetchMock);
await t.mutation(internal.queue.upsertWorkerState, {
workerId: "worker-provider-creating-1",
provider: "fly",
status: "active",
load: 1,
nowMs,
machineId: "machine-provider-creating-1",
appName: TEST_PROVIDER_CONFIG.appName,
region: TEST_PROVIDER_CONFIG.region,
});
const reconcile = await t.action(api.scheduler.checkIdleShutdowns, {
nowMs,
flyApiToken: "fly-token",
providerConfig: TEST_PROVIDER_CONFIG,
});
expect(reconcile.stopped).toBe(0);
const workers = await t.query((internal.queue as any).listWorkersForScheduler, {});
const worker = workers.find(
(row: { workerId: string }) => row.workerId === "worker-provider-creating-1",
);
expect(worker?.status).toBe("active");
});
test("fresh missing provider workers should remain active during grace window", async () => {
const t = initConvexTest();
const nowMs = Date.UTC(2026, 0, 1, 19, 20, 0);
vi.setSystemTime(nowMs);
const fetchMock = vi.fn(async (input: RequestInfo | URL, init?: RequestInit) => {
const url = String(input);
const method = init?.method ?? "GET";
if (url.endsWith(`/apps/${TEST_PROVIDER_CONFIG.appName}/machines`) && method === "GET") {
return jsonResponse([]);
}
if (url.endsWith(`/apps/${TEST_PROVIDER_CONFIG.appName}/volumes`) && method === "GET") {
return jsonResponse([]);
}
throw new Error(`Unexpected fetch ${method} ${url}`);
});
vi.stubGlobal("fetch", fetchMock);
await t.mutation(internal.queue.upsertWorkerState, {
workerId: "worker-provider-missing-fresh-1",
provider: "fly",
status: "active",
load: 0,
nowMs,
machineId: "machine-provider-missing-fresh-1",
appName: TEST_PROVIDER_CONFIG.appName,
region: TEST_PROVIDER_CONFIG.region,
});
const reconcile = await t.action(api.scheduler.checkIdleShutdowns, {
nowMs: nowMs + 15_000,
flyApiToken: "fly-token",
providerConfig: TEST_PROVIDER_CONFIG,
});
expect(reconcile.stopped).toBe(0);
const workers = await t.query((internal.queue as any).listWorkersForScheduler, {});
const worker = workers.find(
(row: { workerId: string }) => row.workerId === "worker-provider-missing-fresh-1",
);
expect(worker?.status).toBe("active");
});
test("provider-unavailable active workers should enter draining before teardown", async () => {
const t = initConvexTest();
const nowMs = Date.UTC(2026, 0, 1, 19, 30, 0);
vi.setSystemTime(nowMs);
const fetchMock = vi.fn(async (input: RequestInfo | URL, init?: RequestInit) => {
const url = String(input);
const method = init?.method ?? "GET";
if (url.endsWith(`/apps/${TEST_PROVIDER_CONFIG.appName}/machines`) && method === "GET") {
return jsonResponse([
{
id: "machine-provider-draining-1",
name: "worker-provider-draining-1",
region: TEST_PROVIDER_CONFIG.region,
state: "stopped",
config: { image: TEST_PROVIDER_CONFIG.image },
},
]);
}
if (url.endsWith(`/apps/${TEST_PROVIDER_CONFIG.appName}/volumes`) && method === "GET") {
return jsonResponse([]);
}
throw new Error(`Unexpected fetch ${method} ${url}`);
});
vi.stubGlobal("fetch", fetchMock);
await t.mutation(api.queue.upsertAgentProfile, {
agentKey: "support-agent",
version: "1.0.0",
secretsRef: [],
enabled: true,
});
const messageId = await t.mutation(api.lib.enqueue, {
conversationId: "telegram:chat:provider-draining",
agentKey: "support-agent",
payload: {
provider: "telegram",
providerUserId: "u-provider-draining",
messageText: "snapshot me",
},
});
const claim = await t.mutation(api.lib.claim, {
workerId: "worker-provider-draining-1",
nowMs,
});
expect(claim?.messageId).toBe(messageId);
await t.mutation(internal.queue.upsertWorkerState, {
workerId: "worker-provider-draining-1",
provider: "fly",
status: "active",
load: 1,
nowMs,
machineId: "machine-provider-draining-1",
appName: TEST_PROVIDER_CONFIG.appName,
region: TEST_PROVIDER_CONFIG.region,
});
const reconcile = await t.action(api.scheduler.checkIdleShutdowns, {
nowMs,
flyApiToken: "fly-token",
providerConfig: TEST_PROVIDER_CONFIG,
});
expect(reconcile.stopped).toBe(1);
expect(reconcile.pending).toBe(1);
const workers = await t.query((internal.queue as any).listWorkersForScheduler, {});
const worker = workers.find(
(row: { workerId: string }) => row.workerId === "worker-provider-draining-1",
);
expect(worker?.status).toBe("draining");
expect(worker?.stoppedAt).toBeNull();
const teardownCalls = fetchMock.mock.calls.filter((call) => {
const url = String(call[0]);
const method = ((call[1] as RequestInit | undefined)?.method ?? "GET").toUpperCase();
return (
url.includes("/machines/machine-provider-draining-1/cordon") ||
url.includes("/machines/machine-provider-draining-1/stop") ||
(url.endsWith("/machines/machine-provider-draining-1") && method === "DELETE")
);
});
expect(teardownCalls).toHaveLength(0);
});
test("spawn failure should not leave preregistered worker active", async () => {
const t = initConvexTest();
const nowMs = Date.UTC(2026, 0, 1, 19, 35, 0);
vi.setSystemTime(nowMs);
const fetchMock = vi.fn(async (input: RequestInfo | URL, init?: RequestInit) => {
const url = String(input);
const method = init?.method ?? "GET";
if (url.endsWith(`/apps/${TEST_PROVIDER_CONFIG.appName}/machines`) && method === "GET") {
return jsonResponse([]);
}
if (url.endsWith(`/apps/${TEST_PROVIDER_CONFIG.appName}/volumes`) && method === "GET") {
return jsonResponse([]);
}
if (url.endsWith(`/apps/${TEST_PROVIDER_CONFIG.appName}/volumes`) && method === "POST") {
return jsonResponse({
id: "vol-spawn-failure-1",
name: buildDedicatedVolumeName(TEST_PROVIDER_CONFIG.volumeName, "afw-spawn-failure"),
region: TEST_PROVIDER_CONFIG.region,
});
}
if (url.endsWith(`/apps/${TEST_PROVIDER_CONFIG.appName}/machines`) && method === "POST") {
throw new Error("simulated spawn failure");
}
if (url.endsWith(`/volumes/vol-spawn-failure-1`) && method === "DELETE") {
return emptyResponse();
}
throw new Error(`Unexpected fetch ${method} ${url}`);
});
vi.stubGlobal("fetch", fetchMock);
await t.mutation(api.queue.upsertAgentProfile, {
agentKey: "support-agent",
version: "1.0.0",
secretsRef: [],
enabled: true,
});
await t.mutation(api.queue.importPlaintextSecret, {
secretRef: "fly.apiToken",
plaintextValue: "fly-token",
});
await t.mutation(api.queue.importPlaintextSecret, {
secretRef: "convex.url",
plaintextValue: "https://example.convex.cloud",
});
await t.mutation(api.queue.enqueueMessage, {
conversationId: "telegram:chat:spawn-failure",
agentKey: "support-agent",
payload: {
provider: "telegram",
providerUserId: "u-spawn-failure",
messageText: "hello",
},
nowMs,
});
await expect(
t.action(api.scheduler.reconcileWorkerPool, {
nowMs,
flyApiToken: "fly-token",
convexUrl: "https://example.convex.cloud",
scalingPolicy: {
maxWorkers: 5,
queuePerWorkerTarget: 1,
spawnStep: 1,
idleTimeoutMs: 300_000,
reconcileIntervalMs: 15_000,
},
providerConfig: TEST_PROVIDER_CONFIG,
}),
).rejects.toThrow("simulated spawn failure");
const workers = await t.query((internal.queue as any).listWorkersForScheduler, {});
const worker = workers.find((row: { workerId: string }) => row.workerId === `afw-${nowMs}-0`);
expect(worker?.status).toBe("stopped");
expect(worker?.machineId).toBeNull();
expect(worker?.volumeId).toBeNull();
});
test("push jobs should dispatch scheduled messages with a stable user-agent conversation id", async () => {
const t = initConvexTest();
await t.mutation(api.queue.upsertAgentProfile, {
agentKey: "push-agent",
version: "1.0.0",
secretsRef: [],
enabled: true,
});
await t.mutation(api.lib.bindUserAgent, {
consumerUserId: "user-push-1",
agentKey: "push-agent",
source: "manual",
metadata: { companyId: "co-1" },
});
const baseMs = Date.UTC(2026, 0, 1, 7, 59, 0);
const jobId = await t.mutation((api.lib as any).createPushJobCustom, {
companyId: "co-1",
consumerUserId: "user-push-1",
title: "Daily check",
text: "Ping automatico",
periodicity: "daily",
timezone: "UTC",
schedule: {
kind: "daily",
time: "08:00",
},
nowMs: baseMs,
});
const dispatch = await t.mutation((api.lib as any).dispatchDuePushJobs, {
nowMs: baseMs + 6 * 60_000,
limit: 50,
});
expect(dispatch.enqueued).toBe(1);
expect(dispatch.failed).toBe(0);
const queueStats = await t.query(api.lib.queueStats, {});
expect(queueStats.queuedReady).toBe(1);
const claim = await t.mutation(api.lib.claim, { workerId: "worker-push-fallback-1" });
expect(claim?.conversationId).toBe("user-agent:push-agent:user-push-1");
expect(claim?.payload.provider).toBe("system_push");
expect(claim?.payload.providerUserId).toBe("user-push-1");
const jobDispatches = await t.query((api.lib as any).listPushDispatchesByJob, {
jobId,
limit: 10,
});
expect(jobDispatches.length).toBe(1);
expect(jobDispatches[0].status).toBe("enqueued");
});
test("triggerPushJobNow should keep the stable conversation id after telegram pairing", async () => {
const t = initConvexTest();
await t.mutation(api.queue.upsertAgentProfile, {
agentKey: "push-telegram-manual-agent",
version: "1.0.0",
secretsRef: [],
botIdentity: "push-telegram-manual-bot",
enabled: true,
});
await t.mutation(api.lib.bindUserAgent, {
consumerUserId: "user-push-telegram-manual",
agentKey: "push-telegram-manual-agent",
botIdentity: "push-telegram-manual-bot",
source: "telegram_pairing",
telegramUserId: "tg-user-manual-1",
telegramChatId: "8246761447",
});
const nowMs = Date.UTC(2026, 0, 1, 8, 0, 0);
const jobId = await t.mutation((api.lib as any).createPushJobCustom, {
companyId: "co-tg-manual",
consumerUserId: "user-push-telegram-manual",
title: "Manual push",
text: "Messaggio manuale",
periodicity: "manual",
timezone: "UTC",
schedule: {
kind: "manual",
},
nowMs,
});
await t.mutation((api.lib as any).triggerPushJobNow, {
jobId,
nowMs,
});
const claim = await t.mutation(api.lib.claim, { workerId: "worker-push-telegram-manual-1" });
expect(claim?.conversationId).toBe(
"user-agent:push-telegram-manual-agent:user-push-telegram-manual",
);
expect(claim?.payload.provider).toBe("telegram");
expect(claim?.payload.providerUserId).toBe("tg-user-manual-1");
expect(claim?.payload.metadata?.telegramChatId).toBe("8246761447");
expect(claim?.payload.metadata?.telegramUserId).toBe("tg-user-manual-1");
});
test("dispatchDuePushJobs should keep the stable conversation id when telegram is available", async () => {
const t = initConvexTest();
await t.mutation(api.queue.upsertAgentProfile, {
agentKey: "push-telegram-scheduled-agent",
version: "1.0.0",
secretsRef: [],
botIdentity: "push-telegram-scheduled-bot",
enabled: true,
});
await t.mutation(api.lib.bindUserAgent, {
consumerUserId: "user-push-telegram-scheduled",
agentKey: "push-telegram-scheduled-agent",
botIdentity: "push-telegram-scheduled-bot",
source: "telegram_pairing",
telegramUserId: "tg-user-scheduled-1",
telegramChatId: "9988776655",
});
const baseMs = Date.UTC(2026, 0, 1, 7, 59, 0);
await t.mutation((api.lib as any).createPushJobCustom, {
companyId: "co-tg-scheduled",
consumerUserId: "user-push-telegram-scheduled",
title: "Daily telegram check",
text: "Ping telegram",
periodicity: "daily",
timezone: "UTC",
schedule: {
kind: "daily",
time: "08:00",
},
nowMs: baseMs,
});
const dispatch = await t.mutation((api.lib as any).dispatchDuePushJobs, {
nowMs: baseMs + 6 * 60_000,
limit: 50,
});
expect(dispatch.enqueued).toBe(1);
expect(dispatch.failed).toBe(0);
const claim = await t.mutation(api.lib.claim, { workerId: "worker-push-telegram-scheduled-1" });
expect(claim?.conversationId).toBe(
"user-agent:push-telegram-scheduled-agent:user-push-telegram-scheduled",
);
expect(claim?.payload.provider).toBe("telegram");
expect(claim?.payload.providerUserId).toBe("tg-user-scheduled-1");
expect(claim?.payload.metadata?.telegramChatId).toBe("9988776655");
expect(claim?.payload.metadata?.telegramUserId).toBe("tg-user-scheduled-1");
});
test("admin broadcast should enqueue to all active company agents", async () => {
const t = initConvexTest();
await t.mutation(api.queue.upsertAgentProfile, {
agentKey: "broadcast-agent-a",
version: "1.0.0",
secretsRef: [],
enabled: true,
});
await t.mutation(api.queue.upsertAgentProfile, {
agentKey: "broadcast-agent-b",
version: "1.0.0",
secretsRef: [],
enabled: true,
});
await t.mutation(api.lib.bindUserAgent, {
consumerUserId: "company-user-a",
agentKey: "broadcast-agent-a",
source: "manual",
metadata: { companyId: "company-broadcast" },
});
await t.mutation(api.lib.bindUserAgent, {
consumerUserId: "company-user-b",
agentKey: "broadcast-agent-b",
source: "manual",
metadata: { companyId: "company-broadcast" },
});
const result = await t.mutation((api.lib as any).sendBroadcastToAllActiveAgents, {
companyId: "company-broadcast",
title: "Aggiornamento policy",
text: "Sincronizza le nuove istruzioni",
requestedBy: "admin-1",
});
expect(result.totalTargets).toBe(2);
expect(result.enqueued).toBe(2);
expect(result.failed).toBe(0);
const queueStats = await t.query(api.lib.queueStats, {});
expect(queueStats.queuedReady).toBe(2);
});
});