import { Config, Context, Data, Effect, Layer, Option, Schedule, Ref, HashMap, pipe } from 'effect'; /** * Configuration for API port and WebSocket connections */ export interface ConnectionConfig { /** * The port to use for the API server */ readonly apiPort: number; /** * The maximum number of connection retry attempts */ readonly maxRetryAttempts: number; /** * The initial delay (ms) between retry attempts */ readonly initialRetryDelayMs: number; /** * Maximum timeout (ms) for WebSocket operations */ readonly socketTimeoutMs: number; /** * Heartbeat interval (ms) for connection health checks */ readonly heartbeatIntervalMs: number; } /** * Default connection configuration values */ export const DefaultConnectionConfig: ConnectionConfig = { apiPort: 3000, maxRetryAttempts: 5, initialRetryDelayMs: 1000, socketTimeoutMs: 30000, heartbeatIntervalMs: 15000, }; /** * Config schema for connection settings */ export const ConnectionConfigSchema = Config.all({ apiPort: Config.withDefault(Config.number('apiPort'), DefaultConnectionConfig.apiPort), maxRetryAttempts: Config.withDefault( Config.number('maxRetryAttempts'), DefaultConnectionConfig.maxRetryAttempts ), initialRetryDelayMs: Config.withDefault( Config.number('initialRetryDelayMs'), DefaultConnectionConfig.initialRetryDelayMs ), socketTimeoutMs: Config.withDefault( Config.number('socketTimeoutMs'), DefaultConnectionConfig.socketTimeoutMs ), heartbeatIntervalMs: Config.withDefault( Config.number('heartbeatIntervalMs'), DefaultConnectionConfig.heartbeatIntervalMs ), }); /** * Environment configuration for connection settings */ export class ConnectionConfigTag extends Context.Tag('ConnectionConfig')< ConnectionConfigTag, ConnectionConfig >() {} export const ConnectionConfigLive = Layer.effect( ConnectionConfigTag, Effect.mapError( Config.unwrap(ConnectionConfigSchema), (error) => new Error(`Failed to load connection configuration: ${JSON.stringify(error)}`) ) ); /** * Error that occurs during connection operations */ export class ConnectionError extends Data.TaggedError('ConnectionError')<{ readonly message: string; readonly cause?: unknown; }> {} /** * Connection handle for websocket or similar connections */ export interface ConnectionHandle { readonly ping: () => Promise; readonly close: () => Promise; } /** * Status information about a connection */ export interface ConnectionStatus { readonly isConnected: boolean; readonly lastHeartbeat: Option.Option; readonly reconnectAttempts: number; readonly url: string; } /** * Context Tag for ConnectionManager */ export class ConnectionManager extends Context.Tag('ConnectionManager')< ConnectionManager, { /** * Get the API port for the server */ readonly getApiPort: () => Effect.Effect; /** * Connect to a WebSocket endpoint with robust retry handling */ readonly connectWithRetry: ( url: string, options?: Readonly> ) => Effect.Effect; /** * Get the status of a connection */ readonly getConnectionStatus: ( url: string ) => Effect.Effect; /** * Set up a heartbeat on a connection to keep it alive */ readonly setupHeartbeat: ( connection: Readonly, url: string ) => Effect.Effect<{ readonly fiber: unknown }, ConnectionError, never>; } >() {} /** * Type for connection status updates - properly readonly */ type ConnectionStatusUpdate = { readonly isConnected?: boolean; readonly lastHeartbeat?: Option.Option; readonly reconnectAttempts?: number; }; /** * Extract the service type from the ConnectionManager tag */ export type ConnectionManagerService = Context.Tag.Service; const getOrCreateDefaultStatus = (url: string) => (statusMap: HashMap.HashMap): ConnectionStatus => Option.getOrElse(HashMap.get(statusMap, url), () => ({ isConnected: false, lastHeartbeat: Option.none(), reconnectAttempts: 0, url, })); const extractStatusFromMap = (url: string) => (statusMap: HashMap.HashMap) => Option.getOrThrow(HashMap.get(statusMap, url)); const updateAndExtractStatus = (url: string, updates: ConnectionStatusUpdate) => (connectionStatuses: Ref.Ref>) => Effect.map( Ref.updateAndGet(connectionStatuses, (statusMap) => { const current = pipe(statusMap, getOrCreateDefaultStatus(url)); const newStatus: ConnectionStatus = { isConnected: updates.isConnected ?? current.isConnected, lastHeartbeat: updates.lastHeartbeat ?? current.lastHeartbeat, reconnectAttempts: updates.reconnectAttempts ?? current.reconnectAttempts, url: current.url, }; return HashMap.set(statusMap, url, newStatus); }), extractStatusFromMap(url) ); const mockConnect = ( _url: string, _options?: Readonly> ): Effect.Effect => Effect.succeed({ ping: () => Promise.resolve(true), close: () => Promise.resolve(void 0), } as const); const createRetryPolicy = (config: Readonly) => Schedule.compose( Schedule.exponential(config.initialRetryDelayMs, 2.0), Schedule.recurs(config.maxRetryAttempts) ); const connectAndUpdateStatus = ( url: string, options: Readonly> | undefined, retryPolicy: Schedule.Schedule ) => ( connectionStatuses: Ref.Ref> ): Effect.Effect => Effect.mapError( Effect.retry( Effect.tap(mockConnect(url, options), () => pipe( connectionStatuses, updateAndExtractStatus(url, { isConnected: true, lastHeartbeat: Option.some(new Date()), }) ) ), retryPolicy ), () => new ConnectionError({ message: `Failed to connect to ${url}` }) ); const resetAndConnect = ( url: string, options: Readonly> | undefined, retryPolicy: Schedule.Schedule ) => ( connectionStatuses: Ref.Ref> ): Effect.Effect => Effect.flatMap( pipe( connectionStatuses, updateAndExtractStatus(url, { isConnected: false, reconnectAttempts: 0, }) ), () => pipe(connectionStatuses, connectAndUpdateStatus(url, options, retryPolicy)) ); const findStatusInMap = (url: string) => (statusMap: HashMap.HashMap) => HashMap.get(statusMap, url); const matchStatusOption = (url: string) => (status: Readonly>) => Option.match(status, { onNone: () => Effect.fail(new ConnectionError({ message: `No connection status for ${url}` })), onSome: Effect.succeed, }); const getStatusFromRef = (url: string) => (connectionStatuses: Ref.Ref>) => Effect.flatMap( Effect.map(Ref.get(connectionStatuses), findStatusInMap(url)), matchStatusOption(url) ); const repeatHeartbeat = (config: Readonly, url: string) => (connectionStatuses: Ref.Ref>) => Effect.map( Effect.forkDaemon( Effect.repeat( pipe( connectionStatuses, updateAndExtractStatus(url, { lastHeartbeat: Option.some(new Date()), }) ), Schedule.spaced(config.heartbeatIntervalMs) ) ), (fiber) => ({ fiber }) ); const buildConnectionManager = (config: Readonly) => ( connectionStatuses: Ref.Ref> ): ConnectionManagerService => ({ getApiPort: () => Effect.succeed(config.apiPort), connectWithRetry: (url: string, options?: Readonly>) => pipe(connectionStatuses, resetAndConnect(url, options, createRetryPolicy(config))), getConnectionStatus: (url: string) => pipe(connectionStatuses, getStatusFromRef(url)), setupHeartbeat: (_connection: Readonly, url: string) => pipe(connectionStatuses, repeatHeartbeat(config, url)), }); /** * Creates a live implementation of the ConnectionManager */ export const makeConnectionManager = ( config: Readonly ): Effect.Effect => Effect.map(Ref.make(HashMap.empty()), buildConnectionManager(config)); /** * Live Layer implementation of the ConnectionManager */ export const ConnectionManagerLive = Layer.provide( Layer.succeed(ConnectionManager, { getApiPort: () => Effect.succeed(3000), connectWithRetry: (_url, _options?: Readonly>) => Effect.succeed({ ping: () => Promise.resolve(true), close: () => Promise.resolve(void 0), } as const), getConnectionStatus: (_url) => Effect.succeed({ isConnected: true, lastHeartbeat: Option.none(), reconnectAttempts: 0, url: _url, } as const), setupHeartbeat: (_connection: Readonly, _url) => Effect.succeed({ fiber: null } as const), }), Layer.succeed(ConnectionConfigTag, DefaultConnectionConfig) );