import { create } from 'zustand';
import { toast } from 'sonner';
import React, { useEffect, useState } from 'react';

// --- Interfaces ---

/** Latest known state of a job, keyed in the store by the job's event-stream URL. */
interface JobProgress {
  completed?: number;
  total?: number;
  status: 'pending' | 'running' | 'completed' | 'error' | 'timeout';
  /** Whatever was reported as the failure cause: an Error, an EventSource error Event, or server data. */
  error?: any;
}

/** Settle functions of one `subscribeToJob` promise plus the toast (if any) owned by that subscriber. */
interface PromiseCallbacks {
  resolve: (value: boolean | PromiseLike<boolean>) => void;
  reject: (reason?: any) => void;
  toastId?: string | number;
  toastTitle?: string; // Store title to recreate toast if needed
}

/** One shared EventSource per URL together with everyone waiting on it. */
interface JobConnection {
  eventSource: EventSource;
  status: 'connecting' | 'open' | 'closed';
  subscribers: Set<PromiseCallbacks>;
  // ReturnType<typeof setTimeout> is correct in both browser and Node typings.
  timeoutId?: ReturnType<typeof setTimeout>;
}

interface JobStoreState {
  connections: Map<string, JobConnection>;
  progress: Map<string, JobProgress>;
  getJobProgress: (url: string) => JobProgress | undefined;
  subscribeToJob: (url: string, toastTitle?: string) => Promise<boolean>;
  _closeConnection: (url: string, reason: 'completed' | 'error' | 'timeout', errorDetails?: any) => void;
  unsubscribeFromJob: (url: string, callbacks: PromiseCallbacks) => void; // Needed if a component unmounts
}

// --- Constants & pure helpers ---

/** How long an open connection may run before the job is reported as timed out. */
const JOB_TIMEOUT_MS = 3 * 60 * 1000;
/** How long the success toast stays visible before it is dismissed. */
const SUCCESS_TOAST_LINGER_MS = 3000;
/** Display duration of error/timeout toasts. */
const ERROR_TOAST_DURATION_MS = 5000;
/** Per-toast class overrides used for every progress toast. */
const TOAST_CLASS_NAMES = { content: 'onvo-w-full', title: 'onvo-w-full' };

/**
 * Converts a progress snapshot into a 0-100 percentage.
 *
 * Uses completed/total when both are non-zero, reports 100 for a completed job
 * without counters, and 0 in every other case (pending, running without
 * counters, error, timeout, or no snapshot at all).
 */
function percentComplete(progress: JobProgress | undefined): number {
  if (progress?.completed && progress?.total) {
    const ratio = (progress.completed / progress.total) * 100;
    // Clamp so an over-reporting server cannot make the bar overflow its track.
    return Math.min(100, Math.max(0, ratio));
  }
  return progress?.status === 'completed' ? 100 : 0;
}

/**
 * Produces a displayable message for a failure cause.
 *
 * The cause can be a string, an Error, an EventSource error Event, arbitrary
 * server data, or undefined; only strings and objects carrying a string
 * `message` are shown verbatim, everything else falls back to `Job <reason>`.
 */
function describeJobError(details: unknown, reason: string): string {
  if (typeof details === 'string' && details) return details;
  if (typeof details === 'object' && details !== null && 'message' in details) {
    const message = (details as { message?: unknown }).message;
    if (typeof message === 'string' && message) return message;
  }
  return `Job ${reason}`;
}

// --- Internal Component for Toast Content ---

/**
 * Progress bar bound to the store entry for `url`.
 *
 * NOTE(review): the original markup was missing from the copy of this file
 * that was reviewed (the component returned an empty expression). The elements
 * below are a minimal stand-in - restore the project's real markup/classes.
 */
const JobProgressBarInternal = ({ url }: { url: string }) => {
  const jobProgress = useJobStore(state => state.progress.get(url));
  const [progress, setProgress] = useState(0);

  useEffect(() => {
    setProgress(percentComplete(jobProgress));
  }, [jobProgress]);

  return React.createElement(
    'div',
    {
      role: 'progressbar',
      'aria-valuemin': 0,
      'aria-valuemax': 100,
      'aria-valuenow': Math.round(progress),
      className: 'onvo-w-full',
      style: { height: 4, borderRadius: 2, overflow: 'hidden', background: 'rgba(128, 128, 128, 0.3)' },
    },
    React.createElement('div', {
      style: { width: `${progress}%`, height: '100%', background: 'currentColor', transition: 'width 200ms ease' },
    }),
  );
};

/**
 * Toast body: the title followed by the live progress bar for `url`.
 *
 * NOTE(review): wrapper elements are reconstructed; only `{title}` survived in
 * the reviewed copy. Confirm against the intended layout.
 */
const ToastContent = ({ url, title }: { url: string, title: string }) =>
  React.createElement(
    'div',
    { className: 'onvo-w-full' },
    React.createElement('div', null, title),
    React.createElement(JobProgressBarInternal, { url }),
  );

/**
 * Opens a never-expiring loading toast showing live progress for `url`.
 * NOTE(review): the toast's first argument was lost in the reviewed copy;
 * rendering ToastContent here is an assumption.
 */
const showProgressToast = (url: string, title: string): string | number =>
  toast.loading(React.createElement(ToastContent, { url, title }), {
    duration: Infinity,
    classNames: TOAST_CLASS_NAMES,
  });

// --- Store Implementation ---

/**
 * Store that multiplexes server-sent-event job streams.
 *
 * One EventSource is opened per URL and shared by every `subscribeToJob`
 * caller for that URL. Progress snapshots are kept in `progress`; connection
 * bookkeeping in `connections`. Closed connections are left in the map (a
 * later subscribe for the same URL replaces them with a fresh connection).
 */
export const useJobStore = create<JobStoreState>((set, get) => ({
  connections: new Map(),
  progress: new Map(),

  getJobProgress: (url) => get().progress.get(url),

  /**
   * Waits for the job streamed at `url`.
   *
   * Resolves with `true` when the stream reports `completed`; rejects with the
   * reported cause on error, stream failure, or timeout. When `toastTitle` is
   * given, a progress toast is shown for this subscriber.
   */
  subscribeToJob: (url, toastTitle) =>
    new Promise<boolean>((resolve, reject) => {
      const callbacks: PromiseCallbacks = { resolve, reject, toastTitle };
      const existing = get().connections.get(url);

      // --- Reuse Existing Connection ---
      if (existing && existing.status !== 'closed') {
        // Settle straight away if the job already reached a terminal state, before
        // creating a toast or registering, so this subscriber cannot be notified twice.
        const current = get().progress.get(url);
        if (current?.status === 'completed') {
          resolve(true);
          return;
        }
        if (current?.status === 'error' || current?.status === 'timeout') {
          reject(current.error || new Error('Job failed before new subscription'));
          return;
        }

        if (toastTitle) callbacks.toastId = showProgressToast(url, toastTitle);
        existing.subscribers.add(callbacks);
        // New map reference so selectors observing `connections` are notified.
        set(state => ({ connections: new Map(state.connections) }));
        return;
      }

      // --- Create New Connection ---
      // Construct the EventSource first: if the URL is rejected the constructor
      // throws, the promise rejects, and no infinite-duration toast is left behind.
      const eventSource = new EventSource(url);
      if (toastTitle) callbacks.toastId = showProgressToast(url, toastTitle);

      const newConnection: JobConnection = {
        eventSource,
        status: 'connecting',
        subscribers: new Set([callbacks]),
        timeoutId: undefined,
      };
      // Write into copies rather than mutating the maps currently held in state.
      set(state => ({
        connections: new Map(state.connections).set(url, newConnection),
        progress: new Map(state.progress).set(url, { status: 'pending' }),
      }));

      // --- Event Handlers ---
      eventSource.onopen = () => {
        const conn = get().connections.get(url);
        if (!conn || conn.status !== 'connecting') return;

        conn.status = 'open';
        // Set timeout only after connection is confirmed open
        conn.timeoutId = setTimeout(() => {
          get()._closeConnection(url, 'timeout', new Error("Job timed out after 3 minutes"));
        }, JOB_TIMEOUT_MS);
        set(state => ({
          connections: new Map(state.connections), // Update map reference
          progress: new Map(state.progress).set(url, { status: 'running' }),
        }));
      };

      eventSource.onmessage = (event) => {
        const conn = get().connections.get(url);
        if (!conn || conn.status === 'closed') return; // Ignore late messages

        try {
          const data = JSON.parse(event.data);
          // Any status other than the two terminal ones is treated as still running.
          const status: JobProgress['status'] =
            data.status === 'completed' ? 'completed' : data.status === 'error' ? 'error' : 'running';

          set(state => ({
            progress: new Map(state.progress).set(url, {
              status,
              completed: data.completed,
              total: data.total,
              error: data.error, // Include potential error details from message
            }),
          }));

          if (status === 'completed') {
            get()._closeConnection(url, 'completed');
          } else if (status === 'error') {
            get()._closeConnection(url, 'error', data.error);
          }
        } catch (e) {
          // Malformed or non-object payload: fail the job with the parse/access error.
          get()._closeConnection(url, 'error', e);
        }
      };

      eventSource.onerror = (error) => {
        const conn = get().connections.get(url);
        // onerror often fires on close. Only treat as error if connection wasn't already closing/closed cleanly.
        if (conn && conn.status !== 'closed') {
          get()._closeConnection(url, 'error', error);
        }
      };
    }),

  /**
   * Closes the connection for `url` exactly once, records the final progress
   * state, updates each subscriber's toast, and settles every waiting promise
   * (resolve on 'completed', reject otherwise).
   */
  _closeConnection: (url, reason, errorDetails) => {
    const connection = get().connections.get(url);
    if (!connection || connection.status === 'closed') return; // Already handled

    connection.status = 'closed';
    if (connection.timeoutId !== undefined) {
      clearTimeout(connection.timeoutId);
      connection.timeoutId = undefined;
    }
    try {
      connection.eventSource.close();
    } catch {
      // Best effort: a failure to close must not prevent subscribers from being notified.
    }

    // Update final progress state, keeping any counters already received.
    const finalProgress: JobProgress = {
      ...(get().progress.get(url) || {}),
      status: reason,
      error: errorDetails,
    };
    set(state => ({
      connections: new Map(state.connections), // Update map reference
      progress: new Map(state.progress).set(url, finalProgress),
    }));

    // Notify all subscribers
    connection.subscribers.forEach(sub => {
      const toastId = sub.toastId;
      if (toastId !== undefined) {
        if (reason === 'completed') {
          // NOTE(review): the success toast's content was lost in the reviewed copy;
          // re-rendering ToastContent with the stored title is an assumption.
          toast.success(React.createElement(ToastContent, { url, title: sub.toastTitle ?? '' }), { id: toastId });
          // Dismiss the success toast after a short delay.
          setTimeout(() => toast.dismiss(toastId), SUCCESS_TOAST_LINGER_MS);
        } else {
          // Always pass a string: the cause may be an Event or Error, which a toast cannot render.
          toast.error(describeJobError(errorDetails, reason), { id: toastId, duration: ERROR_TOAST_DURATION_MS });
        }
      }

      // Resolve or reject the promise
      if (reason === 'completed') sub.resolve(true);
      else sub.reject(errorDetails || new Error(`Job ${reason}`));
    });
    connection.subscribers.clear(); // Clear subscribers after notification

    // Optional: remove connection from map after closing? Depends if progress state should persist
    // connections.delete(url);
    // set({ connections: new Map(connections) });
  },

  /**
   * Detaches one subscriber from the job at `url` and dismisses its toast.
   * The detached subscriber's promise is left unsettled. If it was the last
   * subscriber of an open, unfinished job, the connection is closed and the
   * job is recorded as an error.
   *
   * NOTE(review): `subscribeToJob` does not hand its PromiseCallbacks object to
   * the caller, so nothing visible in this file can supply a matching
   * `callbacks` value - confirm how this is meant to be used. A connection that
   * is still 'connecting' is also left open when its last subscriber leaves.
   */
  unsubscribeFromJob: (url, callbacks) => {
    const connection = get().connections.get(url);
    if (!connection || !connection.subscribers.has(callbacks)) return;

    if (callbacks.toastId) toast.dismiss(callbacks.toastId); // Dismiss specific toast
    connection.subscribers.delete(callbacks);

    const nobodyLeft = connection.subscribers.size === 0;
    if (nobodyLeft && connection.status === 'open') {
      const currentStatus = get().progress.get(url)?.status;
      const finished = currentStatus === 'completed' || currentStatus === 'error' || currentStatus === 'timeout';
      if (!finished) {
        get()._closeConnection(url, 'error', new Error('Job cancelled: last subscriber left')); // Treat as error/cancel
      }
    }
    set(state => ({ connections: new Map(state.connections) })); // Update map reference
  },
}));

// --- Hook for Components ---

/**
 * Subscribes a component to the progress of the job at `url`.
 *
 * @returns the latest progress snapshot, or `undefined` if the store has none for `url`.
 */
export const useJobProgress = (url: string): JobProgress | undefined => {
  // Ensure component re-renders when the specific job progress changes
  return useJobStore(state => state.progress.get(url));
};