import { Athena, AWSError } from 'aws-sdk' import { ResultConfiguration } from 'aws-sdk/clients/athena' import { Readable } from 'stream' export interface AthenaRequestConfig { bucketUri: string baseRetryWait?: number retryWaitMax?: number retryCountMax?: number database?: string encryptionOption?: string encryptionKmsKey?: string workGroup?: string } interface EncryptionConfigurationParam { EncryptionOption: string KmsKey?: string } interface ResultConfigurationParam { OutputLocation: string EncryptionConfiguration?: EncryptionConfigurationParam } interface QueryExecutionContextParam { Database: string } interface AthenaRequestParams { QueryString: string ResultConfiguration: ResultConfigurationParam QueryExecutionContext: QueryExecutionContextParam WorkGroup: string } const defaultBaseRetryWait = 200 const defaultRetryWaitMax = 10000 const defaultRetryCountMax = 10 export class AthenaRequest { private athena: any private s3: any constructor(athena: any, s3: any) { this.athena = athena this.s3 = s3 } public startQuery(query: string, config: AthenaRequestConfig) { return new Promise((resolve, reject) => { let retryCount = 0 const params: AthenaRequestParams = { QueryString: query, ResultConfiguration: { OutputLocation: config.bucketUri, ...(config.encryptionOption && { EncryptionConfiguration: { EncryptionOption: config.encryptionOption, ...(config.encryptionKmsKey && { KmsKey: config.encryptionKmsKey, }), }, }), }, QueryExecutionContext: { Database: config.database || 'default', }, WorkGroup: config.workGroup || 'primary', } const loopFunc = () => { this.athena.startQueryExecution(params, (err: AWSError, data: any) => { if (err && isRetryException(err) && canRetry(retryCount, config)) { let wait = (config.baseRetryWait || defaultBaseRetryWait) * Math.pow(2, retryCount++) wait = Math.min(wait, config.retryWaitMax || defaultRetryWaitMax) return setTimeout(loopFunc, wait) } else if (err) { return reject(err) } return resolve(data.QueryExecutionId) }) } loopFunc() }) } public checkQuery(queryId: string, config: AthenaRequestConfig) { return new Promise((resolve, reject) => { this.getQueryExecution(queryId, config) .then((queryExecution: any) => { const state = queryExecution.Status.State let isSucceed: boolean = false let error: Error | null = null switch (state) { case 'QUEUED': case 'RUNNING': isSucceed = false break case 'SUCCEEDED': isSucceed = true break case 'FAILED': isSucceed = false const errMsg = queryExecution.Status.StateChangeReason || 'FAILED: Execution Error' error = new Error(errMsg) break case 'CANCELLED': isSucceed = false error = new Error('FAILED: Query CANCELLED') break default: isSucceed = false error = new Error(`FAILED: UnKnown State ${state}`) } if (error) { return reject(error) } return resolve(isSucceed) }) .catch((err: AWSError) => { return reject(err) }) }) } public stopQuery(queryId: string, config: AthenaRequestConfig) { return new Promise((resolve, reject) => { let retryCount = 0 const params = { QueryExecutionId: queryId, } const loopFunc = () => { this.athena.stopQueryExecution(params, (err: AWSError) => { if (err && isRetryException(err) && canRetry(retryCount, config)) { const wait = Math.pow( config.baseRetryWait || defaultBaseRetryWait, retryCount++, ) return setTimeout(loopFunc, wait) } else if (err) { return reject(err) } return resolve() }) } loopFunc() }) } public getQueryExecution(queryId: string, config: AthenaRequestConfig) { return new Promise((resolve, reject) => { let retryCount = 0 const params = { QueryExecutionId: queryId, } const loopFunc = () => { this.athena.getQueryExecution(params, (err: AWSError, data: any) => { if (err && isRetryException(err) && canRetry(retryCount, config)) { const wait = Math.pow( config.baseRetryWait || defaultBaseRetryWait, retryCount++, ) return setTimeout(loopFunc, wait) } else if (err) { return reject(err) } return resolve(data.QueryExecution) }) } loopFunc() }) } public getResultsStream(s3Uri: string): Readable { const arr = s3Uri.replace('s3://', '').split('/') const bucket = arr.shift() || '' const key = arr.join('/') return this.s3 .getObject({ Bucket: bucket, Key: key, }) .createReadStream() } } function isRetryException(err: AWSError) { return ( err.code === 'ThrottlingException' || err.code === 'TooManyRequestsException' || err.message === 'Query exhausted resources at this scale factor' ) } function canRetry(retryCount: number, config: AthenaRequestConfig) { return retryCount < (config.retryCountMax || defaultRetryCountMax) }