import { allErrors, FlashAuditError } from "./errors"; import { EventEmitter } from "events" import * as crypto from 'crypto'; import http from "http" import https from "https" import { type AuditMessage, type CreateValues, type EditValues, type TriggerValues, type DeleteValues, validateMessage, } from './audit' process.env.KAFKAJS_NO_PARTITIONER_WARNING='1'; const DEFAULT_AUDIT_VERSION = 1; type FlashAuditConfig = { client: string, product: string, serverBrokers: string[], sslEnabled: boolean, auth?: { username: string, password: string }, verbose?: boolean, runtimeValidation?: boolean, version?: 1, debug?: boolean, } type InputEditAudit = { user: string, object: string, searchable: string[], values: EditValues, } type InputCreateAudit = { user: string, object: string, searchable: string[], values: CreateValues, } type InputDeleteAudit = { user: string, object: string, searchable: string[] values?: DeleteValues, } type InputTriggerAudit = { user: string, object: string, searchable: string[] values: TriggerValues } export class FlashAudit extends EventEmitter { private config : FlashAuditConfig; private topic : string; constructor (config:FlashAuditConfig) { super(); let self = this; this.config = config; if (self.config.version == undefined) { self.config.version = DEFAULT_AUDIT_VERSION; } this.topic = self.config.client; } send (body: AuditMessage) : Promise { let self = this; return new Promise(async function(resolve,reject){ if (self.config.runtimeValidation) { let errMsg = validateMessage(body); if (errMsg) { return resolve(allErrors.invalid_format.extra(errMsg)); } } // This is a retry mechanism that cycles between every broker. // This is only neccessary because we are using the Redpanda Web API for now let key = crypto.randomBytes(12).toString('base64'); let brokersAmount = self.config.serverBrokers.length; let firstBrokerIndex = Math.floor((Math.random()*100)) % brokersAmount; let lastError:void|FlashAuditError = undefined; for( let tryNumber = 0 ; tryNumber < brokersAmount ; tryNumber++) { let brokerHost = self.config.serverBrokers[(firstBrokerIndex+tryNumber)%brokersAmount]; lastError = await self._sendToBroker(body,brokerHost,key); if (lastError) { if (self.config.debug) console.log(lastError); } else { return resolve(); break; } } return resolve(lastError); }) } _sendToBroker(body:AuditMessage, brokerHost:string, key:string) { let self = this; return new Promise(async function(resolve,reject){ let requestBody = { "records": [ { "key": key, "value": body } ] } let stringifiedBody = JSON.stringify(requestBody) let scheme = self.config.sslEnabled ? https : http; let request = scheme.request({ method: 'POST', hostname: brokerHost.split(":")[0], port: brokerHost.split(":")[1], path: `/topics/${self.topic}`, headers: { 'Content-Type': 'application/vnd.kafka.json.v2+json', 'Content-Length': Buffer.byteLength(stringifiedBody) }, auth: `${self.config.auth?.username}:${self.config.auth?.password}`, timeout: 62000, }, function(res){ let data = ''; res.on('data',(chunk)=>data+=chunk); res.on('error',function(){ resolve(allErrors.broker_error) }) res.on('end',function(){ let responseBody: any; try { responseBody = JSON.parse(data) } catch { return resolve(allErrors.broker_error) } if (res.statusCode==401) { return resolve(allErrors.unauthorized) } else if (res.statusCode!<200 || res.statusCode!>=300) { return resolve(allErrors.broker_error) } else { if ( responseBody.offsets && responseBody.offsets.length>0 && responseBody.offsets[0].error_code ) { if ([3,29].find((val)=>(responseBody.offsets[0].error_code==val))) { return resolve(allErrors.unauthorized.extra(`Not authorized to write on topic "${self.topic}"`)) } return resolve(allErrors.broker_error.extra(`error_code: ${responseBody.offsets[0].error_code}`)); } return resolve(); } }) }) request.on('timeout',function(){ return resolve(allErrors.server_timedout) }) request.on("error",function(err){ if (self.config.debug) console.log(err) return resolve(allErrors.broker_unavailable) }) request.write(stringifiedBody) request.end() }) } sendCreate (body:InputCreateAudit) : Promise { let self = this; if (!body || typeof(body) != 'object') { return new Promise(function(resolve,reject){ return resolve(allErrors.invalid_format); }); } else { return self.send({ product: self.config.product, client: self.config.client, date: Date.now(), version: self.config.version ?? DEFAULT_AUDIT_VERSION, operation: 'create', user: body.user, object: body.object, searchable: body.searchable, values: body.values }); } } sendEdit (body:InputEditAudit) : Promise { let self = this; if (!body || typeof(body) != 'object') { return new Promise(function(resolve,reject){ return resolve(allErrors.invalid_format); }); } else { return self.send({ product: self.config.product, client: self.config.client, date: Date.now(), version: self.config.version ?? DEFAULT_AUDIT_VERSION, operation: 'edit', user: body.user, object: body.object, searchable: body.searchable, values: body.values }); } } sendDelete (body:InputDeleteAudit) : Promise { let self = this; if (!body || typeof(body) != 'object') { return new Promise(function(resolve,reject){ return resolve(allErrors.invalid_format); }); } else { return self.send({ product: self.config.product, client: self.config.client, date: Date.now(), version: self.config.version ?? DEFAULT_AUDIT_VERSION, operation: 'delete', user: body.user, object: body.object, searchable: body.searchable, ...body.values && {values: body.values}, }); } } sendTrigger (body:InputTriggerAudit) : Promise { let self = this; if (!body || typeof(body) != 'object') { return new Promise(function(resolve,reject){ return resolve(allErrors.invalid_format); }); } else { return self.send({ product: self.config.product, client: self.config.client, date: Date.now(), version: self.config.version ?? DEFAULT_AUDIT_VERSION, operation: 'trigger', user: body.user, object: body.object, searchable: body.searchable, values: body.values }); } } }