/**
 * CDP Edge — D1 layer (Database)
 * All read/write operations against the D1 database.
 * Bindings: env.DB, env.GEO_CACHE, env.AUDIT_LOGS
 */
import { sha256, normalizePhone, normalizeCity } from './utils.js';
import { Env, TrackPayload } from '../types.js';
import { D1Database } from '@cloudflare/workers-types';

// ── Types ─────────────────────────────────────────────────────────────────────

/** Geolocation snapshot built from `request.cf` (or read back from GEO_CACHE). */
export interface GeoData {
  country: string | null;
  continent: string | null;
  asn: string | null;
  asOrg: string | null;
  colo: string | null;
  city: string | null;
  region: string | null;
  regionCode: string | null;
  postalCode: string | null;
  latitude: number | null;
  longitude: number | null;
  timezone: string | null;
  metroCode: string | null;
}

/** Predicted lifetime-value result persisted by `upsertLtvProfile`. */
export interface LtvResult {
  value: number;
  class: string;
  score?: number;
}

/** Delivery health summary returned by `getHealthMetrics`. */
export interface HealthMetrics {
  platform: string;
  hours: number;
  events_sent: number;
  events_failed: number;
  success_rate: number;
  errors_detected: Array<{ code: string; count: number }>;
  issues: string[];
}

/** Per-platform outcome of `generateDailyReport`. */
export interface DailyReport {
  platform: string;
  status: string;
}

// ── Internal helpers ──────────────────────────────────────────────────────────

/** Minimum accumulated score for the `high_intent` cohort. */
const COHORT_HIGH_INTENT_MIN_SCORE = 80;
/** Minimum accumulated score for the `nurture` cohort. */
const COHORT_NURTURE_MIN_SCORE = 30;

/** Extracts a loggable message from an arbitrary thrown value. */
function describeError(err: unknown): string {
  const message = (err as { message?: unknown } | null | undefined)?.message;
  return typeof message === 'string' && message ? message : String(err);
}

/** Returns the stack trace when the thrown value is an Error, otherwise undefined. */
function errorStack(err: unknown): string | undefined {
  return err instanceof Error ? err.stack : undefined;
}

/**
 * Parses a loosely-typed numeric field.
 * Returns null for undefined/null and for anything that does not parse to a
 * finite number, so NaN is never bound into a SQL statement.
 */
function toFiniteNumberOrNull(raw: unknown): number | null {
  if (raw === undefined || raw === null) return null;
  const parsed = parseFloat(String(raw));
  return Number.isFinite(parsed) ? parsed : null;
}

/** Escapes the five HTML-significant characters. */
function escapeHtml(text: string): string {
  return text
    .replace(/&/g, '&amp;')
    .replace(/</g, '&lt;')
    .replace(/>/g, '&gt;')
    .replace(/"/g, '&quot;')
    .replace(/'/g, '&#39;');
}

/**
 * Formats a phone number as +55… (Brazil).
 * The country code is considered present only when the number starts with 55
 * AND has at least 12 digits; a 10/11-digit national number whose area code is
 * 55 must still receive the +55 prefix.
 */
function toBrazilE164(phone: unknown): string {
  const digits = String(phone).replace(/\D/g, '');
  const hasCountryCode = digits.startsWith('55') && digits.length >= 12;
  return hasCountryCode ? `+${digits}` : `+55${digits}`;
}

// ── saveLead — insert a conversion event ──────────────────────────────────────

/**
 * Inserts one row into `leads` for the given event.
 *
 * Best-effort: returns silently when `env.DB` is not bound and logs (never
 * throws) on database errors.
 *
 * @param env       Worker bindings; only `env.DB` is used.
 * @param eventName Name stored in `event_name`.
 * @param payload   Tracking payload the columns are read from.
 * @param request   Used for the `CF-Connecting-IP` header and `cf.country` fallback.
 * @param platform  Value stored in `platform` (defaults to `'website'`).
 */
export async function saveLead(
  env: Env,
  eventName: string,
  payload: TrackPayload,
  request: Request,
  platform: string = 'website',
): Promise<void> {
  if (!env.DB) return;

  try {
    const {
      email, phone, firstName, lastName, city, state, country,
      fbp, fbc, userId,
      utmSource, utmMedium, utmCampaign, utmContent, utmTerm,
      pageUrl, value, currency,
      eventId, event_id,
      botScore, engagementScore, intentionLevel, utmRestored,
    } = payload;

    await env.DB.prepare(`
      INSERT INTO leads (
        event_name, event_id, email, phone, first_name, last_name, city, state, country,
        fbp, fbc, user_id,
        utm_source, utm_medium, utm_campaign, utm_content, utm_term,
        page_url, value, currency,
        ip_address, platform, bot_score, engagement_score, intention_level, utm_restored,
        created_at
      ) VALUES (
        ?,?,?,?,?,?,?,?,?,
        ?,?,?,
        ?,?,?,?,?,
        ?,?,?,
        ?,?,?,?,?,?,
        datetime('now')
      )
    `).bind(
      eventName,
      eventId || event_id || null,
      email || null,
      normalizePhone(phone) || null,
      firstName || null,
      lastName || null,
      city || null,
      state || null,
      (country || (request as any).cf?.country || null),
      fbp || null,
      fbc || null,
      userId || null,
      utmSource || null,
      utmMedium || null,
      utmCampaign || null,
      utmContent || null,
      utmTerm || null,
      pageUrl || null,
      // null / non-numeric input becomes NULL instead of NaN
      toFiniteNumberOrNull(value),
      currency || 'BRL',
      request.headers.get('CF-Connecting-IP') || null,
      platform,
      botScore || 0,
      toFiniteNumberOrNull(engagementScore),
      intentionLevel || null,
      utmRestored ? 1 : 0,
    ).run();
  } catch (err: unknown) {
    console.error('D1 saveLead error:', describeError(err));
  }
}

// ── calculateCohortLabel ──────────────────────────────────────────────────────

/**
 * Maps a score and event name to a cohort label.
 * A `Purchase` event always yields `buyer_lookalike`; otherwise the label is
 * chosen by score thresholds (>= 80 `high_intent`, >= 30 `nurture`, else `lost`).
 */
export function calculateCohortLabel(score: number, eventName: string): string {
  if (eventName === 'Purchase') return 'buyer_lookalike';
  if (score >= COHORT_HIGH_INTENT_MIN_SCORE) return 'high_intent';
  if (score >= COHORT_NURTURE_MIN_SCORE) return 'nurture';
  return 'lost';
}

// ── upsertProfile — accumulates cookies/scores across visits ──────────────────

/**
 * Inserts or updates the `user_profiles` row for `payload.userId`.
 *
 * On conflict, identifier columns keep their previous value when the new one
 * is NULL, and the event's score delta is added to the stored score.
 *
 * The cohort label of an existing profile is derived from the ACCUMULATED
 * score. A single event's delta never exceeds 70 for non-Purchase events
 * (50 + 20 bonus), so labelling from the delta alone could never produce
 * `high_intent`.
 *
 * Best-effort: no-op without `env.DB` or `payload.userId`; errors are logged.
 */
export async function upsertProfile(
  env: Env,
  eventName: string,
  payload: TrackPayload,
  request: Request,
): Promise<void> {
  if (!env.DB || !payload.userId) return;

  try {
    const {
      userId, email, phone,
      fbp, fbc, ttp, gclid, ttclid, gaClientId, wbraid, gbraid, msclkid,
      city, state, country,
      engagementScore, userScore,
    } = payload;

    const scoreMap: Record<string, number> = {
      PageView: 5,
      ViewContent: 10,
      ScrollDepth: 3,
      TimeOnPage: 5,
      Lead: 30,
      InitiateCheckout: 50,
      Purchase: 100,
    };
    const eventScore = scoreMap[eventName] || 2;

    // userScore (capped at 100) is worth up to 20 points; when absent,
    // engagementScore (capped at 5) is worth up to 10 points.
    let behaviorBonus = 0;
    if (userScore) {
      behaviorBonus = Math.round((Math.min(userScore, 100) / 100) * 20);
    } else if (engagementScore) {
      behaviorBonus = Math.round((Math.min(engagementScore, 5) / 5) * 10);
    }
    const totalDelta = eventScore + behaviorBonus;

    await env.DB.prepare(`
      INSERT INTO user_profiles
        (user_id, email, phone, fbp, fbc, ttp, gclid, ttclid, ga_client_id,
         wbraid, gbraid, msclkid, city, state, country, score, cohort_label,
         created_at, updated_at)
      VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,datetime('now'),datetime('now'))
      ON CONFLICT(user_id) DO UPDATE SET
        email        = COALESCE(excluded.email,        user_profiles.email),
        phone        = COALESCE(excluded.phone,        user_profiles.phone),
        fbp          = COALESCE(excluded.fbp,          user_profiles.fbp),
        fbc          = COALESCE(excluded.fbc,          user_profiles.fbc),
        ttp          = COALESCE(excluded.ttp,          user_profiles.ttp),
        gclid        = COALESCE(excluded.gclid,        user_profiles.gclid),
        ttclid       = COALESCE(excluded.ttclid,       user_profiles.ttclid),
        ga_client_id = COALESCE(excluded.ga_client_id, user_profiles.ga_client_id),
        wbraid       = COALESCE(excluded.wbraid,       user_profiles.wbraid),
        gbraid       = COALESCE(excluded.gbraid,       user_profiles.gbraid),
        msclkid      = COALESCE(excluded.msclkid,      user_profiles.msclkid),
        city         = COALESCE(excluded.city,         user_profiles.city),
        state        = COALESCE(excluded.state,        user_profiles.state),
        country      = COALESCE(excluded.country,      user_profiles.country),
        score        = COALESCE(user_profiles.score, 0) + excluded.score,
        cohort_label = CASE
          WHEN excluded.cohort_label = 'buyer_lookalike' THEN 'buyer_lookalike'
          WHEN COALESCE(user_profiles.score, 0) + excluded.score >= ? THEN 'high_intent'
          WHEN COALESCE(user_profiles.score, 0) + excluded.score >= ? THEN 'nurture'
          ELSE 'lost'
        END,
        updated_at   = datetime('now')
    `).bind(
      userId,
      email || null,
      normalizePhone(phone) || null,
      fbp || null,
      fbc || null,
      ttp || null,
      gclid || null,
      ttclid || null,
      gaClientId || null,
      wbraid || null,
      gbraid || null,
      msclkid || null,
      city || null,
      state || null,
      (country || (request as any).cf?.country || null),
      totalDelta,
      calculateCohortLabel(totalDelta, eventName),
      // thresholds for the accumulated-score CASE in the ON CONFLICT branch
      COHORT_HIGH_INTENT_MIN_SCORE,
      COHORT_NURTURE_MIN_SCORE,
    ).run();
  } catch (err: unknown) {
    console.error('D1 upsertProfile error:', describeError(err));
  }
}

// ── resolveDeviceGraph — cross-device identity ────────────────────────────────

/**
 * Links `currentUserId` to up to five older profiles that share the same
 * email and/or the same last ten phone digits, inserting one `device_graph`
 * row per confirmed match (`INSERT OR IGNORE`).
 *
 * Confidence written: 0.99 for email+phone, 0.95 for email only, 0.85 for
 * phone only. Best-effort: errors are logged, never thrown.
 */
export async function resolveDeviceGraph(
  DB: D1Database,
  currentUserId: string,
  email?: string | null,
  phone?: string | null,
): Promise<void> {
  if (!DB || !currentUserId) return;
  if (!email && !phone) return;

  try {
    const normalizedEmail = email ? email.toLowerCase().trim() : null;
    const phoneDigits = phone ? String(phone).replace(/\D/g, '') : '';
    const phoneTail = phoneDigits.length >= 10 ? phoneDigits.slice(-10) : null;

    const conditions: string[] = [];
    const bindings: (string | number)[] = [];
    if (normalizedEmail !== null) {
      conditions.push('email = ?');
      bindings.push(normalizedEmail);
    }
    if (phoneTail !== null) {
      conditions.push('phone LIKE ?');
      bindings.push(`%${phoneTail}`);
    }
    if (conditions.length === 0) return;
    bindings.push(currentUserId);

    const rows = await DB.prepare(`
      SELECT user_id, email, phone, created_at
      FROM user_profiles
      WHERE (${conditions.join(' OR ')}) AND user_id != ?
      ORDER BY created_at ASC
      LIMIT 5
    `).bind(...bindings).all();

    if (!rows.results || rows.results.length === 0) return;

    for (const match of rows.results) {
      // Re-verify in code: the SQL filter is an OR and the phone filter is a suffix LIKE.
      const emailMatch =
        normalizedEmail !== null &&
        !!match.email &&
        normalizedEmail === String(match.email).toLowerCase().trim();
      const phoneMatch =
        phoneTail !== null &&
        !!match.phone &&
        phoneTail === String(match.phone).replace(/\D/g, '').slice(-10);

      if (!emailMatch && !phoneMatch) continue;

      let matchType = 'phone';
      let matchConfidence = 0.85;
      if (emailMatch && phoneMatch) {
        matchType = 'email+phone';
        matchConfidence = 0.99;
      } else if (emailMatch) {
        matchType = 'email';
        matchConfidence = 0.95;
      }

      const primary = match.user_id as string;
      const secondary = currentUserId;

      await DB.prepare(`
        INSERT OR IGNORE INTO device_graph
          (primary_user_id, secondary_user_id, match_type, match_confidence)
        VALUES (?, ?, ?, ?)
      `).bind(primary, secondary, matchType, matchConfidence).run();
      // user IDs are deliberately not logged — sensitive data stays out of Workers logs
    }
  } catch (err: unknown) {
    console.error('resolveDeviceGraph error:', describeError(err));
  }
}

// ── fireAutomation — fires automation rules (WhatsApp/Email) ──────────────────

/**
 * Runs every active `automation_rules` row whose `trigger_event` equals
 * `eventName`.
 *
 * - `whatsapp` rules post a text message to the Graph API when the payload has
 *   a phone and the WhatsApp credentials are configured.
 * - `email` rules post to Resend when the payload has an email and
 *   `RESEND_API_KEY` is configured.
 *
 * Each attempt is recorded in `messaging_history` with status `sent`/`failed`.
 * `{{name}}`, `{{email}}`, `{{phone}}`, `{{campaign}}` and `{{intention}}` are
 * replaced in templates; unknown placeholders become an empty string.
 * A failing rule is logged and does not stop the remaining rules.
 *
 * @param leadId Stored in `messaging_history.lead_id`.
 */
export async function fireAutomation(
  env: Env,
  eventName: string,
  leadId: number | null,
  payload: TrackPayload,
): Promise<void> {
  if (!env.DB) return;

  try {
    const { results: rules } = await env.DB
      .prepare(
        `SELECT id, channel, subject_template, message_template FROM automation_rules WHERE trigger_event = ?1 AND is_active = 1`
      )
      .bind(eventName)
      .all();

    if (!rules || rules.length === 0) return;

    const vars: Record<string, string> = {
      name: String(payload.firstName || (payload as any).name || ''),
      email: String(payload.email || ''),
      phone: String(payload.phone || ''),
      campaign: String(payload.utmCampaign || payload.utm_campaign || ''),
      intention: String(payload.intentionLevel || payload.intention_level || ''),
    };
    // Visitor-supplied values are escaped before being placed inside email HTML.
    const htmlVars: Record<string, string> = Object.fromEntries(
      Object.entries(vars).map(([key, text]) => [key, escapeHtml(text)]),
    );
    const interpolate = (tpl: string, values: Record<string, string> = vars): string =>
      tpl.replace(/\{\{(\w+)\}\}/g, (_whole, key: string) => values[key] ?? '');

    for (const rule of rules) {
      const template = rule.message_template as string;
      const message = interpolate(template);
      const subject = rule.subject_template
        ? interpolate(rule.subject_template as string)
        : null;

      try {
        if (
          rule.channel === 'whatsapp' &&
          payload.phone &&
          env.WHATSAPP_ACCESS_TOKEN &&
          env.WHATSAPP_PHONE_NUMBER_ID
        ) {
          const e164 = toBrazilE164(payload.phone);

          const waRes = await fetch(
            `https://graph.facebook.com/v25.0/${env.WHATSAPP_PHONE_NUMBER_ID}/messages`,
            {
              method: 'POST',
              headers: {
                'Authorization': `Bearer ${env.WHATSAPP_ACCESS_TOKEN}`,
                'Content-Type': 'application/json',
              },
              body: JSON.stringify({
                messaging_product: 'whatsapp',
                recipient_type: 'individual',
                to: e164,
                type: 'text',
                text: { body: message },
              }),
            }
          );
          const waData = await waRes.json();
          const status = waRes.ok ? 'sent' : 'failed';
          // success: provider message id; failure: raw response for diagnosis
          const meta = waRes.ok
            ? ((waData as any).messages?.[0]?.id ?? null)
            : JSON.stringify(waData);

          await env.DB.prepare(
            `INSERT INTO messaging_history (lead_id, channel, recipient, subject, content, status, meta) VALUES (?1,?2,?3,?4,?5,?6,?7)`
          ).bind(leadId, 'whatsapp', e164, null, message, status, meta).run();
        } else if (rule.channel === 'email' && payload.email && env.RESEND_API_KEY) {
          // NOTE(review): the wrapper markup of the HTML body could not be recovered
          // from the file as received; a plain <div> is used — confirm against the
          // intended email layout.
          const htmlBody = interpolate(template, htmlVars).replace(/\n/g, '<br>');

          const resendRes = await fetch('https://api.resend.com/emails', {
            method: 'POST',
            headers: {
              'Authorization': `Bearer ${env.RESEND_API_KEY}`,
              'Content-Type': 'application/json',
            },
            body: JSON.stringify({
              from: env.RESEND_FROM_EMAIL || 'noreply@cdp-edge.app',
              to: [payload.email],
              subject: subject || `Olá, ${vars.name || 'você'}!`,
              html: `<div>${htmlBody}</div>`,
            }),
          });
          const resendData = await resendRes.json();
          const status = resendRes.ok ? 'sent' : 'failed';
          const meta = resendRes.ok
            ? ((resendData as any).id ?? null)
            : JSON.stringify(resendData);

          await env.DB.prepare(
            `INSERT INTO messaging_history (lead_id, channel, recipient, subject, content, status, meta) VALUES (?1,?2,?3,?4,?5,?6,?7)`
          ).bind(leadId, 'email', payload.email, subject, message, status, meta).run();
        }
      } catch (err: unknown) {
        console.error(`[Automation] rule ${(rule as any).id} error:`, describeError(err));
      }
    }
  } catch (err: unknown) {
    console.error('[Automation] fireAutomation error:', describeError(err));
  }
}

// ── getProfileByEmail ─────────────────────────────────────────────────────────

/**
 * Returns the most recently updated `user_profiles` row whose email equals the
 * lower-cased, trimmed input, or null when none exists, `env.DB` is missing,
 * or the query fails.
 */
export async function getProfileByEmail(
  env: Env,
  email: string,
): Promise<Record<string, unknown> | null> {
  if (!env.DB || !email) return null;
  try {
    return await env.DB.prepare(
      'SELECT * FROM user_profiles WHERE email = ? ORDER BY updated_at DESC LIMIT 1'
    ).bind(email.toLowerCase().trim()).first();
  } catch {
    return null;
  }
}

// ── enrichGeoFromEdge — enriches the payload with geolocation data ────────────

/**
 * Resolves geolocation for the request and fills missing payload fields.
 *
 * Lookup order: GEO_CACHE entry `geo:<ip>`, then `request.cf`. A fresh result
 * with a country is cached for one hour. MUTATES `payload`: `country`, `city`,
 * `state` (from regionCode) and `zip` are filled only when empty, and the full
 * result is attached as `payload.geo`.
 *
 * @returns The geolocation data that was used.
 */
export async function enrichGeoFromEdge(
  request: Request,
  env: Env,
  payload: TrackPayload,
): Promise<GeoData> {
  const cf = (request as any).cf || {};
  const ip = request.headers.get('CF-Connecting-IP') || '';
  const cacheKey = `geo:${ip}`;

  let geoData: GeoData | null = null;

  if (env.GEO_CACHE && ip) {
    try {
      const cached = await env.GEO_CACHE.get(cacheKey, 'json') as GeoData | null;
      if (cached) geoData = cached;
    } catch (err: unknown) {
      console.error('[DB] Error fetching geo data from cache:', {
        ip,
        error: describeError(err),
        stack: errorStack(err),
      });
    }
  }

  if (!geoData) {
    geoData = {
      country: cf.country || null,
      continent: cf.continent || null,
      asn: cf.asn || null,
      asOrg: cf.asOrganization || null,
      colo: cf.colo || null,
      city: cf.city || null,
      region: cf.region || null,
      regionCode: cf.regionCode || null,
      postalCode: cf.postalCode || null,
      latitude: cf.latitude || null,
      longitude: cf.longitude || null,
      timezone: cf.timezone || null,
      metroCode: cf.metroCode || null,
    };

    // Only cache results that actually resolved a country.
    if (env.GEO_CACHE && ip && geoData.country) {
      try {
        await env.GEO_CACHE.put(cacheKey, JSON.stringify(geoData), { expirationTtl: 3600 });
      } catch (err: unknown) {
        console.error('[DB] Error caching geo data:', {
          ip,
          country: geoData.country,
          error: describeError(err),
          stack: errorStack(err),
        });
      }
    }
  }

  payload.country = payload.country || geoData.country;
  payload.city = payload.city || geoData.city;
  payload.state = payload.state || geoData.regionCode;
  payload.zip = payload.zip || geoData.postalCode;
  (payload as any).geo = geoData;

  return geoData;
}

// ── writeAuditLog — writes the event to R2 ────────────────────────────────────

/**
 * Stores a JSON audit record under
 * `logs/<yyyy>/<mm>/<dd>/<epoch-ms>_<eventName>.json` (UTC date parts).
 * Best-effort: no-op without `env.AUDIT_LOGS`; errors are logged.
 */
export async function writeAuditLog(
  env: Env,
  eventName: string,
  payload: TrackPayload,
  geoData: GeoData | null,
): Promise<void> {
  if (!env.AUDIT_LOGS) return;

  try {
    const now = new Date();
    const year = now.getUTCFullYear();
    const month = String(now.getUTCMonth() + 1).padStart(2, '0');
    const day = String(now.getUTCDate()).padStart(2, '0');
    const key = `logs/${year}/${month}/${day}/${now.getTime()}_${eventName}.json`;

    const log = {
      timestamp: now.toISOString(),
      event: eventName,
      userId: payload.userId || null,
      eventId: payload.eventId || null,
      value: payload.value || null,
      currency: payload.currency || null,
      ltvClass: payload.ltvClass || null,
      utm: {
        source: payload.utmSource || null,
        medium: payload.utmMedium || null,
        campaign: payload.utmCampaign || null,
        content: payload.utmContent || null,
        term: payload.utmTerm || null,
        restored: payload.utmRestored || false,
      },
      geo: geoData || null,
    };

    await env.AUDIT_LOGS.put(key, JSON.stringify(log), {
      httpMetadata: { contentType: 'application/json' },
    });
  } catch (err: unknown) {
    console.error('[R2 Audit] Error:', describeError(err));
  }
}

// ── generateEdgeFingerprint ───────────────────────────────────────────────────

/**
 * Hashes `<asn>|<primary Accept-Language tag>|<reduced User-Agent>` with
 * `sha256`.
 *
 * The User-Agent is lower-cased, stripped of digits/dots and non-letters, and
 * reduced to its first four words longer than three characters, so version
 * bumps do not change the fingerprint.
 */
export async function generateEdgeFingerprint(request: Request): Promise<string> {
  const asn = String((request as any).cf?.asn || '0');

  const acceptLanguage = request.headers.get('Accept-Language') || 'unknown';
  const lang = acceptLanguage.split(',')[0].trim();

  const userAgent = request.headers.get('User-Agent') || '';
  const uaWords = userAgent
    .toLowerCase()
    .replace(/[\d.]+/g, '')
    .replace(/[^a-z\s]/g, ' ')
    .split(' ')
    .filter(word => word.length > 3);
  const uaBase = uaWords.slice(0, 4).join(' ').trim();

  const raw = `${asn}|${lang}|${uaBase}`;
  return sha256(raw);
}

// ── saveEdgeFingerprint ───────────────────────────────────────────────────────

/**
 * Records the UTM parameters seen for a fingerprint in `edge_fingerprints`.
 * Skipped when there is no DB, no fingerprint, or no `utmSource` in the
 * payload. Errors are logged, never thrown.
 */
export async function saveEdgeFingerprint(
  DB: D1Database,
  fingerprint: string | undefined,
  userId: string | undefined,
  payload: TrackPayload,
): Promise<void> {
  if (!DB || !fingerprint) return;

  const { utmSource, utmMedium, utmCampaign, utmContent, utmTerm } = payload;
  if (!utmSource) return;

  try {
    await DB.prepare(`
      INSERT INTO edge_fingerprints
        (fingerprint, user_id, utm_source, utm_medium, utm_campaign, utm_content, utm_term)
      VALUES (?, ?, ?, ?, ?, ?, ?)
    `).bind(
      fingerprint,
      userId || null,
      utmSource || null,
      utmMedium || null,
      utmCampaign || null,
      utmContent || null,
      utmTerm || null,
    ).run();
  } catch (err: unknown) {
    console.error('saveEdgeFingerprint error:', describeError(err));
  }
}

// ── resurrectUTM ──────────────────────────────────────────────────────────────

/**
 * Returns the newest UTM set stored for the fingerprint within the last
 * 48 hours, or null when none exists, inputs are missing, or the query fails.
 */
export async function resurrectUTM(
  DB: D1Database,
  fingerprint: string | undefined,
): Promise<Record<string, unknown> | null> {
  if (!DB || !fingerprint) return null;
  try {
    return await DB.prepare(`
      SELECT utm_source, utm_medium, utm_campaign, utm_content, utm_term
      FROM edge_fingerprints
      WHERE fingerprint = ?
        AND utm_source IS NOT NULL
        AND created_at > datetime('now', '-48 hours')
      ORDER BY created_at DESC
      LIMIT 1
    `).bind(fingerprint).first();
  } catch {
    return null;
  }
}

// ── upsertLtvProfile — persists the LTV prediction on the profile ─────────────

/**
 * Writes `ltv.class` and `ltv.value` to the existing `user_profiles` row of
 * `userId` (UPDATE only — no row is created). Errors are logged.
 */
export async function upsertLtvProfile(env: Env, userId: string, ltv: LtvResult): Promise<void> {
  if (!env.DB || !userId) return;
  try {
    await env.DB.prepare(`
      UPDATE user_profiles
      SET predicted_ltv_class = ?,
          predicted_ltv_value = ?,
          updated_at = datetime('now')
      WHERE user_id = ?
    `).bind(ltv.class, ltv.value, userId).run();
  } catch (err: unknown) {
    console.error('upsertLtvProfile error:', describeError(err));
  }
}

// ── recordLtvFeedback — closes the predictive loop with the real purchase value ─
// Called in the background when a Purchase arrives with payload.value > 0.
// Updates user_profiles + ltv_ab_assignments + ltv_ab_variations in cascade.

/**
 * Records the real purchase value against the stored LTV prediction.
 *
 * 1. Reads `predicted_ltv_value` and computes
 *    accuracy = max(0, 1 - |predicted - real| / max(real, 1)), rounded to 2 decimals
 *    (null when there is no prediction).
 * 2. Writes the real value and accuracy to the profile.
 * 3. Marks the newest unconverted A/B assignment of the last 60 days as
 *    converted and folds the result into the variation's running totals.
 *
 * No-op when `realValue` is not positive. Errors are logged, never thrown.
 */
export async function recordLtvFeedback(env: Env, userId: string, realValue: number): Promise<void> {
  if (!env.DB || !userId || !realValue || realValue <= 0) return;

  try {
    // 1. Fetch the profile's current predicted_ltv_value
    const profile = await env.DB.prepare(`
      SELECT predicted_ltv_value FROM user_profiles WHERE user_id = ?
    `).bind(userId).first();

    // accuracy = 1 - |pred-real|/real (0–1, same scale as the A/B test accuracy_score)
    const predictedValue = profile?.predicted_ltv_value;
    const ltv_accuracy =
      predictedValue !== null && predictedValue !== undefined
        ? Math.max(
            0,
            Math.round(
              (1 - Math.abs(Number(predictedValue) - realValue) / Math.max(realValue, 1)) * 100,
            ) / 100,
          )
        : null;

    // 2. Store the real value + accuracy on the profile
    await env.DB.prepare(`
      UPDATE user_profiles
      SET real_ltv_value = ?,
          ltv_accuracy = ?,
          ltv_feedback_at = datetime('now'),
          updated_at = datetime('now')
      WHERE user_id = ?
    `).bind(realValue, ltv_accuracy, userId).run();

    // 3. Close the most recent unconverted A/B assignment (60-day window)
    const assignment = await env.DB.prepare(`
      SELECT id, variation_id, predicted_ltv
      FROM ltv_ab_assignments
      WHERE user_id = ?
        AND converted = 0
        AND assigned_at > datetime('now', '-60 days')
      ORDER BY assigned_at DESC
      LIMIT 1
    `).bind(userId).first();

    if (!assignment) return;

    // 3a. Mark the assignment as converted
    await env.DB.prepare(`
      UPDATE ltv_ab_assignments
      SET converted = 1,
          real_revenue = ?,
          converted_at = datetime('now')
      WHERE id = ?
    `).bind(realValue, (assignment as any).id).run();

    // 3b. Update the variation's accumulated metrics as running averages inside
    //     a single UPDATE; right-hand sides read the pre-update column values.
    const predLtv = (assignment as any).predicted_ltv || 0;
    const indivAcc = Math.max(0, 1 - Math.abs(predLtv - realValue) / Math.max(realValue, 1));

    await env.DB.prepare(`
      UPDATE ltv_ab_variations
      SET total_purchases = total_purchases + 1,
          sum_real_revenue = sum_real_revenue + ?,
          avg_real_revenue = (sum_real_revenue + ?) / (total_purchases + 1),
          accuracy_score = ROUND(
            (COALESCE(accuracy_score, 0) * total_purchases + ?) / (total_purchases + 1),
            4
          )
      WHERE id = ?
    `).bind(realValue, realValue, indivAcc, (assignment as any).variation_id).run();
  } catch (err: unknown) {
    console.error('[LTV-Feedback] recordLtvFeedback error:', describeError(err));
  }
}

// ── Feedback loop — failure log and health metrics ────────────────────────────

/**
 * Inserts one `api_failures` row with `retry_count = 0` and
 * `final_status = 'failed'`. Errors are logged, never thrown.
 */
export async function logApiFailure(
  DB: D1Database,
  platform: string,
  eventName: string,
  errorCode: string | number,
  errorMessage: string,
  eventId: string,
  rawPayload: string,
): Promise<void> {
  try {
    await DB.prepare(`
      INSERT INTO api_failures
        (platform, event_name, error_code, error_message, event_id, raw_payload, retry_count, final_status)
      VALUES (?, ?, ?, ?, ?, ?, 0, 'failed')
    `).bind(platform, eventName, String(errorCode), errorMessage, eventId, rawPayload).run();
  } catch (err: unknown) {
    console.error('Failed to log API failure:', describeError(err));
  }
}

/**
 * Summarises delivery health for one platform over the last `hours` hours.
 *
 * Failures come from `api_failures` (grouped by error code); the sent count is
 * the number of `leads` rows for the platform in the same window. The issue
 * `high_error_rate` is raised when failures exceed 10% of sent events.
 * `success_rate` is a percentage clamped at 0; it is 100 when nothing was sent.
 *
 * The time window is bound as a SQL parameter; a non-finite or non-positive
 * `hours` falls back to 24. On query failure a zeroed result with the issue
 * `metrics_unavailable` is returned.
 */
export async function getHealthMetrics(
  DB: D1Database,
  platform: string,
  hours: number = 24,
): Promise<HealthMetrics> {
  const windowHours = Number.isFinite(hours) && hours > 0 ? hours : 24;
  const since = `-${windowHours} hours`;

  try {
    const failures = await DB.prepare(`
      SELECT COUNT(*) as count, error_code
      FROM api_failures
      WHERE platform = ? AND created_at > datetime('now', ?)
      GROUP BY error_code
    `).bind(platform, since).all();

    const totalSent = await DB.prepare(`
      SELECT COUNT(*) as count
      FROM leads
      WHERE platform = ? AND created_at > datetime('now', ?)
    `).bind(platform, since).first();

    const failureRows = (failures.results || []) as Array<{ count: number; error_code: string }>;
    const totalFailed = failureRows.reduce((sum, row) => sum + row.count, 0);
    const sentCount = Number((totalSent as { count?: number } | null)?.count) || 0;

    // Failures and sends come from different tables, so failures may exceed sends.
    const successRate = sentCount > 0
      ? Math.max(0, ((sentCount - totalFailed) / sentCount) * 100)
      : 100;

    return {
      platform,
      hours: windowHours,
      events_sent: sentCount,
      events_failed: totalFailed,
      success_rate: successRate,
      errors_detected: failureRows.map(row => ({ code: row.error_code, count: row.count })),
      issues: totalFailed > sentCount * 0.1 ? ['high_error_rate'] : [],
    };
  } catch (err: unknown) {
    console.error('Failed to get health metrics:', describeError(err));
    return {
      platform,
      hours: windowHours,
      events_sent: 0,
      events_failed: 0,
      success_rate: 0,
      errors_detected: [],
      issues: ['metrics_unavailable'],
    };
  }
}

/**
 * Computes 24-hour health metrics for each known platform and stores one
 * `health_reports` row per platform, dated with today's UTC date.
 *
 * @returns One entry per platform with status `ok` or `failed` (insert error).
 */
export async function generateDailyReport(DB: D1Database): Promise<DailyReport[]> {
  const platforms = ['meta', 'ga4', 'tiktok', 'pinterest', 'reddit'];
  const today = new Date().toISOString().split('T')[0];
  const reports: DailyReport[] = [];

  for (const platform of platforms) {
    const metrics = await getHealthMetrics(DB, platform, 24);
    let status = 'ok';
    try {
      await DB.prepare(`
        INSERT INTO health_reports
          (report_date, platform, events_sent, events_failed, success_rate, errors_detected, issues_detected)
        VALUES (?, ?, ?, ?, ?, ?, ?)
      `).bind(
        today,
        platform,
        metrics.events_sent,
        metrics.events_failed,
        metrics.success_rate,
        JSON.stringify(metrics.errors_detected),
        JSON.stringify(metrics.issues)
      ).run();
    } catch (err: unknown) {
      console.error(`Failed to generate report for ${platform}:`, describeError(err));
      status = 'failed';
    }
    reports.push({ platform, status });
  }

  return reports;
}

/**
 * Inserts one `intelligence_logs` row. `currentValue` and `expectedValue` are
 * stored as strings (null/undefined become an empty string) and `alertSent`
 * as 1/0. No-op without a DB; errors are logged.
 */
export async function logIntelligence(
  DB: D1Database,
  runType: string,
  platform: string,
  checkType: string,
  status: string,
  currentValue: unknown,
  expectedValue: unknown,
  message: string,
  alertSent: boolean = false,
): Promise<void> {
  if (!DB) return;
  try {
    await DB.prepare(`
      INSERT INTO intelligence_logs
        (run_type, platform, check_type, status, current_value, expected_value, message, alert_sent)
      VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    `).bind(
      runType,
      platform,
      checkType,
      status,
      String(currentValue ?? ''),
      String(expectedValue ?? ''),
      message,
      alertSent ? 1 : 0,
    ).run();
  } catch (err: unknown) {
    console.error('logIntelligence error:', describeError(err));
  }
}

// ── Webhook processing helper ─────────────────────────────────────────────────

/**
 * Duplicate check for e-commerce webhooks (Hotmart, Kiwify, Ticto, etc.).
 *
 * - Looks for a `webhook_events` row with the same transaction id and platform
 *   in status `processed`; when found, returns `{ duplicate: true }`.
 * - Otherwise records the event (`INSERT OR IGNORE`, status `processed`) and
 *   returns `{ duplicate: false }` so the caller continues processing.
 *
 * Without `env.DB` the check is skipped and `{ duplicate: false }` is returned.
 * On a database error the result is `{ duplicate: false, error }`, i.e. the
 * caller is still told to continue.
 *
 * @param context Only `context.email` is used (stored and logged).
 */
export async function processWebhookDuplicateCheck(
  env: Env,
  platform: string,
  transactionId: string,
  rawPayload: string,
  context?: { email?: string; orderId?: string }
): Promise<{ duplicate: boolean; error?: string }> {
  if (!env.DB) {
    return { duplicate: false };
  }

  try {
    // 1. Check for an already processed event
    const dup = await env.DB.prepare(
      'SELECT id FROM webhook_events WHERE transaction_id = ? AND platform = ? AND status = ?'
    ).bind(transactionId, platform, 'processed').first();

    if (dup) {
      console.log(`[Webhook] Duplicate event skipped: ${platform}/${transactionId}`);
      return { duplicate: true };
    }

    // 2. Record the event
    await env.DB.prepare(
      'INSERT OR IGNORE INTO webhook_events (platform, transaction_id, email, status, raw_payload) VALUES (?,?,?,?,?)'
    ).bind(platform, transactionId, context?.email || null, 'processed', rawPayload).run();

    return { duplicate: false };
  } catch (err: unknown) {
    const reason = describeError(err);
    console.error(`[Webhook] Error processing ${platform}/${transactionId}:`, {
      platform,
      transactionId,
      email: context?.email,
      error: reason,
      stack: errorStack(err),
    });
    return { duplicate: false, error: reason };
  }
}