import { Duration, RemovalPolicy } from 'aws-cdk-lib';
import { IMetric } from 'aws-cdk-lib/aws-cloudwatch';
import { Table } from 'aws-cdk-lib/aws-dynamodb';
import { Key } from 'aws-cdk-lib/aws-kms';
import { IChainable, StateMachine } from 'aws-cdk-lib/aws-stepfunctions';
import { BedrockInvokeModel, DynamoAttributeValue, LambdaInvoke, StepFunctionsStartExecution } from 'aws-cdk-lib/aws-stepfunctions-tasks';
import { Construct } from 'constructs';
import { IAdapter } from './adapter';
import { Network } from '../framework';
import { EventbridgeBroker } from '../framework/foundation/eventbridge-broker';
import { LogGroupDataProtectionProps } from '../utilities';
import { IObservable, ObservableProps } from '../utilities/observability/observable';

/**
 * Configuration properties for BaseDocumentProcessing construct.
 */
export interface BaseDocumentProcessingProps extends ObservableProps {
  /**
   * Adapter that defines how the document processing workflow is triggered.
   *
   * @default QueuedS3Adapter
   */
  readonly ingressAdapter?: IAdapter;

  /**
   * DynamoDB table for storing document processing metadata and workflow state.
   * If not provided, a new table will be created with DocumentId as partition key.
   */
  readonly documentProcessingTable?: Table;

  /**
   * Maximum execution time for the Step Functions workflow.
   * @default Duration.minutes(30)
   */
  readonly workflowTimeout?: Duration;

  /**
   * Removal policy for created resources (bucket, table, queue).
   * @default RemovalPolicy.DESTROY
   */
  readonly removalPolicy?: RemovalPolicy;

  /**
   * Optional EventBridge broker for publishing custom events during processing.
   * If not provided, no custom events will be sent out.
   */
  readonly eventbridgeBroker?: EventbridgeBroker;

  /**
   * Enable logging and tracing for all supporting resources.
   * @default false
   */
  readonly enableObservability?: boolean;

  /**
   * Resources that can run inside a VPC will follow the provided network configuration.
   * @default resources will run outside of a VPC
   */
  readonly network?: Network;

  /**
   * KMS key to be used.
   * @default A new key would be created
   */
  readonly encryptionKey?: Key;
}

/**
 * Union type for Step Functions tasks that can be used in document processing workflows.
 * Supports Bedrock model invocation, Lambda function invocation, and nested Step Functions execution.
 */
export type DocumentProcessingStepType = BedrockInvokeModel | LambdaInvoke | StepFunctionsStartExecution;

/**
 * Abstract base class for serverless document processing workflows.
 *
 * Provides a complete document processing pipeline with:
 * - **S3 Storage**: Organized with prefixes (raw/, processed/, failed/) for document lifecycle management
 * - **SQS Queue**: Reliable message processing with configurable visibility timeout and dead letter queue
 * - **DynamoDB Table**: Workflow metadata tracking with DocumentId as partition key
 * - **Step Functions**: Orchestrated workflow with automatic file movement based on processing outcome
 * - **Auto-triggering**: S3 event notifications automatically start processing when files are uploaded to raw/ prefix
 * - **Error Handling**: Failed documents are moved to failed/ prefix with error details stored in DynamoDB
 * - **EventBridge Integration**: Optional custom event publishing for workflow state changes
 *
 * ## Architecture Flow
 * S3 Upload (raw/) → SQS → Lambda Consumer → Step Functions → Processing Steps → S3 (processed/failed/)
 *
 * ## Implementation Requirements
 * Subclasses must implement the following abstract methods to define the processing workflow:
 * - `preprocessingStep()`: Optional preprocessing before classification (return undefined to skip)
 * - `createProcessingWorkflow()`: Workflow chain that runs after preprocessing and initialization
 * - `classificationStep()`: Document type classification
 * - `processingStep()`: Document processing (data extraction)
 * - `enrichmentStep()`: Optional data enrichment (return undefined to skip)
 * - `postProcessingStep()`: Optional post-processing (return undefined to skip)
 */
export declare abstract class BaseDocumentProcessing extends Construct implements IObservable {
  /** Business metric service name. This is part of the initial service dimension */
  readonly metricServiceName: string;

  /** Business metric namespace. */
  readonly metricNamespace: string;

  /** log group data protection configuration */
  readonly logGroupDataProtection: LogGroupDataProtectionProps;

  /** DynamoDB table for storing document processing metadata and workflow state */
  readonly documentProcessingTable: Table;

  /** Configuration properties for the document processing pipeline */
  private readonly props;

  /** KMS key */
  readonly encryptionKey: Key;

  /** Ingress adapter, responsible for triggering workflow */
  readonly ingressAdapter: IAdapter;

  /**
   * Creates a new BaseDocumentProcessing construct.
   *
   * Initializes the complete document processing infrastructure including S3 bucket,
   * SQS queue, DynamoDB table, and sets up S3 event notifications to trigger processing.
   *
   * @param scope - The scope in which to define this construct
   * @param id - The scoped construct ID. Must be unique within the scope.
   * @param props - Configuration properties for the document processing pipeline
   */
  constructor(scope: Construct, id: string, props: BaseDocumentProcessingProps);

  /**
   * Produces the Step Functions state machine for this construct.
   *
   * NOTE(review): only the signature is visible in this declaration; presumably this
   * assembles the workflow from the step methods declared on this class — confirm
   * against the implementation.
   *
   * @param stateMachineId - Identifier for the state machine (presumably its construct ID — confirm)
   * @returns The resulting state machine
   */
  protected handleStateMachineCreation(stateMachineId: string): StateMachine;

  // Private members below are declared by name only; this declaration carries no
  // signatures for them, so their contracts must be read from the implementation.
  private createStateMachineRole;
  private createMoveToFailedChain;
  private createMoveToProcessedChain;

  /**
   * Returns the metrics exposed by this construct as CloudWatch `IMetric` objects.
   *
   * @returns Array of metrics for this construct
   */
  metrics(): IMetric[];

  /**
   * Defines the optional preprocessing step of the workflow.
   *
   * This step runs BEFORE Init Metadata and can be used for:
   * - Document chunking for large files
   * - Document validation
   * - Format conversion
   * - Any other preprocessing needed before classification
   *
   * Concrete implementations can return undefined to skip preprocessing,
   * maintaining backward compatibility with existing workflows.
   *
   * @returns Step Functions task for preprocessing, or undefined to skip this step
   */
  protected abstract preprocessingStep(): DocumentProcessingStepType | undefined;

  /**
   * Hook for concrete implementations to add preprocessing-specific metadata to DynamoDB.
   *
   * This method is called during InitMetadata creation and allows subclasses to extend
   * the DynamoDB schema with their own fields without the base class knowing the details.
   *
   * The base class provides the core document fields (DocumentId, ContentType, etc.),
   * and subclasses can add their own fields (e.g., chunking metadata) by overriding this method.
   *
   * @returns Record of additional DynamoDB attribute values to include in InitMetadata,
   * keyed by attribute name
   * @default {} (no additional metadata)
   */
  protected preprocessingMetadata(): Record<string, DynamoAttributeValue>;

  /**
   * Creates the processing workflow after preprocessing and initialization.
   *
   * Concrete implementations can customize this to handle preprocessing results.
   * For example, BedrockDocumentProcessing uses this to add conditional branching
   * for chunked vs non-chunked documents.
   *
   * Implementations can call `createStandardProcessingWorkflow()` to reuse the
   * standard processing flow (Classification → Processing → Enrichment → PostProcessing).
   *
   * @returns Step Functions chain for processing the document
   */
  protected abstract createProcessingWorkflow(): IChainable;

  /**
   * Creates the standard processing workflow (no preprocessing customization).
   *
   * This is the existing workflow: Classification → Processing → Enrichment → PostProcessing
   * Concrete classes can call this method to reuse the standard flow when they don't
   * need custom workflow branching.
   *
   * @param idPrefix Optional prefix for construct IDs to ensure uniqueness when called multiple times
   * @returns Step Functions chain for standard processing
   */
  protected createStandardProcessingWorkflow(idPrefix?: string): IChainable;

  /**
   * Defines the document classification step of the workflow.
   *
   * **CRITICAL**: Must set `outputPath` to preserve workflow state for subsequent steps.
   * The classification result should be available at `$.classificationResult` for DynamoDB storage.
   *
   * @returns Step Functions task for document classification
   */
  protected abstract classificationStep(): DocumentProcessingStepType;

  /**
   * Defines the document processing (data extraction) step of the workflow.
   *
   * **CRITICAL**: Must set `outputPath` to preserve workflow state for subsequent steps.
   * The processing result should be available at `$.processingResult` for DynamoDB storage.
   *
   * @returns Step Functions task for document processing
   */
  protected abstract processingStep(): DocumentProcessingStepType;

  /**
   * Defines the optional document enrichment step of the workflow.
   *
   * **CRITICAL**: If implemented, must set `outputPath` to preserve workflow state.
   * The enrichment result should be available at `$.enrichedResult` for DynamoDB storage.
   *
   * @returns Step Functions task for document enrichment, or undefined to skip this step
   */
  protected abstract enrichmentStep(): DocumentProcessingStepType | undefined;

  /**
   * Defines the optional post-processing step of the workflow.
   *
   * **CRITICAL**: If implemented, must set `outputPath` to preserve workflow state.
   * The post-processing result should be available at `$.postProcessedResult` for DynamoDB storage.
   *
   * @returns Step Functions task for post-processing, or undefined to skip this step
   */
  protected abstract postProcessingStep(): DocumentProcessingStepType | undefined;
}