import type { Context } from "../context.js";
import { Resource } from "../resource.js";
import { CloudflareApiError, handleApiError } from "./api-error.js";
import {
  CloudflareApi,
  createCloudflareApi,
  type CloudflareApiOptions,
} from "./api.js";

/**
 * Settings for a Cloudflare Queue
 */
export interface QueueSettings {
  /**
   * Delay in seconds before message delivery
   * Queue will not deliver messages until this time has elapsed
   */
  deliveryDelay?: number;

  /**
   * Whether delivery is paused
   * If true, the queue will not deliver messages to consumers
   */
  deliveryPaused?: boolean;

  /**
   * Period in seconds to retain messages
   * Messages will be automatically deleted after this time
   */
  messageRetentionPeriod?: number;
}

/**
 * Properties for creating or updating a Cloudflare Queue
 */
export interface QueueProps extends CloudflareApiOptions {
  /**
   * Name of the queue
   * Cannot be changed after creation
   *
   * @default id
   */
  name?: string;

  /**
   * Settings for the queue
   * These can be updated after queue creation
   */
  settings?: QueueSettings;

  /**
   * Whether to delete the queue.
   * If set to false, the queue will remain but the resource will be removed from state
   *
   * @default true
   */
  delete?: boolean;
}

/**
 * Type guard that reports whether a value is a Queue resource.
 *
 * Safe to call with any value: `null`, `undefined` and primitives yield
 * `false` instead of throwing.
 */
export function isQueue(eventSource: any): eventSource is Queue {
  // Optional chaining instead of the `in` operator, which throws a TypeError
  // when its right-hand side is not an object.
  return eventSource?.Kind === "cloudflare::Queue";
}

/**
 * Output returned after Cloudflare Queue creation/update
 */
export interface Queue<Body = unknown>
  extends Resource<"cloudflare::Queue">,
    QueueProps {
  /**
   * Type identifier for Cloudflare Queue
   */
  type: "queue";

  /**
   * The unique ID of the queue
   */
  id: string;

  /**
   * The name of the queue
   */
  name: string;

  /**
   * Time when the queue was created
   */
  createdOn: string;

  /**
   * Modified timestamp
   */
  modifiedOn: string;

  /**
   * Phantom properties to allow type inference.
   * They are always `undefined` at runtime and exist only to carry `Body`.
   */
  Body: Body;
  Batch: MessageBatch<Body>;
}

/**
 * Queue settings in the snake_case form used by the Cloudflare API.
 */
interface QueueSettingsPayload {
  delivery_delay?: number;
  delivery_paused?: boolean;
  message_retention_period?: number;
}

interface CloudflareQueueResponse {
  result: {
    queue_id?: string;
    queue_name: string;
    created_on?: string;
    modified_on?: string;
    settings?: QueueSettingsPayload;
  };
  success: boolean;
  errors: Array<{ code: number; message: string }>;
  messages: string[];
}

/**
 * Converts camelCase {@link QueueSettings} into the snake_case request shape,
 * copying only the fields that are actually set.
 */
function toSettingsPayload(settings: QueueSettings): QueueSettingsPayload {
  const payload: QueueSettingsPayload = {};
  if (settings.deliveryDelay !== undefined) {
    payload.delivery_delay = settings.deliveryDelay;
  }
  if (settings.deliveryPaused !== undefined) {
    payload.delivery_paused = settings.deliveryPaused;
  }
  if (settings.messageRetentionPeriod !== undefined) {
    payload.message_retention_period = settings.messageRetentionPeriod;
  }
  return payload;
}

/**
 * Creates and manages Cloudflare Queues.
 *
 * Queues provide a managed queue system for reliable message delivery
 * between workers and other systems.
 *
 * @example
 * // Create a basic queue with default settings
 * const basicQueue = await Queue("my-app-queue", {
 *   name: "my-app-queue"
 * });
 *
 * @example
 * // Create a queue with custom settings
 * const customQueue = await Queue("delayed-queue", {
 *   name: "delayed-queue",
 *   settings: {
 *     deliveryDelay: 30, // 30 second delay before message delivery
 *     messageRetentionPeriod: 86400 // Store messages for 1 day
 *   }
 * });
 *
 * @example
 * // Create a paused queue for later activation
 * const pausedQueue = await Queue("paused-queue", {
 *   name: "paused-queue",
 *   settings: {
 *     deliveryPaused: true
 *   }
 * });
 *
 * @see https://developers.cloudflare.com/queues/
 */
export const Queue = Resource("cloudflare::Queue", async function <
  T = unknown,
>(this: Context<Queue<T>>, id: string, props: QueueProps = {}): Promise<
  Queue<T>
> {
  const api = await createCloudflareApi(props);
  // The resource id doubles as the queue name when no explicit name is given.
  const queueName = props.name ?? id;

  if (this.phase === "delete") {
    console.log("Deleting Cloudflare Queue:", queueName);
    // `delete: false` keeps the remote queue and only drops it from state.
    if (props.delete !== false) {
      await deleteQueue(api, this.output?.id);
    }

    // Return void (a deleted queue has no content)
    return this.destroy();
  }

  let queueData: CloudflareQueueResponse;
  if (this.phase === "create") {
    console.log("Creating Cloudflare Queue:", queueName);
    queueData = await createQueue(api, queueName, props);
  } else if (this.output?.id) {
    console.log("Updating Cloudflare Queue:", queueName);

    // Compare the *resolved* name: when `props.name` is omitted it defaults
    // to `id`, so comparing the raw prop would reject every such update.
    if (queueName !== this.output.name) {
      throw new Error(
        "Cannot update Queue name after creation. Queue name is immutable."
      );
    }

    // Update the queue with new settings
    queueData = await updateQueue(api, this.output.id, props);
  } else {
    // If no ID exists, fall back to creating a new queue
    console.log(
      "No existing Queue ID found, creating new Cloudflare Queue:",
      queueName
    );
    queueData = await createQueue(api, queueName, props);
  }

  const remote = queueData.result;
  return this({
    type: "queue",
    id: remote.queue_id || "",
    name: queueName,
    settings: remote.settings
      ? {
          deliveryDelay: remote.settings.delivery_delay,
          deliveryPaused: remote.settings.delivery_paused,
          messageRetentionPeriod: remote.settings.message_retention_period,
        }
      : undefined,
    // Fall back to "now" when the API response omits the timestamps.
    createdOn: remote.created_on || new Date().toISOString(),
    modifiedOn: remote.modified_on || new Date().toISOString(),
    accountId: api.accountId,
    // phantom properties
    Body: undefined as T,
    Batch: undefined! as MessageBatch<T>,
  });
});

/**
 * Create a new Cloudflare Queue
 *
 * @param api Cloudflare API client; its `accountId` selects the account.
 * @param queueName Name sent as `queue_name`.
 * @param props Queue properties; only `settings` is read here.
 * @returns The parsed API response. A non-OK response is delegated to
 *   `handleApiError` (presumably throws — confirm in api-error.js).
 */
export async function createQueue(
  api: CloudflareApi,
  queueName: string,
  props: QueueProps
): Promise<CloudflareQueueResponse> {
  const createPayload: { queue_name: string; settings?: QueueSettingsPayload } =
    { queue_name: queueName };

  // Add settings if provided
  if (props.settings) {
    createPayload.settings = toSettingsPayload(props.settings);
  }

  const createResponse = await api.post(
    `/accounts/${api.accountId}/queues`,
    createPayload
  );

  if (!createResponse.ok) {
    return await handleApiError(createResponse, "creating", "Queue", queueName);
  }

  return (await createResponse.json()) as CloudflareQueueResponse;
}

/**
 * Get a Cloudflare Queue
 *
 * @param api Cloudflare API client; its `accountId` selects the account.
 * @param queueId ID of the queue to fetch.
 * @returns The parsed API response. A non-OK response is delegated to
 *   `handleApiError` (presumably throws — confirm in api-error.js).
 */
export async function getQueue(
  api: CloudflareApi,
  queueId: string
): Promise<CloudflareQueueResponse> {
  const response = await api.get(
    `/accounts/${api.accountId}/queues/${queueId}`
  );

  if (!response.ok) {
    return await handleApiError(response, "getting", "Queue", queueId);
  }

  return (await response.json()) as CloudflareQueueResponse;
}

/**
 * Delete a Cloudflare Queue
 *
 * Does nothing (beyond logging) when no ID is given, and treats a 404
 * response as success.
 *
 * @param api Cloudflare API client; its `accountId` selects the account.
 * @param queueId ID of the queue to delete.
 * @throws CloudflareApiError for any non-OK response other than 404.
 */
export async function deleteQueue(
  api: CloudflareApi,
  queueId?: string
): Promise<void> {
  if (!queueId) {
    console.log("No Queue ID provided, skipping delete");
    return;
  }

  // Delete Queue
  const deleteResponse = await api.delete(
    `/accounts/${api.accountId}/queues/${queueId}`
  );

  if (!deleteResponse.ok && deleteResponse.status !== 404) {
    // The error body may not be valid JSON; fall back to the HTTP status text.
    const errorData = (await deleteResponse.json().catch(() => ({
      errors: [{ message: deleteResponse.statusText }],
    }))) as { errors?: Array<{ message?: string }> };
    throw new CloudflareApiError(
      `Error deleting Cloudflare Queue '${queueId}': ${errorData.errors?.[0]?.message || deleteResponse.statusText}`,
      deleteResponse
    );
  }
}

/**
 * Update a Cloudflare Queue
 *
 * Note: According to Cloudflare API, the queue name cannot be changed after creation.
 * Only the settings can be updated.
 *
 * @param api Cloudflare API client; its `accountId` selects the account.
 * @param queueId ID of the queue to update.
 * @param props Queue properties; only `settings` is sent.
 * @returns The parsed API response. A non-OK response is delegated to
 *   `handleApiError` (presumably throws — confirm in api-error.js).
 */
export async function updateQueue(
  api: CloudflareApi,
  queueId: string,
  props: QueueProps
): Promise<CloudflareQueueResponse> {
  // Prepare the update payload - only include settings
  const updatePayload: { settings?: QueueSettingsPayload } = {};

  // Add settings if provided
  if (props.settings) {
    updatePayload.settings = toSettingsPayload(props.settings);
  }

  // Use PATCH for partial updates (only settings can be updated)
  const updateResponse = await api.patch(
    `/accounts/${api.accountId}/queues/${queueId}`,
    updatePayload
  );

  if (!updateResponse.ok) {
    return await handleApiError(updateResponse, "updating", "Queue", queueId);
  }

  return (await updateResponse.json()) as CloudflareQueueResponse;
}

/**
 * List all Cloudflare Queues in an account
 *
 * NOTE(review): issues a single GET and reads one `result` array; confirm
 * whether the endpoint paginates for accounts with many queues.
 *
 * @param api Cloudflare API client; its `accountId` selects the account.
 * @returns The name and ID of each queue in the response (empty when the
 *   response has no `result`).
 * @throws CloudflareApiError when the HTTP response is not OK.
 * @throws Error when the response body reports `success: false`.
 */
export async function listQueues(
  api: CloudflareApi
): Promise<{ name: string; id: string }[]> {
  const response = await api.get(`/accounts/${api.accountId}/queues`);

  if (!response.ok) {
    throw new CloudflareApiError(
      `Failed to list queues: ${response.statusText}`,
      response
    );
  }

  const data = (await response.json()) as {
    success: boolean;
    errors?: Array<{ code: number; message: string }>;
    result?: Array<{
      queue_name: string;
      queue_id: string;
    }>;
  };

  if (!data.success) {
    const errorMessage = data.errors?.[0]?.message || "Unknown error";
    throw new Error(`Failed to list queues: ${errorMessage}`);
  }

  // Transform API response
  return (data.result || []).map((queue) => ({
    name: queue.queue_name,
    id: queue.queue_id,
  }));
}