/**
 * Cortex SDK - Mutable Store API (Layer 1c)
 *
 * ACID-compliant mutable storage for live, frequently-changing data
 * Namespaces: inventory, config, counters, sessions, state, etc.
 */

import { ConvexError, v } from "convex/values";
import { mutation, query } from "./_generated/server";
import type { DatabaseReader } from "./_generated/server";

// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
// Internal helpers
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

/**
 * Look up a single entry by namespace/key, honouring tenant isolation.
 *
 * - With a (non-empty) tenantId: returns the first entry matching
 *   tenantId + namespace + key via the `by_tenant_namespace_key` index.
 * - Without a tenantId: returns the first entry matching namespace + key
 *   that has no tenantId (a "global" entry).
 *
 * SECURITY: a tenant-less lookup must never return a tenant-owned record.
 * Several tenants may own a record with the same namespace/key as a global
 * record, so the candidates are scanned until a global one is found instead
 * of inspecting only the first index hit (which could be a tenant record
 * shadowing the global one).
 *
 * @returns The matching document, or null when none exists.
 */
async function findEntry(
  db: DatabaseReader,
  namespace: string,
  key: string,
  tenantId: string | undefined,
) {
  if (tenantId) {
    // Copy into a const so the narrowed `string` type survives in the closure.
    const scopedTenantId = tenantId;
    return await db
      .query("mutable")
      .withIndex("by_tenant_namespace_key", (q) =>
        q
          .eq("tenantId", scopedTenantId)
          .eq("namespace", namespace)
          .eq("key", key),
      )
      .first();
  }

  const candidates = db
    .query("mutable")
    .withIndex("by_namespace_key", (q) =>
      q.eq("namespace", namespace).eq("key", key),
    );
  for await (const candidate of candidates) {
    // Only match if the record is truly global (no tenantId)
    if (!candidate.tenantId) {
      return candidate;
    }
  }
  return null;
}

/**
 * Collect every entry of a namespace.
 *
 * - With a (non-empty) tenantId: only entries of that tenant in the namespace.
 * - Without a tenantId: every entry in the namespace, whatever its tenantId.
 *
 * NOTE(review): the tenant-less branch also returns tenant-owned entries,
 * unlike the single-key lookups which only match global records. Confirm this
 * is intended for list/count/purge callers.
 */
async function collectNamespaceEntries(
  db: DatabaseReader,
  namespace: string,
  tenantId: string | undefined,
) {
  if (tenantId) {
    const scopedTenantId = tenantId;
    return await db
      .query("mutable")
      .withIndex("by_tenant_namespace", (q) =>
        q.eq("tenantId", scopedTenantId).eq("namespace", namespace),
      )
      .collect();
  }

  return await db
    .query("mutable")
    .withIndex("by_namespace", (q) => q.eq("namespace", namespace))
    .collect();
}

// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
// Mutations (Write Operations)
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

/**
 * Set a key to a value (creates or overwrites)
 *
 * When the entry already exists, `userId` and `metadata` are only replaced
 * if the caller supplied them; otherwise the stored values are kept.
 *
 * @returns The stored document after the write.
 */
export const set = mutation({
  args: {
    namespace: v.string(),
    key: v.string(),
    value: v.any(),
    userId: v.optional(v.string()),
    tenantId: v.optional(v.string()), // Multi-tenancy: SaaS platform isolation
    metadata: v.optional(v.any()),
  },
  handler: async (ctx, args) => {
    const now = Date.now();

    // Must match both namespace/key AND tenantId for proper isolation
    const existing = await findEntry(
      ctx.db,
      args.namespace,
      args.key,
      args.tenantId,
    );

    if (existing) {
      // Update existing
      await ctx.db.patch(existing._id, {
        value: args.value,
        userId: args.userId ?? existing.userId,
        // Explicit undefined check: metadata is v.any(), so null is a real value
        metadata:
          args.metadata !== undefined ? args.metadata : existing.metadata,
        updatedAt: now,
      });
      return await ctx.db.get(existing._id);
    }

    // Create new
    const _id = await ctx.db.insert("mutable", {
      namespace: args.namespace,
      key: args.key,
      value: args.value,
      userId: args.userId,
      tenantId: args.tenantId, // Store tenantId
      metadata: args.metadata,
      createdAt: now,
      updatedAt: now,
    });
    return await ctx.db.get(_id);
  },
});

/**
 * Atomic update using updater function
 *
 * Supported operations:
 * - "increment" / "decrement": add/subtract `operand` (defaults to 1 when
 *   omitted; an explicit 0 is honoured). A falsy stored value counts as 0.
 * - "append": push `operand` onto the stored array.
 * - "custom": replace the stored value with `operand`.
 *
 * @throws ConvexError "MUTABLE_KEY_NOT_FOUND" when the entry does not exist.
 * @throws ConvexError "MUTABLE_VALUE_NOT_ARRAY" when appending to a non-array.
 * @returns The document after the update.
 */
export const update = mutation({
  args: {
    namespace: v.string(),
    key: v.string(),
    // Note: updater function is passed as serialized code in the actual implementation
    // For now, we'll handle simple operations
    operation: v.union(
      v.literal("increment"),
      v.literal("decrement"),
      v.literal("append"),
      v.literal("custom"),
    ),
    operand: v.optional(v.any()),
    tenantId: v.optional(v.string()), // Multi-tenancy: SaaS platform isolation
  },
  handler: async (ctx, args) => {
    const existing = await findEntry(
      ctx.db,
      args.namespace,
      args.key,
      args.tenantId,
    );

    if (!existing) {
      throw new ConvexError("MUTABLE_KEY_NOT_FOUND");
    }

    let newValue = existing.value;

    // Apply operation
    switch (args.operation) {
      case "increment":
        newValue = (existing.value || 0) + (args.operand ?? 1);
        break;
      case "decrement":
        newValue = (existing.value || 0) - (args.operand ?? 1);
        break;
      case "append":
        if (!Array.isArray(existing.value)) {
          throw new ConvexError("MUTABLE_VALUE_NOT_ARRAY");
        }
        newValue = [...existing.value, args.operand];
        break;
      case "custom":
        newValue = args.operand; // For custom updates via SDK
        break;
    }

    await ctx.db.patch(existing._id, {
      value: newValue,
      updatedAt: Date.now(),
    });

    return await ctx.db.get(existing._id);
  },
});

/**
 * Delete a key
 *
 * @throws ConvexError "MUTABLE_KEY_NOT_FOUND" when the entry does not exist.
 * @returns `{ deleted: true, namespace, key }` on success.
 */
export const deleteKey = mutation({
  args: {
    namespace: v.string(),
    key: v.string(),
    tenantId: v.optional(v.string()), // Multi-tenancy: SaaS platform isolation
  },
  handler: async (ctx, args) => {
    const entry = await findEntry(
      ctx.db,
      args.namespace,
      args.key,
      args.tenantId,
    );

    if (!entry) {
      throw new ConvexError("MUTABLE_KEY_NOT_FOUND");
    }

    await ctx.db.delete(entry._id);

    return {
      deleted: true,
      namespace: args.namespace,
      key: args.key,
    };
  },
});

/**
 * Purge all keys in a namespace
 *
 * SECURITY: When tenantId is provided, only entries belonging to that tenant
 * within the namespace are purged. This prevents cross-tenant data deletion.
 * Without tenantId the whole namespace is purged (use with caution).
 *
 * With `dryRun: true` nothing is deleted; the result reports how many entries
 * would be removed and their keys.
 */
export const purgeNamespace = mutation({
  args: {
    namespace: v.string(),
    dryRun: v.optional(v.boolean()),
    tenantId: v.optional(v.string()), // Multi-tenancy: SaaS platform isolation
  },
  handler: async (ctx, args) => {
    const entries = await collectNamespaceEntries(
      ctx.db,
      args.namespace,
      args.tenantId,
    );

    // If dryRun, return what would be deleted without actually deleting
    if (args.dryRun) {
      return {
        deleted: entries.length,
        namespace: args.namespace,
        keys: entries.map((e) => e.key),
        dryRun: true,
      };
    }

    let deleted = 0;
    for (const entry of entries) {
      await ctx.db.delete(entry._id);
      deleted++;
    }

    return {
      deleted,
      namespace: args.namespace,
      dryRun: false,
    };
  },
});

/**
 * Execute multiple operations atomically
 *
 * SECURITY: When tenantId is provided, all operations are scoped to that tenant.
 * This prevents cross-tenant data modification in atomic transactions.
 *
 * Operations run in order. "set" creates or overwrites; "update",
 * "increment", "decrement" and "delete" require the entry to exist.
 * `amount` defaults to 1 when omitted; an explicit 0 is honoured.
 *
 * @throws ConvexError "MUTABLE_KEY_NOT_FOUND: <namespace>/<key>" when a
 *   non-"set" operation targets a missing entry.
 * @returns `{ success: true, operationsExecuted, results }` with one result
 *   per operation.
 */
export const transaction = mutation({
  args: {
    operations: v.array(
      v.object({
        op: v.union(
          v.literal("set"),
          v.literal("update"),
          v.literal("delete"),
          v.literal("increment"),
          v.literal("decrement"),
        ),
        namespace: v.string(),
        key: v.string(),
        value: v.optional(v.any()),
        amount: v.optional(v.number()),
      }),
    ),
    tenantId: v.optional(v.string()), // Multi-tenancy: SaaS platform isolation
  },
  handler: async (ctx, args) => {
    const results = [];

    for (const operation of args.operations) {
      const existing = await findEntry(
        ctx.db,
        operation.namespace,
        operation.key,
        args.tenantId,
      );

      if (operation.op === "set") {
        if (existing) {
          await ctx.db.patch(existing._id, {
            value: operation.value,
            updatedAt: Date.now(),
          });
          results.push(await ctx.db.get(existing._id));
        } else {
          const now = Date.now();
          const _id = await ctx.db.insert("mutable", {
            namespace: operation.namespace,
            key: operation.key,
            value: operation.value,
            tenantId: args.tenantId, // Store tenantId for new entries
            createdAt: now,
            updatedAt: now,
          });
          results.push(await ctx.db.get(_id));
        }
      } else if (
        operation.op === "update" ||
        operation.op === "increment" ||
        operation.op === "decrement"
      ) {
        if (!existing) {
          throw new ConvexError(
            `MUTABLE_KEY_NOT_FOUND: ${operation.namespace}/${operation.key}`,
          );
        }

        let newValue = existing.value;
        if (operation.op === "increment") {
          newValue = (existing.value || 0) + (operation.amount ?? 1);
        } else if (operation.op === "decrement") {
          newValue = (existing.value || 0) - (operation.amount ?? 1);
        } else {
          // update with provided value
          newValue = operation.value;
        }

        await ctx.db.patch(existing._id, {
          value: newValue,
          updatedAt: Date.now(),
        });
        results.push(await ctx.db.get(existing._id));
      } else if (operation.op === "delete") {
        if (!existing) {
          throw new ConvexError(
            `MUTABLE_KEY_NOT_FOUND: ${operation.namespace}/${operation.key}`,
          );
        }

        await ctx.db.delete(existing._id);
        results.push({
          deleted: true,
          namespace: operation.namespace,
          key: operation.key,
        });
      }
    }

    return {
      success: true,
      operationsExecuted: args.operations.length,
      results,
    };
  },
});

/**
 * Bulk delete keys matching filters
 *
 * SECURITY: When tenantId is provided, only entries belonging to that tenant
 * are deleted. This prevents cross-tenant data deletion.
 *
 * Optional filters (all must match): key prefix, owning userId, and
 * `updatedAt` strictly before `updatedBefore`.
 *
 * @returns `{ deleted, namespace, keys }` describing the removed entries.
 */
export const purgeMany = mutation({
  args: {
    namespace: v.string(),
    keyPrefix: v.optional(v.string()),
    userId: v.optional(v.string()),
    updatedBefore: v.optional(v.number()),
    tenantId: v.optional(v.string()), // Multi-tenancy: SaaS platform isolation
  },
  handler: async (ctx, args) => {
    let entries = await collectNamespaceEntries(
      ctx.db,
      args.namespace,
      args.tenantId,
    );

    // Apply filters
    const { keyPrefix, userId, updatedBefore } = args;
    if (keyPrefix) {
      entries = entries.filter((e) => e.key.startsWith(keyPrefix));
    }
    if (userId) {
      entries = entries.filter((e) => e.userId === userId);
    }
    if (updatedBefore !== undefined) {
      entries = entries.filter((e) => e.updatedAt < updatedBefore);
    }

    let deleted = 0;
    for (const entry of entries) {
      await ctx.db.delete(entry._id);
      deleted++;
    }

    return {
      deleted,
      namespace: args.namespace,
      keys: entries.map((e) => e.key),
    };
  },
});

// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
// Queries (Read Operations)
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

/**
 * Get value for a key
 *
 * @returns The matching document, or null when it does not exist (a
 *   tenant-less call only ever matches records without a tenantId).
 */
export const get = query({
  args: {
    namespace: v.string(),
    key: v.string(),
    tenantId: v.optional(v.string()), // Multi-tenancy filter
  },
  handler: async (ctx, args) => {
    return await findEntry(ctx.db, args.namespace, args.key, args.tenantId);
  },
});

/**
 * Check if key exists
 *
 * @returns true when a matching entry exists under the same isolation rules
 *   as `get`, false otherwise.
 */
export const exists = query({
  args: {
    namespace: v.string(),
    key: v.string(),
    tenantId: v.optional(v.string()), // Multi-tenancy filter
  },
  handler: async (ctx, args) => {
    const entry = await findEntry(
      ctx.db,
      args.namespace,
      args.key,
      args.tenantId,
    );
    return entry !== null;
  },
});

/**
 * List keys in namespace
 *
 * Entries are collected, filtered (key prefix, userId, exclusive
 * updatedAfter/updatedBefore bounds), sorted, then paginated.
 *
 * - sortBy: "key" (default) or "updatedAt"; any other value leaves the
 *   collected order untouched.
 * - sortOrder: "asc" (default) or "desc".
 * - offset defaults to 0; limit defaults to 100 (a limit of 0 also falls
 *   back to 100).
 */
export const list = query({
  args: {
    namespace: v.string(),
    keyPrefix: v.optional(v.string()),
    userId: v.optional(v.string()),
    tenantId: v.optional(v.string()), // Multi-tenancy filter
    limit: v.optional(v.number()),
    offset: v.optional(v.number()),
    updatedAfter: v.optional(v.number()),
    updatedBefore: v.optional(v.number()),
    sortBy: v.optional(v.string()),
    sortOrder: v.optional(v.string()),
  },
  handler: async (ctx, args) => {
    // Collect all matching entries first for filtering and sorting
    let entries = await collectNamespaceEntries(
      ctx.db,
      args.namespace,
      args.tenantId,
    );

    const { keyPrefix, userId, updatedAfter, updatedBefore } = args;

    // Filter by key prefix if provided
    if (keyPrefix) {
      entries = entries.filter((e) => e.key.startsWith(keyPrefix));
    }

    // Filter by userId if provided
    if (userId) {
      entries = entries.filter((e) => e.userId === userId);
    }

    // Filter by updatedAfter if provided
    if (updatedAfter !== undefined) {
      entries = entries.filter((e) => e.updatedAt > updatedAfter);
    }

    // Filter by updatedBefore if provided
    if (updatedBefore !== undefined) {
      entries = entries.filter((e) => e.updatedAt < updatedBefore);
    }

    // Sort entries
    const sortBy = args.sortBy || "key";
    const sortOrder = args.sortOrder || "asc";
    const sortMultiplier = sortOrder === "desc" ? -1 : 1;

    entries.sort((a, b) => {
      let comparison = 0;
      if (sortBy === "key") {
        comparison = a.key.localeCompare(b.key);
      } else if (sortBy === "updatedAt") {
        comparison = a.updatedAt - b.updatedAt;
      }
      return comparison * sortMultiplier;
    });

    // Apply offset, then limit
    const offset = args.offset || 0;
    const limit = args.limit || 100;
    return entries.slice(offset).slice(0, limit);
  },
});

/**
 * Count keys in namespace
 *
 * Applies the same optional filters as `list` (userId, key prefix, exclusive
 * updatedAfter/updatedBefore bounds) and returns the number of matches.
 */
export const count = query({
  args: {
    namespace: v.string(),
    userId: v.optional(v.string()),
    tenantId: v.optional(v.string()), // Multi-tenancy filter
    keyPrefix: v.optional(v.string()),
    updatedAfter: v.optional(v.number()),
    updatedBefore: v.optional(v.number()),
  },
  handler: async (ctx, args) => {
    let entries = await collectNamespaceEntries(
      ctx.db,
      args.namespace,
      args.tenantId,
    );

    const { keyPrefix, userId, updatedAfter, updatedBefore } = args;

    // Filter by userId if provided
    if (userId) {
      entries = entries.filter((e) => e.userId === userId);
    }

    // Filter by key prefix if provided
    if (keyPrefix) {
      entries = entries.filter((e) => e.key.startsWith(keyPrefix));
    }

    // Filter by updatedAfter if provided
    if (updatedAfter !== undefined) {
      entries = entries.filter((e) => e.updatedAt > updatedAfter);
    }

    // Filter by updatedBefore if provided
    if (updatedBefore !== undefined) {
      entries = entries.filter((e) => e.updatedAt < updatedBefore);
    }

    return entries.length;
  },
});

/**
 * Purge all mutable entries (TEST/DEV ONLY)
 *
 * WARNING: This permanently deletes mutable entries!
 * Only available in test/dev environments.
 *
 * SECURITY: When tenantId is provided, only entries belonging to that tenant
 * are deleted. Without tenantId, ALL entries are deleted (dangerous!).
 *
 * @throws Error "PURGE_DISABLED_IN_PRODUCTION: ..." when none of the
 *   local/dev/test environment checks pass.
 * @returns `{ deleted }` with the number of removed entries.
 */
export const purgeAll = mutation({
  args: {
    tenantId: v.optional(v.string()), // Multi-tenancy: SaaS platform isolation
  },
  handler: async (ctx, args) => {
    // Safety check: Only allow in test/dev environments
    // NOTE(review): the substring checks below are broad — confirm that a
    // production CONVEX_SITE_URL can never contain ".convex.site",
    // "convex.cloud" or "dev-", otherwise this guard does not protect it.
    const siteUrl = process.env.CONVEX_SITE_URL || "";
    const isLocal =
      siteUrl.includes("localhost") || siteUrl.includes("127.0.0.1");
    const isDevDeployment =
      siteUrl.includes(".convex.site") ||
      siteUrl.includes("dev-") ||
      siteUrl.includes("convex.cloud");
    const isTestEnv =
      process.env.NODE_ENV === "test" ||
      process.env.CONVEX_ENVIRONMENT === "test";

    if (!isLocal && !isDevDeployment && !isTestEnv) {
      throw new Error(
        "PURGE_DISABLED_IN_PRODUCTION: purgeAll is only available in test/dev environments.",
      );
    }

    let allEntries;
    const { tenantId } = args;
    if (tenantId) {
      // Tenant-isolated purge: only delete entries for this tenant
      allEntries = await ctx.db
        .query("mutable")
        .withIndex("by_tenantId", (q) => q.eq("tenantId", tenantId))
        .collect();
    } else {
      // Global purge (use with extreme caution)
      allEntries = await ctx.db.query("mutable").collect();
    }

    for (const entry of allEntries) {
      await ctx.db.delete(entry._id);
    }

    return { deleted: allEntries.length };
  },
});