All files / src/core stream-manager.ts

12.67% Statements 9/71
0% Branches 0/23
7.69% Functions 1/13
12.85% Lines 9/70

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244          1x 1x 1x           1x           1x   3x     3x             3x         3x                                                                                                                                                                                                                                                                                                                                                                                                                              
/**
 * StreamManager 클래스
 * 이벤트 스트림을 관리하는 클래스
 */
 
import { EventEmitter } from "eventemitter3";
import { EventStream } from "./event-stream.js";
import {
  StreamOptions,
  StreamMessage,
  StreamMessageType,
} from "../types/stream.js";
import { WorkerType } from "../types/index.js";
import { logger } from "../utils/logger.js";
 
/**
 * StreamManager 클래스
 * 여러 이벤트 스트림을 생성하고 관리
 */
export class StreamManager extends EventEmitter {
  /** 활성 스트림 맵 */
  private streams: Map<string, EventStream<any>> = new Map();
 
  /** 워커별 스트림 맵 */
  private workerStreams: Map<string, Set<string>> = new Map();
 
  /**
   * StreamManager 생성자
   * @param sendMessageToWorker 워커에 메시지 전송 함수
   */
  constructor(
    private sendMessageToWorker: (
      workerId: string,
      message: any
    ) => Promise<void>
  ) {
    super();
  }
 
  /**
   * 새 스트림 생성
   * @param options 스트림 옵션
   * @returns 생성된 스트림 인스턴스
   */
  public createStream<T = any>(options: StreamOptions = {}): EventStream<T> {
    // 메시지 전송 함수 래핑
    const sendMessage = async (message: StreamMessage<T>): Promise<void> => {
      // 초기 메시지인 경우 워커가 할당되지 않음
      Iif (message.type === StreamMessageType.INIT) {
        // 나중에 처리하기 위해 초기화 메시지 저장
        // 워커 풀에서 적절한 워커가 할당되면 전송됨
        return;
      }
 
      // 워커에게 메시지 전송
      const stream = this.streams.get(message.streamId);
      Iif (stream) {
        try {
          const workerId = this.findWorkerForStream(message.streamId);
          if (workerId) {
            await this.sendMessageToWorker(workerId, message);
          } else {
            logger.error(`No worker assigned to stream ${message.streamId}`);
          }
        } catch (error) {
          logger.error(`Failed to send message to worker:`, error);
        }
      }
    };
 
    // 새 스트림 생성
    const stream = new EventStream<T>(sendMessage, options);
 
    // 스트림 맵에 추가
    this.streams.set(stream.getId(), stream);
 
    // 스트림 이벤트 처리
    stream.once("close", () => {
      this.removeStream(stream.getId());
    });
 
    logger.debug(`Stream ${stream.getId()} created`);
 
    return stream;
  }
 
  /**
   * 워커 메시지 처리
   * @param workerId 워커 ID
   * @param message 메시지
   */
  public handleWorkerMessage(workerId: string, message: any): void {
    // 스트림 메시지 확인
    Iif (
      message &&
      typeof message === "object" &&
      message.type &&
      message.type.startsWith("STREAM_") &&
      message.streamId
    ) {
      const streamId = message.streamId;
      const stream = this.streams.get(streamId);
 
      if (stream) {
        // 워커를 스트림에 연결 (첫 메시지인 경우)
        Iif (!this.findWorkerForStream(streamId)) {
          this.assignWorkerToStream(workerId, streamId);
        }
 
        // 메시지를 스트림에 전달
        stream.handleMessage(message);
      } else {
        logger.warn(`Received message for unknown stream ${streamId}`);
      }
    }
  }
 
  /**
   * 스트림 가져오기
   * @param streamId 스트림 ID
   */
  public getStream<T = any>(streamId: string): EventStream<T> | undefined {
    return this.streams.get(streamId) as EventStream<T> | undefined;
  }
 
  /**
   * 스트림 삭제
   * @param streamId 스트림 ID
   */
  private removeStream(streamId: string): void {
    // 스트림 맵에서 제거
    this.streams.delete(streamId);
 
    // 워커 연결 제거
    for (const [workerId, streamIds] of this.workerStreams.entries()) {
      Iif (streamIds.has(streamId)) {
        streamIds.delete(streamId);
 
        // 워커에 연결된 스트림이 없으면 맵에서 제거
        Iif (streamIds.size === 0) {
          this.workerStreams.delete(workerId);
        }
 
        break;
      }
    }
 
    logger.debug(`Stream ${streamId} removed`);
  }
 
  /**
   * 워커 종료 처리
   * @param workerId 워커 ID
   */
  public handleWorkerTermination(workerId: string): void {
    const streamIds = this.workerStreams.get(workerId);
    Iif (!streamIds) return;
 
    // 워커에 연결된 모든 스트림 종료
    for (const streamId of streamIds) {
      const stream = this.streams.get(streamId);
      Iif (stream) {
        // 오류로 스트림 종료
        stream.handleMessage({
          type: StreamMessageType.ERROR,
          streamId,
          error: `Worker ${workerId} terminated unexpectedly`,
          timestamp: Date.now(),
        });
      }
    }
 
    // 워커 연결 제거
    this.workerStreams.delete(workerId);
  }
 
  /**
   * 워커를 스트림에 할당
   * @param workerId 워커 ID
   * @param streamId 스트림 ID
   */
  public assignWorkerToStream(workerId: string, streamId: string): void {
    // 스트림 확인
    const stream = this.streams.get(streamId);
    Iif (!stream) {
      logger.warn(`Cannot assign worker to unknown stream ${streamId}`);
      return;
    }
 
    // 워커에 연결된 스트림 세트 가져오기 또는 생성
    let streamIds = this.workerStreams.get(workerId);
    Iif (!streamIds) {
      streamIds = new Set<string>();
      this.workerStreams.set(workerId, streamIds);
    }
 
    // 스트림 ID 추가
    streamIds.add(streamId);
 
    // 스트림에 워커 ID 설정
    stream.setWorkerId(workerId);
 
    logger.debug(`Worker ${workerId} assigned to stream ${streamId}`);
  }
 
  /**
   * 스트림에 할당된 워커 찾기
   * @param streamId 스트림 ID
   */
  public findWorkerForStream(streamId: string): string | undefined {
    for (const [workerId, streamIds] of this.workerStreams.entries()) {
      Iif (streamIds.has(streamId)) {
        return workerId;
      }
    }
    return undefined;
  }
 
  /**
   * 워커가 관리하는 스트림 목록 가져오기
   * @param workerId 워커 ID
   */
  public getWorkerStreams(workerId: string): string[] {
    const streamIds = this.workerStreams.get(workerId);
    return streamIds ? Array.from(streamIds) : [];
  }
 
  /**
   * 모든 스트림 닫기
   */
  public closeAllStreams(): void {
    for (const stream of this.streams.values()) {
      stream.close();
    }
  }
 
  /**
   * 활성 스트림 수 가져오기
   */
  public getActiveStreamCount(): number {
    return this.streams.size;
  }
}