/** * Copyright 2023 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ import * as functions from "firebase-functions/v1"; import * as admin from "firebase-admin"; import { getFunctions } from "firebase-admin/functions"; import * as utils from "./utils"; export interface BackfillTask

{ tasksDoc: string; taskId: string; collectionName: string; chunk: P[]; } export function taskThreadTaskHandler

( handler: ( chunk: P[], ) => Promise<{ success: number; failed: number; skipped: number }>, queueName: string, extensionInstanceId?: string, ) { return async (data: BackfillTask

) => { // TODO: this needs to be matching what we send const { taskId, chunk, tasksDoc } = data; // functions.logger.info( // `Handling task ${JSON.stringify(taskId)}, chunk ${JSON.stringify(chunk)} chunk length ${chunk?.length},task doc ${tasksDoc}` // ); if (!chunk || chunk.length === 0) { functions.logger.info("No data to handle, skipping..."); return; } functions.logger.info(`Handling ${chunk.length} documents`); const taskRef = admin.firestore().doc(`${tasksDoc}/enqueues/${taskId}`); await taskRef.update({ status: "PROCESSING", }); const { success, failed, skipped } = await handler(chunk); await taskRef.update({ status: utils.BackfillStatus.DONE, }); functions.logger.info( `Task ${taskId} completed with ${success} success(es)`, ); const tasksDocSnap = await admin.firestore().doc(tasksDoc).get(); let { backfillJobsTotal: totalTasks, backfillJobsProcessed: processedTasks, backfillJobsSkipped: skippedTasks, backfillJobsFailed: failedTasks, } = tasksDocSnap.data() as any; // check if null or undefined or not a number if ( [totalTasks, processedTasks, skippedTasks, failedTasks].some( (val) => val === null || val === undefined || typeof val !== "number", ) ) { throw new Error("Invalid task document"); } processedTasks += success; skippedTasks += skipped; failedTasks += failed; await admin .firestore() .doc(tasksDoc) .update({ backfillJobsFailed: admin.firestore.FieldValue.increment(failed), backfillJobsSkipped: admin.firestore.FieldValue.increment(skipped), backfillJobsProcessed: admin.firestore.FieldValue.increment(success), }); functions.logger.info( `Current state: ${processedTasks} processed, ${skippedTasks} skipped, ${failedTasks} failed out of ${totalTasks} total tasks`, ); if (processedTasks + skippedTasks + failedTasks === totalTasks) { await admin.firestore().doc(tasksDoc).update({ backfillStatus: utils.BackfillStatus.DONE, }); } else { await _createNextTask(taskId, tasksDoc, queueName, extensionInstanceId); } }; } export function getNextTaskId( prevId: string, extensionInstanceId?: string, ): string { const taskPattern = /^task-\d+$/; const extTaskPattern = new RegExp(`^ext-${extensionInstanceId}-task-\\d+$`); if ( !taskPattern.test(prevId) && (!extensionInstanceId || !extTaskPattern.test(prevId)) ) { throw new Error(`Invalid task ID format: ${prevId}`); } const taskNum = prevId.split("task-")[1]; const nextId = extensionInstanceId ? `ext-${extensionInstanceId}-task-${parseInt(taskNum) + 1}` : `task-${parseInt(taskNum) + 1}`; return nextId; } async function _createNextTask( prevId: string, tasksDoc: string, queueName: string, extensionInstanceId?: string, ) { const nextId = getNextTaskId(prevId, extensionInstanceId); if (!nextId) { throw new Error("Generated task ID is undefined or invalid"); } const nextTask = await admin .firestore() .doc(`${tasksDoc}/enqueues/${nextId}`) .get(); if (!nextTask.exists) { functions.logger.error(`Next task document ${nextId} not found.`); throw new Error(`Next task document ${nextId} does not exist.`); } const nextTaskData = nextTask.data(); if (!nextTaskData?.chunk || nextTaskData.chunk.length === 0) { functions.logger.error( `Next task ${nextId} has an invalid or empty chunk.`, ); throw new Error(`Next task ${nextId} does not have valid chunk data.`); } const queue = getFunctions().taskQueue(queueName, extensionInstanceId); await queue.enqueue({ taskId: nextId, // Ensure this is set correctly chunk: nextTaskData.chunk, tasksDoc, }); functions.logger.info(`Successfully enqueued task ${nextId}`); }