import type { Context } from "../context.ts";
import { Resource, ResourceKind } from "../resource.ts";
import { Scope } from "../scope.ts";
import { CloudflareApiError, handleApiError } from "./api-error.ts";
import {
createCloudflareApi,
type CloudflareApi,
type CloudflareApiOptions,
} from "./api.ts";
/**
 * Settings for a Cloudflare Queue.
 *
 * Every field is optional. When creating or updating a queue, only fields
 * that are explicitly defined are copied into the request payload.
 */
export interface QueueSettings {
/**
 * Delay in seconds before message delivery.
 * Queue will not deliver messages until this time has elapsed.
 * Sent to the API as `delivery_delay`.
 */
deliveryDelay?: number;
/**
 * Whether delivery is paused.
 * If true, the queue will not deliver messages to consumers.
 * Sent to the API as `delivery_paused`.
 */
deliveryPaused?: boolean;
/**
 * Period in seconds to retain messages.
 * Messages will be automatically deleted after this time.
 * Sent to the API as `message_retention_period`.
 */
messageRetentionPeriod?: number;
}
/**
 * Properties for creating or updating a Cloudflare Queue.
 *
 * Extends the Cloudflare API options, so the same object is also handed to
 * `createCloudflareApi` when the resource needs an API client.
 */
export interface QueueProps extends CloudflareApiOptions {
/**
 * Name of the queue
 * Required during creation
 * Cannot be changed after creation
 *
 * When omitted, the resource reuses the name from its previous output and
 * otherwise asks the scope for a physical name derived from the resource id.
 *
 * @default ${app}-${stage}-${id}
 */
name?: string;
/**
 * Settings for the queue
 * These can be updated after queue creation
 */
settings?: QueueSettings;
/**
 * Dead letter queue for failed messages
 * Can be either a queue name (string) or a Queue object
 *
 * NOTE(review): within this file the value is only copied onto the resource
 * output; the create/update request payloads do not include it.
 */
dlq?: string | Queue;
/**
 * Whether to delete the queue.
 * If set to false, the queue will remain but the resource will be removed from state
 *
 * @default true
 */
delete?: boolean;
/**
 * Whether to adopt an existing queue with the same name if it exists
 * If true, during creation, if a queue with the same name exists, it will be adopted instead of creating a new one
 *
 * When omitted, the resource falls back to the scope's `adopt` setting.
 *
 * @default false
 */
adopt?: boolean;
/**
 * Development-mode options, used when the scope is running locally.
 * Unless `remote` is set, a local scope returns a placeholder queue without
 * calling the Cloudflare API.
 */
dev?: {
/**
 * Whether to run the queue remotely instead of locally
 * @default false
 */
remote?: boolean;
/**
 * Set when `Scope.local` is true to force update to the queue even if it was already deployed live.
 * The exported `Queue` function always overwrites this with `Scope.current.local`.
 * @internal
 */
force?: boolean;
};
}
/**
 * Type guard that reports whether a value is a Cloudflare Queue resource.
 *
 * A value qualifies when it is an object (or function) that carries the
 * `ResourceKind` key with the value `"cloudflare::Queue"`.
 *
 * Non-object inputs such as `null`, `undefined`, numbers or a plain queue-name
 * string return `false`. Without that check the `in` operator would throw a
 * TypeError for them, which matters because a dead letter queue may be given
 * either as a name or as a Queue object.
 *
 * @param eventSource - Any value to test
 * @returns `true` only when the value is tagged as a Cloudflare Queue
 */
export function isQueue(eventSource: any): eventSource is Queue {
  // `in` is only defined for objects and functions; reject everything else first.
  const isObjectLike =
    (typeof eventSource === "object" && eventSource !== null) ||
    typeof eventSource === "function";
  return (
    isObjectLike &&
    ResourceKind in eventSource &&
    eventSource[ResourceKind] === "cloudflare::Queue"
  );
}
/**
 * Output returned after Cloudflare Queue creation/update
 */
export type Queue
= Omit & {
/**
 * Type identifier for Cloudflare Queue
 */
type: "queue";
/**
 * The unique ID of the queue
 */
id: string;
/**
 * The name of the queue
 */
name: string;
/**
 * Time when the queue was created
 */
createdOn: string;
/**
 * Modified timestamp
 */
modifiedOn: string;
/**
 * Phantom property to allow type inference
 * The resource always sets it to `undefined` at runtime.
 */
Body: Body;
/**
 * Phantom property for the message batch type.
 * Like `Body`, the resource sets it to `undefined` at runtime.
 */
Batch: MessageBatch;
/**
 * Development mode properties
 * @internal
 */
dev: {
/**
 * The ID of the queue in development mode
 */
id: string;
/**
 * Whether the queue is running remotely
 */
remote: boolean;
};
};
/**
* Creates and manages Cloudflare Queues.
*
* Queues provide a managed queue system for reliable message delivery
* between workers and other systems.
*
* @example
* // Create a basic queue with default settings
* const basicQueue = await Queue("my-app-queue", {
* name: "my-app-queue"
* });
*
* @example
* // Create a queue with custom settings
* const customQueue = await Queue("delayed-queue", {
* name: "delayed-queue",
* settings: {
* deliveryDelay: 30, // 30 second delay before message delivery
* messageRetentionPeriod: 86400 // Store messages for 1 day
* }
* });
*
* @example
* // Create a paused queue for later activation
* const pausedQueue = await Queue("paused-queue", {
* name: "paused-queue",
* settings: {
* deliveryPaused: true
* }
* });
*
* @example
* // Create a queue with a dead letter queue using string reference
* const dlqQueue = await Queue("dlq-queue", {
* name: "dlq-queue"
* });
*
* const mainQueue = await Queue("main-queue", {
* name: "main-queue",
* dlq: "dlq-queue"
* });
*
* @example
* // Create a queue with a dead letter queue using Queue object
* const dlqQueue = await Queue("dlq-queue", {
* name: "dlq-queue"
* });
*
* const mainQueue = await Queue("main-queue", {
* name: "main-queue",
* dlq: dlqQueue
* });
*
* @example
* // Create a queue and configure Worker consumer with custom settings
* const processingQueue = await Queue("processing-queue", {
* name: "processing-queue"
* });
*
* const processingWorker = await Worker("processor", {
* entrypoint: "./src/processor.ts",
* bindings: {
* QUEUE: processingQueue // Producer: bind queue for sending messages
* },
* eventSources: [{ // Consumer: configure processing settings
* queue: processingQueue,
* settings: {
* batchSize: 25, // Process 25 messages at once
* maxConcurrency: 5, // Allow 5 concurrent invocations
* maxRetries: 3, // Retry failed messages up to 3 times
* maxWaitTimeMs: 1500, // Wait up to 1.5 seconds to fill a batch
* retryDelay: 45, // Wait 45 seconds before retrying failed messages
* deadLetterQueue: "failed-processing" // Send failed messages to DLQ
* }
* }]
* });
*
* @see https://developers.cloudflare.com/queues/
*/
export async function Queue(
id: string,
props: QueueProps = {},
): Promise> {
// Thin wrapper around the underlying resource: forwards all props unchanged
// except for `dev.force`.
return await _Queue(id, {
...props,
dev: {
// Keep any caller-supplied dev options (e.g. `remote`) ...
...(props.dev ?? {}),
// ... but always overwrite `force` with the current scope's local flag.
force: Scope.current.local,
},
});
}
/**
 * Resource implementation behind the exported `Queue` function.
 *
 * Lifecycle as written below:
 * - local scope without `dev.remote`: return a placeholder built from the
 *   previous output, without creating an API client;
 * - delete phase: delete the remote queue (unless `props.delete === false`
 *   or no id is known) and destroy the resource;
 * - create phase or missing id: create the queue, optionally adopting an
 *   existing queue with the same name on a 409 conflict;
 * - otherwise: update the existing queue.
 */
const _Queue = Resource("cloudflare::Queue", async function <
T = unknown,
>(this: Context>, id: string, props: QueueProps = {}): Promise<
Queue
> {
// Name precedence: explicit prop, then the previously stored name, then a
// physical name generated by the scope from the resource id.
const queueName =
props.name ?? this.output?.name ?? this.scope.createPhysicalName(id);
// Dev id precedence: previous dev id, then previous queue id, then the
// resource id.
const dev = {
id: this.output?.dev?.id ?? this.output?.id ?? id,
remote: props.dev?.remote ?? false,
};
// NOTE(review): this early return runs before the delete-phase check below,
// so a local, non-remote run never reaches deleteQueue or this.destroy().
// Confirm that is intended.
if (this.scope.local && !props.dev?.remote) {
return {
type: "queue",
id: this.output?.id ?? "",
name: queueName,
dev,
createdOn: this.output?.createdOn ?? new Date().toISOString(),
modifiedOn: this.output?.modifiedOn ?? new Date().toISOString(),
Body: undefined as T,
Batch: undefined! as MessageBatch,
};
}
const api = await createCloudflareApi(props);
if (this.phase === "delete") {
// Only call the API when deletion was not opted out of and an id is known.
if (props.delete !== false && this.output?.id) {
// Delete Queue
await deleteQueue(api, this.output.id);
}
// Return void (a deleted queue has no content)
return this.destroy();
}
let queueData: CloudflareQueueResponse;
if (this.phase === "create" || !this.output?.id) {
try {
queueData = await createQueue(api, queueName, props);
} catch (error) {
// A 409 is only recoverable when adoption is enabled, either on the props
// or, when the prop is unset, on the scope.
if (error instanceof CloudflareApiError && error.status === 409) {
if (!(props.adopt ?? this.scope.adopt)) {
throw error;
}
// Queue already exists, try to find it by name
const existingQueue = await findQueueByName(api, queueName);
if (!existingQueue) {
throw new Error(
`Queue with name ${queueName} not found despite 409 conflict`,
);
}
// NOTE(review): the first assignment is immediately overwritten by the
// update result; it only serves to read the adopted queue's id.
queueData = existingQueue;
queueData = await updateQueue(api, queueData.result.queue_id!, {
...props,
name: queueName,
});
} else {
throw error;
}
}
} else {
// Update operation
if (this.output?.id) {
// Update the queue with new settings
queueData = await updateQueue(api, this.output.id, {
...props,
name: queueName,
});
} else {
// If no ID exists, fall back to creating a new queue
// NOTE(review): appears unreachable, because the outer condition already
// sends a missing output id to the create path above.
queueData = await createQueue(api, queueName, props);
}
}
// Build the resource output: API snake_case settings are mapped back to the
// camelCase QueueSettings shape, and missing timestamps default to "now".
return {
type: "queue",
id: queueData.result.queue_id || "",
name: queueName,
settings: queueData.result.settings
? {
deliveryDelay: queueData.result.settings.delivery_delay,
deliveryPaused: queueData.result.settings.delivery_paused,
messageRetentionPeriod:
queueData.result.settings.message_retention_period,
}
: undefined,
// Echoed from the props as given (name or Queue object).
dlq: props.dlq,
createdOn: queueData.result.created_on || new Date().toISOString(),
modifiedOn: queueData.result.modified_on || new Date().toISOString(),
accountId: api.accountId,
dev,
// phantom properties
Body: undefined as T,
Batch: undefined! as MessageBatch,
};
});
/**
 * Shape this module expects from the single-queue endpoints
 * (create, get and update). Responses are cast to this type without
 * runtime validation.
 */
interface CloudflareQueueResponse {
// Queue record; fields use the API's snake_case names.
result: {
// Optional in this type; callers fall back to "" or assert it is present.
queue_id?: string;
queue_name: string;
created_on?: string;
modified_on?: string;
// Mirrors QueueSettings with snake_case keys.
settings?: {
delivery_delay?: number;
delivery_paused?: boolean;
message_retention_period?: number;
};
};
success: boolean;
errors: Array<{ code: number; message: string }>;
messages: string[];
}
/**
 * Create a new Cloudflare Queue.
 *
 * Posts to `/accounts/{accountId}/queues` with the queue name. When
 * `props.settings` is provided, a `settings` object is added to the payload
 * containing only the explicitly defined settings, translated to the API's
 * snake_case field names.
 *
 * @param api - Cloudflare API client; its `accountId` selects the account
 * @param queueName - Name for the new queue
 * @param props - Queue properties; only `settings` is read here
 * @returns The parsed JSON body of the successful response
 *
 * Non-OK responses are handed to `handleApiError`; the resource in this file
 * expects that path to surface a `CloudflareApiError` (it inspects status 409).
 */
export async function createQueue(
  api: CloudflareApi,
  queueName: string,
  props: QueueProps,
): Promise<CloudflareQueueResponse> {
  // Typed payload (rather than `any`) so misspelled fields are caught at compile time.
  const createPayload: {
    queue_name: string;
    settings?: {
      delivery_delay?: number;
      delivery_paused?: boolean;
      message_retention_period?: number;
    };
  } = {
    queue_name: queueName,
  };

  if (props.settings) {
    const { deliveryDelay, deliveryPaused, messageRetentionPeriod } =
      props.settings;
    // An empty settings object is still sent when `settings` is `{}`.
    createPayload.settings = {};
    // Compare against undefined so falsy values (0, false) are preserved.
    if (deliveryDelay !== undefined) {
      createPayload.settings.delivery_delay = deliveryDelay;
    }
    if (deliveryPaused !== undefined) {
      createPayload.settings.delivery_paused = deliveryPaused;
    }
    if (messageRetentionPeriod !== undefined) {
      createPayload.settings.message_retention_period = messageRetentionPeriod;
    }
  }

  const createResponse = await api.post(
    `/accounts/${api.accountId}/queues`,
    createPayload,
  );

  if (!createResponse.ok) {
    return await handleApiError(createResponse, "creating", "Queue", queueName);
  }

  return (await createResponse.json()) as CloudflareQueueResponse;
}
/**
 * Get a Cloudflare Queue by its id.
 *
 * Issues a GET to `/accounts/{accountId}/queues/{queueId}`.
 *
 * @param api - Cloudflare API client; its `accountId` selects the account
 * @param queueId - Id of the queue to fetch
 * @returns The parsed JSON body of the successful response
 *
 * Non-OK responses are handed to `handleApiError`.
 */
export async function getQueue(
  api: CloudflareApi,
  queueId: string,
): Promise<CloudflareQueueResponse> {
  const response = await api.get(
    `/accounts/${api.accountId}/queues/${queueId}`,
  );

  if (!response.ok) {
    return await handleApiError(response, "getting", "Queue", queueId);
  }

  return (await response.json()) as CloudflareQueueResponse;
}
/**
 * Delete a Cloudflare Queue.
 *
 * Issues a DELETE to `/accounts/{accountId}/queues/{queueId}`. A 404 response
 * is treated as success, so deleting a queue that no longer exists does not
 * raise.
 *
 * @param api - Cloudflare API client; its `accountId` selects the account
 * @param queueId - Id of the queue to delete
 * @returns Nothing; resolves once the queue is deleted or found to be absent
 * @throws CloudflareApiError for any other non-OK response. The message uses
 *   the first error reported in the response body and falls back to the HTTP
 *   status text when the body is missing, unparsable or has no message.
 */
export async function deleteQueue(
  api: CloudflareApi,
  queueId: string,
): Promise<void> {
  const deleteResponse = await api.delete(
    `/accounts/${api.accountId}/queues/${queueId}`,
  );

  // Success, or the queue is already gone.
  if (deleteResponse.ok || deleteResponse.status === 404) {
    return;
  }

  type DeleteErrorBody = { errors?: Array<{ message?: string }> } | null;
  let errorData: DeleteErrorBody;
  try {
    errorData = (await deleteResponse.json()) as DeleteErrorBody;
  } catch {
    // Body was not valid JSON: report the status text instead.
    errorData = { errors: [{ message: deleteResponse.statusText }] };
  }

  throw new CloudflareApiError(
    `Error deleting Cloudflare Queue '${queueId}': ${errorData?.errors?.[0]?.message || deleteResponse.statusText}`,
    deleteResponse,
  );
}
/**
 * Update a Cloudflare Queue.
 *
 * Issues a PATCH to `/accounts/{accountId}/queues/{queueId}`. The payload
 * always carries `queue_name` (taken from `props.name`) and, when
 * `props.settings` is provided, a `settings` object holding only the
 * explicitly defined settings in the API's snake_case form.
 *
 * Note: this file documents the queue name as fixed after creation, so
 * `props.name` is expected to be the queue's current name.
 *
 * @param api - Cloudflare API client; its `accountId` selects the account
 * @param queueId - Id of the queue to update
 * @param props - Queue properties; only `name` and `settings` are read here
 * @returns The parsed JSON body of the successful response
 *
 * Non-OK responses are handed to `handleApiError`.
 */
export async function updateQueue(
  api: CloudflareApi,
  queueId: string,
  props: QueueProps & {
    name: string;
  },
): Promise<CloudflareQueueResponse> {
  const updatePayload: {
    queue_name?: string;
    settings?: {
      delivery_delay?: number;
      delivery_paused?: boolean;
      message_retention_period?: number;
    };
  } = {
    queue_name: props.name,
  };

  if (props.settings) {
    const { deliveryDelay, deliveryPaused, messageRetentionPeriod } =
      props.settings;
    // An empty settings object is still sent when `settings` is `{}`.
    updatePayload.settings = {};
    // Compare against undefined so falsy values (0, false) are preserved.
    if (deliveryDelay !== undefined) {
      updatePayload.settings.delivery_delay = deliveryDelay;
    }
    if (deliveryPaused !== undefined) {
      updatePayload.settings.delivery_paused = deliveryPaused;
    }
    if (messageRetentionPeriod !== undefined) {
      updatePayload.settings.message_retention_period = messageRetentionPeriod;
    }
  }

  // PATCH performs a partial update with the fields assembled above.
  const updateResponse = await api.patch(
    `/accounts/${api.accountId}/queues/${queueId}`,
    updatePayload,
  );

  if (!updateResponse.ok) {
    return await handleApiError(updateResponse, "updating", "Queue", queueId);
  }

  return (await updateResponse.json()) as CloudflareQueueResponse;
}
/**
 * A single queue entry as it appears in the list-queues response.
 * Unlike the single-queue response type, `queue_id` is required here.
 */
type CloudflareQueueListItem = {
queue_name: string;
queue_id: string;
created_on?: string;
modified_on?: string;
settings?: {
delivery_delay?: number;
delivery_paused?: boolean;
message_retention_period?: number;
};
};
/**
 * Shape this module expects from one page of the list-queues endpoint.
 * `result_info`, when present, is used to decide whether more pages exist.
 */
type CloudflareQueueListResponse = {
success: boolean;
errors?: Array<{ code: number; message: string }>;
result?: CloudflareQueueListItem[];
result_info?: {
per_page?: number;
total_pages?: number;
};
};
// Page size requested (as `per_page`) when listing queues.
const CLOUDFLARE_QUEUE_PAGE_SIZE = 100;
/**
 * Fetch one page of the account's queue list.
 *
 * Issues a GET to `/accounts/{accountId}/queues` with `page` and `per_page`
 * query parameters.
 *
 * @param api - Cloudflare API client; its `accountId` selects the account
 * @param page - Page number to request
 * @param perPage - Page size to request; defaults to the module's page size
 * @returns The parsed list response for that page
 * @throws Error when the response body reports `success: false`; the message
 *   carries the first reported error, or "Unknown error" when there is none
 *
 * Non-OK HTTP responses are handed to `handleApiError`.
 */
async function fetchQueuePage(
  api: CloudflareApi,
  page: number,
  perPage = CLOUDFLARE_QUEUE_PAGE_SIZE,
): Promise<CloudflareQueueListResponse> {
  const searchParams = new URLSearchParams({
    page: page.toString(),
    per_page: perPage.toString(),
  });

  const response = await api.get(
    `/accounts/${api.accountId}/queues?${searchParams.toString()}`,
  );

  if (!response.ok) {
    return await handleApiError(response, "listing", "Queues", "");
  }

  const data = (await response.json()) as CloudflareQueueListResponse;

  // An HTTP 200 can still carry an API-level failure flag.
  if (!data.success) {
    const errorMessage = data.errors?.[0]?.message || "Unknown error";
    throw new Error(`Failed to list queues: ${errorMessage}`);
  }

  return data;
}
/**
 * Decide whether another page of queues should be requested.
 *
 * When the response reports `total_pages`, that figure is authoritative.
 * Otherwise a page counts as "not the last" when it is non-empty and holds
 * at least a full page of entries, using the reported `per_page` or, failing
 * that, the page size that was requested.
 *
 * @param data - The list response for the page just fetched
 * @param currentPage - Number of the page just fetched
 * @param perPage - Page size that was requested
 * @returns `true` when a further page should be fetched
 */
function hasMoreQueuePages(
  data: CloudflareQueueListResponse,
  currentPage: number,
  perPage: number,
): boolean {
  const info = data.result_info;
  const totalPages = info?.total_pages;
  if (totalPages !== undefined) {
    return totalPages > currentPage;
  }

  const received = data.result?.length ?? 0;
  if (received === 0) {
    return false;
  }
  const expectedFullPage = info?.per_page ?? perPage;
  return received >= expectedFullPage;
}
/**
 * List all Cloudflare Queues in an account.
 *
 * Walks the paginated list endpoint from page 1 until no further page is
 * indicated, then reduces every entry to its name and id.
 *
 * @param api - Cloudflare API client for the target account
 * @returns Name and id of each queue, in the order the API returned them
 */
export async function listQueues(
  api: CloudflareApi,
): Promise<{ name: string; id: string }[]> {
  const collected: CloudflareQueueListItem[] = [];

  // Keep requesting pages so queues on later pages are not missed.
  for (let pageNumber = 1; ; pageNumber += 1) {
    const pageData = await fetchQueuePage(api, pageNumber);
    for (const entry of pageData.result ?? []) {
      collected.push(entry);
    }
    const morePages = hasMoreQueuePages(
      pageData,
      pageNumber,
      CLOUDFLARE_QUEUE_PAGE_SIZE,
    );
    if (!morePages) {
      break;
    }
  }

  // Expose only the fields callers need.
  return collected.map(({ queue_name, queue_id }) => ({
    name: queue_name,
    id: queue_id,
  }));
}
/**
 * Look up a Cloudflare Queue by its exact name.
 *
 * Scans the paginated queue list page by page and stops at the first entry
 * whose `queue_name` equals `queueName`. The match is wrapped in the same
 * envelope the single-queue endpoints return, so callers can treat it like a
 * create or update response. Resolves to `null` once the last page has been
 * searched without a match.
 */
export async function findQueueByName(
  api: CloudflareApi,
  queueName: string,
): Promise {
  for (let pageNumber = 1; ; pageNumber += 1) {
    const listing = await fetchQueuePage(api, pageNumber);
    const match = listing.result?.find(
      (candidate) => candidate.queue_name === queueName,
    );

    if (match) {
      const { queue_id, queue_name, created_on, modified_on, settings } = match;
      return {
        result: { queue_id, queue_name, created_on, modified_on, settings },
        success: true,
        errors: [],
        messages: [],
      };
    }

    const morePages = hasMoreQueuePages(
      listing,
      pageNumber,
      CLOUDFLARE_QUEUE_PAGE_SIZE,
    );
    if (!morePages) {
      return null;
    }
  }
}