// Time Series API Implementation (InfluxDB-like operations) import { CBDAdapter } from '../cbd-adapter.js'; import { TimeSeriesQuery, TimeSeriesPoint, CNDError } from '../types.js'; export class TimeSeriesAPI implements TimeSeriesQuery { private rangeStart?: Date; private rangeEnd?: Date; private tagFilters: Record = {}; constructor( private metric: string, private adapter: CBDAdapter ) { } async insert( timestamp: Date, value: number, tags?: Record ): Promise { try { await this.adapter.storeTimeSeries(this.metric, timestamp, value, tags || {}); } catch (error) { throw new CNDError( `Time series insert failed: ${error}`, 'TIMESERIES_INSERT_ERROR', { metric: this.metric, timestamp, value, tags } ); } } range(start: Date, end: Date): TimeSeriesAPI { this.rangeStart = start; this.rangeEnd = end; return this; } where(tags: Record): TimeSeriesAPI { this.tagFilters = { ...this.tagFilters, ...tags }; return this; } async aggregate( func: 'avg' | 'sum' | 'min' | 'max' | 'count', interval?: string ): Promise { try { if (!this.rangeStart || !this.rangeEnd) { throw new CNDError( 'Time range is required for aggregation', 'TIMESERIES_RANGE_REQUIRED', { func, interval } ); } const points = await this.adapter.queryTimeSeries( this.metric, this.rangeStart, this.rangeEnd, this.tagFilters ); return this.performAggregation(points, func, interval); } catch (error) { throw new CNDError( `Time series aggregation failed: ${error}`, 'TIMESERIES_AGGREGATION_ERROR', { metric: this.metric, func, interval, range: [this.rangeStart, this.rangeEnd] } ); } } async latest(): Promise { try { const endTime = new Date(); const startTime = new Date(endTime.getTime() - 24 * 60 * 60 * 1000); // Last 24 hours const points = await this.adapter.queryTimeSeries( this.metric, startTime, endTime, this.tagFilters ); if (points.length === 0) return null; return points.reduce((latest, current) => current.timestamp > latest.timestamp ? current : latest ); } catch (error) { throw new CNDError( `Time series latest failed: ${error}`, 'TIMESERIES_LATEST_ERROR', { metric: this.metric, tags: this.tagFilters } ); } } // Additional time series operations async insertBatch(points: Array<{ timestamp: Date; value: number; tags?: Record; }>): Promise { try { for (const point of points) { await this.insert(point.timestamp, point.value, point.tags); } } catch (error) { throw new CNDError( `Time series batch insert failed: ${error}`, 'TIMESERIES_BATCH_INSERT_ERROR', { metric: this.metric, batchSize: points.length } ); } } async query(options?: { start?: Date; end?: Date; tags?: Record; limit?: number; orderBy?: 'asc' | 'desc'; }): Promise { try { const start = options?.start || this.rangeStart || new Date(Date.now() - 24 * 60 * 60 * 1000); const end = options?.end || this.rangeEnd || new Date(); const tags = { ...this.tagFilters, ...(options?.tags || {}) }; let points = await this.adapter.queryTimeSeries(this.metric, start, end, tags); // Apply ordering if (options?.orderBy === 'desc') { points.sort((a, b) => b.timestamp.getTime() - a.timestamp.getTime()); } else { points.sort((a, b) => a.timestamp.getTime() - b.timestamp.getTime()); } // Apply limit if (options?.limit) { points = points.slice(0, options.limit); } return points; } catch (error) { throw new CNDError( `Time series query failed: ${error}`, 'TIMESERIES_QUERY_ERROR', { metric: this.metric, options } ); } } async delete(start: Date, end: Date, tags?: Record): Promise { try { // In real implementation, this would delete points from CBD Engine console.log(`Time Series API: Deleting points from ${this.metric}`, { start, end, tags }); return 0; } catch (error) { throw new CNDError( `Time series delete failed: ${error}`, 'TIMESERIES_DELETE_ERROR', { metric: this.metric, start, end, tags } ); } } // Statistical operations async stats(options?: { start?: Date; end?: Date; tags?: Record; }): Promise<{ count: number; min: number; max: number; avg: number; sum: number; stdDev: number; }> { try { const points = await this.query(options); if (points.length === 0) { return { count: 0, min: 0, max: 0, avg: 0, sum: 0, stdDev: 0 }; } const values = points.map(p => p.value); const count = values.length; const sum = values.reduce((a, b) => a + b, 0); const avg = sum / count; const min = Math.min(...values); const max = Math.max(...values); const variance = values.reduce((acc, val) => acc + Math.pow(val - avg, 2), 0) / count; const stdDev = Math.sqrt(variance); return { count, min, max, avg, sum, stdDev }; } catch (error) { throw new CNDError( `Time series stats failed: ${error}`, 'TIMESERIES_STATS_ERROR', { metric: this.metric, options } ); } } async downsample( interval: string, aggregationFunc: 'avg' | 'sum' | 'min' | 'max' | 'count' = 'avg' ): Promise { try { if (!this.rangeStart || !this.rangeEnd) { throw new CNDError( 'Time range is required for downsampling', 'TIMESERIES_RANGE_REQUIRED', { interval, aggregationFunc } ); } return this.aggregate(aggregationFunc, interval); } catch (error) { throw new CNDError( `Time series downsampling failed: ${error}`, 'TIMESERIES_DOWNSAMPLE_ERROR', { metric: this.metric, interval, aggregationFunc } ); } } async interpolate( method: 'linear' | 'cubic' | 'nearest' = 'linear', interval: string = '1m' ): Promise { try { const points = await this.query(); // Simple linear interpolation implementation // In real implementation, this would use more sophisticated algorithms return this.performInterpolation(points, method, interval); } catch (error) { throw new CNDError( `Time series interpolation failed: ${error}`, 'TIMESERIES_INTERPOLATION_ERROR', { metric: this.metric, method, interval } ); } } // Forecasting and anomaly detection async forecast( periodsAhead: number, model: 'linear' | 'seasonal' | 'arima' = 'linear' ): Promise { try { // In real implementation, this would use forecasting algorithms in CBD Engine console.log(`Time Series API: Forecasting ${this.metric}`, { periodsAhead, model }); return []; } catch (error) { throw new CNDError( `Time series forecasting failed: ${error}`, 'TIMESERIES_FORECAST_ERROR', { metric: this.metric, periodsAhead, model } ); } } async detectAnomalies( algorithm: 'zscore' | 'iqr' | 'isolation_forest' = 'zscore', threshold: number = 2 ): Promise> { try { const points = await this.query(); // Simple z-score anomaly detection if (algorithm === 'zscore') { const stats = await this.stats(); return points .map(point => ({ point, anomalyScore: Math.abs((point.value - stats.avg) / stats.stdDev) })) .filter(result => result.anomalyScore > threshold); } return []; } catch (error) { throw new CNDError( `Time series anomaly detection failed: ${error}`, 'TIMESERIES_ANOMALY_DETECTION_ERROR', { metric: this.metric, algorithm, threshold } ); } } // Helper methods private performAggregation( points: TimeSeriesPoint[], func: 'avg' | 'sum' | 'min' | 'max' | 'count', interval?: string ): TimeSeriesPoint[] { if (!interval) { // Aggregate all points into one const value = this.calculateAggregateValue(points, func); return [{ timestamp: points[0]?.timestamp || new Date(), value, tags: {} }]; } // Group points by interval and aggregate each group const groups = this.groupByInterval(points, interval); return Object.entries(groups).map(([timestamp, groupPoints]) => ({ timestamp: new Date(timestamp), value: this.calculateAggregateValue(groupPoints, func), tags: {} })); } private calculateAggregateValue( points: TimeSeriesPoint[], func: 'avg' | 'sum' | 'min' | 'max' | 'count' ): number { if (points.length === 0) return 0; const values = points.map(p => p.value); switch (func) { case 'avg': return values.reduce((a, b) => a + b, 0) / values.length; case 'sum': return values.reduce((a, b) => a + b, 0); case 'min': return Math.min(...values); case 'max': return Math.max(...values); case 'count': return values.length; default: return 0; } } private groupByInterval( points: TimeSeriesPoint[], interval: string ): Record { const groups: Record = {}; const intervalMs = this.parseInterval(interval); for (const point of points) { const bucketTime = Math.floor(point.timestamp.getTime() / intervalMs) * intervalMs; const bucketKey = new Date(bucketTime).toISOString(); if (!groups[bucketKey]) { groups[bucketKey] = []; } groups[bucketKey].push(point); } return groups; } private parseInterval(interval: string): number { const match = interval.match(/^(\d+)([smhd])$/); if (!match) return 60000; // Default to 1 minute const value = parseInt(match[1]); const unit = match[2]; switch (unit) { case 's': return value * 1000; case 'm': return value * 60 * 1000; case 'h': return value * 60 * 60 * 1000; case 'd': return value * 24 * 60 * 60 * 1000; default: return 60000; } } private performInterpolation( points: TimeSeriesPoint[], method: 'linear' | 'cubic' | 'nearest', interval: string ): TimeSeriesPoint[] { // Simple linear interpolation implementation // In a real implementation, this would be more sophisticated console.log(`Time Series API: Interpolating ${this.metric}`, { pointCount: points.length, method, interval }); return points; } }