import { DestinationServer } from '@walkeros/server-core'; import { Flow } from '@walkeros/core'; /** * Mock-friendly Producer interface used by the destination. * Tests provide this via env.Kafka; production creates a real * kafkajs Producer and adapts it through settings._producer. */ interface KafkaProducerMock { connect: () => Promise; disconnect: () => Promise; send: (record: ProducerRecord) => Promise; } /** * Mock-friendly Admin interface used by setup (subset of kafkajs.Admin). * Tests provide this via env.Kafka; production creates a real * kafkajs Admin client. */ interface KafkaAdminMock { connect: () => Promise; disconnect: () => Promise; createTopics: (args: { topics: Array<{ topic: string; numPartitions: number; replicationFactor: number; configEntries?: Array<{ name: string; value: string; }>; }>; validateOnly?: boolean; waitForLeaders?: boolean; timeout?: number; }) => Promise; fetchTopicMetadata: (args: { topics?: string[]; }) => Promise<{ topics: Array<{ name: string; partitions: Array<{ partitionId: number; leader: number; replicas: number[]; isr: number[]; }>; }>; }>; describeConfigs: (args: { resources: Array<{ type: number; name: string; }>; includeSynonyms?: boolean; }) => Promise<{ resources: Array<{ resourceName: string; configEntries: Array<{ configName: string; configValue: string; }>; }>; }>; } /** * Mock-friendly Kafka client interface (subset of kafkajs.Kafka). */ interface KafkaClientMock { producer: (config?: ProducerConfig) => KafkaProducerMock; admin: () => KafkaAdminMock; } /** * Constructor signature for the Kafka client. Accepts a config * object and returns a client with producer() factory. */ type KafkaClientConstructor = new (config: KafkaClientConfig) => KafkaClientMock; interface KafkaClientConfig { clientId?: string; brokers: string[]; ssl?: boolean | Record; sasl?: SASLConfig; connectionTimeout?: number; requestTimeout?: number; retry?: RetryConfig; } interface ProducerConfig { allowAutoTopicCreation?: boolean; idempotent?: boolean; } interface ProducerRecord { topic: string; messages: ProducerMessage[]; acks?: number; compression?: number; timeout?: number; } interface ProducerMessage { key?: string; value: string; headers?: Record; timestamp?: string; partition?: number; } interface CompressionTypesMap { None: number; GZIP: number; Snappy: number; LZ4: number; ZSTD: number; } type CompressionType = 'none' | 'gzip' | 'snappy' | 'lz4' | 'zstd'; interface SASLConfig { mechanism: 'plain' | 'scram-sha-256' | 'scram-sha-512' | 'aws' | 'oauthbearer'; username?: string; password?: string; accessKeyId?: string; secretAccessKey?: string; sessionToken?: string; authorizationIdentity?: string; } interface RetryConfig { maxRetryTime?: number; initialRetryTime?: number; retries?: number; } interface KafkaSettings { brokers: string[]; clientId?: string; ssl?: boolean | Record; sasl?: SASLConfig; connectionTimeout?: number; requestTimeout?: number; topic: string; acks?: number; timeout?: number; compression?: CompressionType; idempotent?: boolean; allowAutoTopicCreation?: boolean; key?: string; headers?: Record; retry?: RetryConfig; _producer?: KafkaProducerMock; } interface Settings { kafka: KafkaSettings; } /** * Env -- optional Kafka SDK override. Production leaves this undefined * and the destination creates real Kafka client instances. Tests provide * mocks via env.Kafka. */ interface Env extends DestinationServer.Env { Kafka?: { Kafka: KafkaClientConstructor; CompressionTypes: CompressionTypesMap; }; } declare const push: Env; declare const simulation: string[]; declare const env_push: typeof push; declare const env_simulation: typeof simulation; declare namespace env { export { env_push as push, env_simulation as simulation }; } /** * Extended step example that may carry destination-level settings overrides. */ type KafkaStepExample = Flow.StepExample & { settings?: Partial; }; /** * Default event -- full WalkerOS.Event serialized as JSON to the configured * topic. Message key defaults to entity_action when no key path is set. */ declare const defaultEvent: KafkaStepExample; /** * Mapped event name -- rule.name renames the event, which also changes the * default message key when no key mapping is configured. */ declare const mappedEventName: KafkaStepExample; /** * Mapped data -- data.map transforms the event payload. Value is the mapped * object serialized as JSON. */ declare const mappedData: KafkaStepExample; /** * Key from user -- settings.kafka.key path resolves the message key from * the event (here user.id). */ declare const keyFromUser: KafkaStepExample; /** * Topic override -- rule.settings.topic routes this rule to a different * topic than the destination default. */ declare const topicOverride: KafkaStepExample; /** * Ignored event -- mapping.ignore: true produces no producer.send call. */ declare const ignoredEvent: KafkaStepExample; type step_KafkaStepExample = KafkaStepExample; declare const step_defaultEvent: typeof defaultEvent; declare const step_ignoredEvent: typeof ignoredEvent; declare const step_keyFromUser: typeof keyFromUser; declare const step_mappedData: typeof mappedData; declare const step_mappedEventName: typeof mappedEventName; declare const step_topicOverride: typeof topicOverride; declare namespace step { export { type step_KafkaStepExample as KafkaStepExample, step_defaultEvent as defaultEvent, step_ignoredEvent as ignoredEvent, step_keyFromUser as keyFromUser, step_mappedData as mappedData, step_mappedEventName as mappedEventName, step_topicOverride as topicOverride }; } export { env, step };