{"version":3,"file":"intelligence.cjs","names":["AgentRunner","Socket","Observable","AG_UI_CHANNEL_EVENT","EventType","EMPTY"],"sources":["../../../../src/v2/runtime/runner/intelligence.ts"],"sourcesContent":["import {\n  AgentRunner,\n  AgentRunnerConnectRequest,\n  AgentRunnerIsRunningRequest,\n  AgentRunnerRunRequest,\n  type AgentRunnerStopRequest,\n} from \"./agent-runner\";\nimport { EMPTY, Observable, from } from \"rxjs\";\nimport { catchError, finalize } from \"rxjs/operators\";\nimport {\n  AbstractAgent,\n  BaseEvent,\n  EventType,\n  RunStartedEvent,\n} from \"@ag-ui/client\";\nimport {\n  finalizeRunEvents,\n  AG_UI_CHANNEL_EVENT,\n  phoenixExponentialBackoff,\n} from \"@copilotkit/shared\";\nimport { Socket, Channel } from \"phoenix\";\nimport { randomUUID } from \"node:crypto\";\n\nexport interface IntelligenceAgentRunnerOptions {\n  /** Phoenix runner websocket URL, e.g. \"ws://localhost:4000/runner\" */\n  url: string;\n  /** Optional Phoenix socket auth token used during websocket connect. */\n  authToken?: string;\n  /** Max delay (ms) for WebSocket reconnect backoff. @default 10_000 */\n  maxReconnectMs?: number;\n  /** Max delay (ms) for channel rejoin backoff. @default 30_000 */\n  maxRejoinMs?: number;\n}\n\ninterface ThreadState {\n  socket: Socket;\n  channel: Channel;\n  isRunning: boolean;\n  stopRequested: boolean;\n  agent: AbstractAgent | null;\n  currentEvents: BaseEvent[];\n  nextEventSeq: number;\n  hasRunStarted: boolean;\n}\n\nexport class IntelligenceAgentRunner extends AgentRunner {\n  private options: IntelligenceAgentRunnerOptions;\n  private threads = new Map<string, ThreadState>();\n\n  constructor(options: IntelligenceAgentRunnerOptions) {\n    super();\n    // Store config — sockets are created per-run, not eagerly.\n    this.options = options;\n  }\n\n  /**\n   * Create a new Phoenix socket with explicit exponential backoff.\n   *\n   * Each run/connect gets its own socket so that:\n   *  - A socket failure only affects a single thread, not all threads.\n   *  - Cleanup is simple: channel.leave() + socket.disconnect() tears\n   *    down everything for that run with no shared-state concerns.\n   *  - Each run gets its own independent retry budget.\n   *\n   * reconnectAfterMs — delay before Phoenix reconnects the WebSocket\n   *   after an unclean close. 100ms base, doubling up to maxReconnectMs (default 10s).\n   *\n   * rejoinAfterMs — delay before Phoenix re-joins a channel that\n   *   entered the \"errored\" state. 1s base, doubling up to maxRejoinMs (default 30s).\n   *\n   * These are set explicitly because Phoenix's default schedule is a\n   * fixed stepped array (not exponential), and any code that calls\n   * socket.disconnect() in an onError handler will set\n   * closeWasClean = true and reset the reconnect timer — permanently\n   * killing retries.\n   */\n  private createSocket(): Socket {\n    const socket = new Socket(this.options.url, {\n      ...(this.options.authToken ? { authToken: this.options.authToken } : {}),\n      reconnectAfterMs: phoenixExponentialBackoff(\n        100,\n        this.options.maxReconnectMs ?? 10_000,\n      ),\n      rejoinAfterMs: phoenixExponentialBackoff(\n        1_000,\n        this.options.maxRejoinMs ?? 30_000,\n      ),\n    });\n    socket.connect();\n    return socket;\n  }\n\n  private createRunnerEventPayload(\n    event: BaseEvent,\n    request: AgentRunnerRunRequest,\n    state: ThreadState,\n  ): Record<string, unknown> {\n    const canonicalEvent = this.stampRunnerMetadata(event, state);\n    const payload = {\n      ...(canonicalEvent as Record<string, unknown>),\n    };\n\n    payload.thread_id ??= request.threadId;\n\n    const runId = payload.runId ?? payload.run_id ?? request.input.runId;\n\n    if (runId) {\n      payload.run_id = runId;\n    }\n\n    return payload;\n  }\n\n  private stampRunnerMetadata(event: BaseEvent, state: ThreadState): BaseEvent {\n    const eventRecord = event as BaseEvent & {\n      metadata?: Record<string, unknown>;\n    };\n\n    const existingMetadata = eventRecord.metadata ?? {};\n    const hasEventId = typeof existingMetadata.cpki_event_id === \"string\";\n    const hasEventSeq = typeof existingMetadata.cpki_event_seq === \"number\";\n\n    if (hasEventId && hasEventSeq) {\n      const eventSeq = existingMetadata.cpki_event_seq as number;\n      state.nextEventSeq = Math.max(state.nextEventSeq, eventSeq + 1);\n      return eventRecord;\n    }\n\n    const eventSeq = state.nextEventSeq++;\n\n    return {\n      ...eventRecord,\n      metadata: {\n        ...existingMetadata,\n        cpki_event_id:\n          typeof existingMetadata.cpki_event_id === \"string\"\n            ? existingMetadata.cpki_event_id\n            : randomUUID(),\n        cpki_event_seq: eventSeq,\n      },\n    };\n  }\n\n  run(request: AgentRunnerRunRequest): Observable<BaseEvent> {\n    const { threadId, agent, input, joinCode } = request;\n\n    const existing = this.threads.get(threadId);\n    if (existing?.isRunning) {\n      throw new Error(\"Thread already running\");\n    }\n\n    return new Observable((observer) => {\n      const socket = this.createSocket();\n\n      const channelTopic = joinCode ?? threadId;\n      const channel = socket.channel(`ingestion:${channelTopic}`, {\n        runId: input.runId,\n      });\n\n      const state: ThreadState = {\n        socket,\n        channel,\n        isRunning: true,\n        stopRequested: false,\n        agent,\n        currentEvents: [],\n        nextEventSeq: 1,\n        hasRunStarted: false,\n      };\n      this.threads.set(threadId, state);\n\n      // Track consecutive socket errors for this run. Phoenix retries\n      // automatically via reconnectAfterMs, but if the connection fails\n      // repeatedly we abort the agent — otherwise runAgent() completes\n      // normally, finalization events buffer silently on the dead\n      // channel, and the client never receives them.\n      //\n      // Aborting the agent is the single trigger that cascades through\n      // the existing error pipeline: runAgent() rejects → catchError\n      // pushes RUN_ERROR → finalize calls finalizeRunEvents +\n      // removeThread → channel.leave() + socket.disconnect().\n      const MAX_CONSECUTIVE_ERRORS = 5;\n      let consecutiveErrors = 0;\n\n      socket.onError(() => {\n        consecutiveErrors++;\n        if (consecutiveErrors >= MAX_CONSECUTIVE_ERRORS && state.agent) {\n          try {\n            state.agent.abortRun();\n          } catch {\n            // Ignore abort errors.\n          }\n        }\n        // Otherwise: Phoenix retries automatically using the exponential\n        // backoff schedule configured in createSocket().\n      });\n\n      socket.onOpen(() => {\n        // A successful (re)connection resets the counter so transient\n        // network blips don't accumulate across recoveries.\n        consecutiveErrors = 0;\n      });\n\n      // Listen for custom \"stop\" events pushed by the client over the\n      // channel. This must be registered before channel.join() so the\n      // handler is in place by the time the server starts relaying messages.\n      // The client sends the stop event before leaving the channel, so the\n      // runner is guaranteed to receive it while still joined.\n      channel.on(AG_UI_CHANNEL_EVENT, (payload: BaseEvent) => {\n        if (\n          payload.type === EventType.CUSTOM &&\n          (payload as BaseEvent & { name?: string }).name === \"stop\"\n        ) {\n          this.stop({ threadId });\n        }\n      });\n\n      channel\n        .join()\n        .receive(\"ok\", () => {\n          this.executeAgentRun(request, state, threadId).subscribe({\n            complete: () => observer.complete(),\n          });\n        })\n        .receive(\"error\", (resp) => {\n          const errorEvent = {\n            type: EventType.RUN_ERROR,\n            message: `Failed to join channel: ${JSON.stringify(resp)}`,\n            code: \"CHANNEL_JOIN_ERROR\",\n          } as BaseEvent;\n          observer.next(errorEvent);\n          state.currentEvents.push(errorEvent);\n          this.removeThread(threadId);\n          observer.complete();\n        })\n        .receive(\"timeout\", () => {\n          const errorEvent = {\n            type: EventType.RUN_ERROR,\n            message: \"Timed out joining channel\",\n            code: \"CHANNEL_JOIN_TIMEOUT\",\n          } as BaseEvent;\n          observer.next(errorEvent);\n          state.currentEvents.push(errorEvent);\n          this.removeThread(threadId);\n          observer.complete();\n        });\n\n      return () => {\n        this.removeThread(threadId);\n      };\n    });\n  }\n\n  connect(request: AgentRunnerConnectRequest): Observable<BaseEvent> {\n    const { threadId } = request;\n\n    return new Observable((observer) => {\n      const socket = this.createSocket();\n\n      const channel = socket.channel(`thread:${threadId}`);\n\n      channel.on(\"ag_ui_event\", (payload: BaseEvent) => {\n        observer.next(payload);\n\n        if (\n          payload.type === EventType.RUN_FINISHED ||\n          payload.type === EventType.RUN_ERROR\n        ) {\n          observer.complete();\n        }\n      });\n\n      const cleanup = () => {\n        channel.leave();\n        socket.disconnect();\n      };\n\n      channel\n        .join()\n        .receive(\"ok\", () => undefined)\n        .receive(\"error\", (resp) => {\n          observer.error(\n            new Error(`Failed to join channel: ${JSON.stringify(resp)}`),\n          );\n          cleanup();\n        })\n        .receive(\"timeout\", () => {\n          observer.error(new Error(\"Timed out joining channel\"));\n          cleanup();\n        });\n\n      return () => {\n        cleanup();\n      };\n    });\n  }\n\n  isRunning(request: AgentRunnerIsRunningRequest): Promise<boolean> {\n    const state = this.threads.get(request.threadId);\n    return Promise.resolve(state?.isRunning ?? false);\n  }\n\n  stop(request: AgentRunnerStopRequest): Promise<boolean | undefined> {\n    const state = this.threads.get(request.threadId);\n    if (!state || !state.isRunning || state.stopRequested) {\n      return Promise.resolve(false);\n    }\n\n    state.stopRequested = true;\n\n    // Direct local abort — the runtime is the authority.\n    if (state.agent) {\n      try {\n        state.agent.abortRun();\n      } catch {\n        // Ignore abort errors.\n      }\n    }\n\n    return Promise.resolve(true);\n  }\n\n  private executeAgentRun(\n    request: AgentRunnerRunRequest,\n    state: ThreadState,\n    threadId: string,\n  ): Observable<void> {\n    const { currentEvents, channel } = state;\n    const pushCanonicalEvent = (event: BaseEvent): void => {\n      const canonicalEvent = this.stampRunnerMetadata(event, state);\n      currentEvents.push(canonicalEvent);\n\n      if (canonicalEvent.type === EventType.RUN_STARTED) {\n        state.hasRunStarted = true;\n      }\n\n      channel.push(\n        \"event\",\n        this.createRunnerEventPayload(canonicalEvent, request, state),\n      );\n    };\n\n    const getPersistedInputMessages = () =>\n      request.persistedInputMessages ?? request.input.messages;\n\n    const buildRunStartedEvent = (\n      source?: RunStartedEvent,\n    ): RunStartedEvent => {\n      const baseInput = source?.input ?? request.input;\n      const persistedInputMessages = getPersistedInputMessages();\n\n      return {\n        ...(source ?? {\n          type: EventType.RUN_STARTED,\n          threadId: request.threadId,\n          runId: request.input.runId,\n        }),\n        input: {\n          ...baseInput,\n          ...(persistedInputMessages !== undefined\n            ? { messages: persistedInputMessages }\n            : {}),\n        },\n      } as RunStartedEvent;\n    };\n\n    const ensureRunStarted = (): void => {\n      if (!state.hasRunStarted) {\n        state.hasRunStarted = true;\n        pushCanonicalEvent(buildRunStartedEvent());\n      }\n    };\n\n    return from(\n      request.agent.runAgent(request.input, {\n        onEvent: ({ event }: { event: BaseEvent }) => {\n          if (event.type === EventType.RUN_STARTED) {\n            pushCanonicalEvent(buildRunStartedEvent(event as RunStartedEvent));\n            return;\n          }\n\n          ensureRunStarted();\n          pushCanonicalEvent(event);\n        },\n      }),\n    ).pipe(\n      catchError((error) => {\n        ensureRunStarted();\n        const errorEvent = {\n          type: EventType.RUN_ERROR,\n          message: error instanceof Error ? error.message : String(error),\n        } as BaseEvent;\n        pushCanonicalEvent(errorEvent);\n        return EMPTY;\n      }),\n      finalize(() => {\n        ensureRunStarted();\n        const appended = finalizeRunEvents(currentEvents, {\n          stopRequested: state.stopRequested,\n        });\n        for (const event of appended) {\n          channel.push(\n            \"event\",\n            this.createRunnerEventPayload(event, request, state),\n          );\n        }\n        this.removeThread(threadId);\n      }),\n    );\n  }\n\n  /**\n   * Tear down all resources for a thread: leave the channel,\n   * disconnect the per-run socket, and remove the thread state.\n   *\n   * Idempotent — safe to call multiple times for the same threadId\n   * (e.g. from join error handlers, finalize, and Observable teardown).\n   */\n  private removeThread(threadId: string): void {\n    const state = this.threads.get(threadId);\n    if (!state) {\n      return;\n    }\n\n    // Delete first so concurrent calls see the entry as already removed.\n    this.threads.delete(threadId);\n\n    try {\n      state.channel.leave();\n    } catch {\n      // Channel may already be closed/left.\n    }\n    try {\n      state.socket.disconnect();\n    } catch {\n      // Socket may already be disconnected.\n    }\n  }\n}\n"],"mappings":";;;;;;;;;;;AA6CA,IAAa,0BAAb,cAA6CA,iCAAY;CAIvD,YAAY,SAAyC;AACnD,SAAO;iCAHS,IAAI,KAA0B;AAK9C,OAAK,UAAU;;;;;;;;;;;;;;;;;;;;;;;CAwBjB,AAAQ,eAAuB;EAC7B,MAAM,SAAS,IAAIC,eAAO,KAAK,QAAQ,KAAK;GAC1C,GAAI,KAAK,QAAQ,YAAY,EAAE,WAAW,KAAK,QAAQ,WAAW,GAAG,EAAE;GACvE,oEACE,KACA,KAAK,QAAQ,kBAAkB,IAChC;GACD,iEACE,KACA,KAAK,QAAQ,eAAe,IAC7B;GACF,CAAC;AACF,SAAO,SAAS;AAChB,SAAO;;CAGT,AAAQ,yBACN,OACA,SACA,OACyB;EAEzB,MAAM,UAAU,EACd,GAFqB,KAAK,oBAAoB,OAAO,MAAM,EAG5D;AAED,UAAQ,cAAc,QAAQ;EAE9B,MAAM,QAAQ,QAAQ,SAAS,QAAQ,UAAU,QAAQ,MAAM;AAE/D,MAAI,MACF,SAAQ,SAAS;AAGnB,SAAO;;CAGT,AAAQ,oBAAoB,OAAkB,OAA+B;EAC3E,MAAM,cAAc;EAIpB,MAAM,mBAAmB,YAAY,YAAY,EAAE;EACnD,MAAM,aAAa,OAAO,iBAAiB,kBAAkB;EAC7D,MAAM,cAAc,OAAO,iBAAiB,mBAAmB;AAE/D,MAAI,cAAc,aAAa;GAC7B,MAAM,WAAW,iBAAiB;AAClC,SAAM,eAAe,KAAK,IAAI,MAAM,cAAc,WAAW,EAAE;AAC/D,UAAO;;EAGT,MAAM,WAAW,MAAM;AAEvB,SAAO;GACL,GAAG;GACH,UAAU;IACR,GAAG;IACH,eACE,OAAO,iBAAiB,kBAAkB,WACtC,iBAAiB,6CACL;IAClB,gBAAgB;IACjB;GACF;;CAGH,IAAI,SAAuD;EACzD,MAAM,EAAE,UAAU,OAAO,OAAO,aAAa;AAG7C,MADiB,KAAK,QAAQ,IAAI,SAAS,EAC7B,UACZ,OAAM,IAAI,MAAM,yBAAyB;AAG3C,SAAO,IAAIC,iBAAY,aAAa;GAClC,MAAM,SAAS,KAAK,cAAc;GAElC,MAAM,eAAe,YAAY;GACjC,MAAM,UAAU,OAAO,QAAQ,aAAa,gBAAgB,EAC1D,OAAO,MAAM,OACd,CAAC;GAEF,MAAM,QAAqB;IACzB;IACA;IACA,WAAW;IACX,eAAe;IACf;IACA,eAAe,EAAE;IACjB,cAAc;IACd,eAAe;IAChB;AACD,QAAK,QAAQ,IAAI,UAAU,MAAM;GAYjC,MAAM,yBAAyB;GAC/B,IAAI,oBAAoB;AAExB,UAAO,cAAc;AACnB;AACA,QAAI,qBAAqB,0BAA0B,MAAM,MACvD,KAAI;AACF,WAAM,MAAM,UAAU;YAChB;KAMV;AAEF,UAAO,aAAa;AAGlB,wBAAoB;KACpB;AAOF,WAAQ,GAAGC,yCAAsB,YAAuB;AACtD,QACE,QAAQ,SAASC,wBAAU,UAC1B,QAA0C,SAAS,OAEpD,MAAK,KAAK,EAAE,UAAU,CAAC;KAEzB;AAEF,WACG,MAAM,CACN,QAAQ,YAAY;AACnB,SAAK,gBAAgB,SAAS,OAAO,SAAS,CAAC,UAAU,EACvD,gBAAgB,SAAS,UAAU,EACpC,CAAC;KACF,CACD,QAAQ,UAAU,SAAS;IAC1B,MAAM,aAAa;KACjB,MAAMA,wBAAU;KAChB,SAAS,2BAA2B,KAAK,UAAU,KAAK;KACxD,MAAM;KACP;AACD,aAAS,KAAK,WAAW;AACzB,UAAM,cAAc,KAAK,WAAW;AACpC,SAAK,aAAa,SAAS;AAC3B,aAAS,UAAU;KACnB,CACD,QAAQ,iBAAiB;IACxB,MAAM,aAAa;KACjB,MAAMA,wBAAU;KAChB,SAAS;KACT,MAAM;KACP;AACD,aAAS,KAAK,WAAW;AACzB,UAAM,cAAc,KAAK,WAAW;AACpC,SAAK,aAAa,SAAS;AAC3B,aAAS,UAAU;KACnB;AAEJ,gBAAa;AACX,SAAK,aAAa,SAAS;;IAE7B;;CAGJ,QAAQ,SAA2D;EACjE,MAAM,EAAE,aAAa;AAErB,SAAO,IAAIF,iBAAY,aAAa;GAClC,MAAM,SAAS,KAAK,cAAc;GAElC,MAAM,UAAU,OAAO,QAAQ,UAAU,WAAW;AAEpD,WAAQ,GAAG,gBAAgB,YAAuB;AAChD,aAAS,KAAK,QAAQ;AAEtB,QACE,QAAQ,SAASE,wBAAU,gBAC3B,QAAQ,SAASA,wBAAU,UAE3B,UAAS,UAAU;KAErB;GAEF,MAAM,gBAAgB;AACpB,YAAQ,OAAO;AACf,WAAO,YAAY;;AAGrB,WACG,MAAM,CACN,QAAQ,YAAY,OAAU,CAC9B,QAAQ,UAAU,SAAS;AAC1B,aAAS,sBACP,IAAI,MAAM,2BAA2B,KAAK,UAAU,KAAK,GAAG,CAC7D;AACD,aAAS;KACT,CACD,QAAQ,iBAAiB;AACxB,aAAS,sBAAM,IAAI,MAAM,4BAA4B,CAAC;AACtD,aAAS;KACT;AAEJ,gBAAa;AACX,aAAS;;IAEX;;CAGJ,UAAU,SAAwD;EAChE,MAAM,QAAQ,KAAK,QAAQ,IAAI,QAAQ,SAAS;AAChD,SAAO,QAAQ,QAAQ,OAAO,aAAa,MAAM;;CAGnD,KAAK,SAA+D;EAClE,MAAM,QAAQ,KAAK,QAAQ,IAAI,QAAQ,SAAS;AAChD,MAAI,CAAC,SAAS,CAAC,MAAM,aAAa,MAAM,cACtC,QAAO,QAAQ,QAAQ,MAAM;AAG/B,QAAM,gBAAgB;AAGtB,MAAI,MAAM,MACR,KAAI;AACF,SAAM,MAAM,UAAU;UAChB;AAKV,SAAO,QAAQ,QAAQ,KAAK;;CAG9B,AAAQ,gBACN,SACA,OACA,UACkB;EAClB,MAAM,EAAE,eAAe,YAAY;EACnC,MAAM,sBAAsB,UAA2B;GACrD,MAAM,iBAAiB,KAAK,oBAAoB,OAAO,MAAM;AAC7D,iBAAc,KAAK,eAAe;AAElC,OAAI,eAAe,SAASA,wBAAU,YACpC,OAAM,gBAAgB;AAGxB,WAAQ,KACN,SACA,KAAK,yBAAyB,gBAAgB,SAAS,MAAM,CAC9D;;EAGH,MAAM,kCACJ,QAAQ,0BAA0B,QAAQ,MAAM;EAElD,MAAM,wBACJ,WACoB;GACpB,MAAM,YAAY,QAAQ,SAAS,QAAQ;GAC3C,MAAM,yBAAyB,2BAA2B;AAE1D,UAAO;IACL,GAAI,UAAU;KACZ,MAAMA,wBAAU;KAChB,UAAU,QAAQ;KAClB,OAAO,QAAQ,MAAM;KACtB;IACD,OAAO;KACL,GAAG;KACH,GAAI,2BAA2B,SAC3B,EAAE,UAAU,wBAAwB,GACpC,EAAE;KACP;IACF;;EAGH,MAAM,yBAA+B;AACnC,OAAI,CAAC,MAAM,eAAe;AACxB,UAAM,gBAAgB;AACtB,uBAAmB,sBAAsB,CAAC;;;AAI9C,wBACE,QAAQ,MAAM,SAAS,QAAQ,OAAO,EACpC,UAAU,EAAE,YAAkC;AAC5C,OAAI,MAAM,SAASA,wBAAU,aAAa;AACxC,uBAAmB,qBAAqB,MAAyB,CAAC;AAClE;;AAGF,qBAAkB;AAClB,sBAAmB,MAAM;KAE5B,CAAC,CACH,CAAC,qCACY,UAAU;AACpB,qBAAkB;AAKlB,sBAJmB;IACjB,MAAMA,wBAAU;IAChB,SAAS,iBAAiB,QAAQ,MAAM,UAAU,OAAO,MAAM;IAChE,CAC6B;AAC9B,UAAOC;IACP,qCACa;AACb,qBAAkB;GAClB,MAAM,qDAA6B,eAAe,EAChD,eAAe,MAAM,eACtB,CAAC;AACF,QAAK,MAAM,SAAS,SAClB,SAAQ,KACN,SACA,KAAK,yBAAyB,OAAO,SAAS,MAAM,CACrD;AAEH,QAAK,aAAa,SAAS;IAC3B,CACH;;;;;;;;;CAUH,AAAQ,aAAa,UAAwB;EAC3C,MAAM,QAAQ,KAAK,QAAQ,IAAI,SAAS;AACxC,MAAI,CAAC,MACH;AAIF,OAAK,QAAQ,OAAO,SAAS;AAE7B,MAAI;AACF,SAAM,QAAQ,OAAO;UACf;AAGR,MAAI;AACF,SAAM,OAAO,YAAY;UACnB"}