Press n or j to go to the next uncovered block, b, p or k for the previous block.
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 | 2x 2x 2x 2x 2x 2x 4x 4x 4x 4x 4x 4x 4x 4x 4x 4x 4x 6x 2x 1x 1x 1x 1x 4x 4x 3x 3x 3x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 5x 5x 1x | /**
* EventStream 클래스
* 워커와 메인 스레드 간 지속적인 통신을 위한 스트림 구현
*/
import { EventEmitter } from "eventemitter3";
import {
StreamOptions,
StreamStatus,
StreamMessageType,
StreamEvents,
StreamMessage,
} from "../types/stream.js";
import { WorkerType, TaskPriority } from "../types/index.js";
import { generateId } from "./utils.js";
import { logger } from "../utils/logger.js";
/**
* EventStream 클래스
* 워커와의 양방향 통신 스트림을 제공
*/
export class EventStream<T = any> extends EventEmitter {
/** 스트림 ID */
private id: string;
/** 스트림 상태 */
private status: StreamStatus = StreamStatus.INITIALIZING;
/** 워커 ID */
private workerId?: string;
/** 스트림 옵션 */
private options: Required<StreamOptions>;
/** 마지막 활동 시간 */
private lastActivityTime: number = Date.now();
/** 타임아웃 타이머 */
private timeoutTimer?: NodeJS.Timeout;
/**
* EventStream 생성자
* @param sendMessage 메시지 전송 콜백
* @param options 스트림 옵션
*/
constructor(
private sendMessage: (message: StreamMessage<T>) => Promise<void>,
options: StreamOptions = {}
) {
super();
// 스트림 ID 생성
this.id = generateId();
// 기본 옵션 설정
this.options = {
workerType: WorkerType.CALC,
priority: TaskPriority.NORMAL,
initialData: undefined,
timeout: 0, // 0은 타임아웃 없음
autoCleanup: true,
metadata: {},
...options,
};
// 타임아웃 설정
Iif (this.options.timeout > 0) {
this.setupTimeoutTimer();
}
// 스트림 초기화
this.initialize();
}
/**
* 스트림 초기화
*/
private async initialize(): Promise<void> {
try {
// 초기화 메시지 전송
await this.sendMessage({
type: StreamMessageType.INIT,
streamId: this.id,
data: this.options.initialData,
timestamp: Date.now(),
});
logger.debug(`Stream ${this.id} initialized`);
} catch (error) {
this.status = StreamStatus.ERROR;
this.emit("error", error);
logger.error(`Stream initialization error:`, error);
}
}
/**
* 스트림 ID 가져오기
*/
public getId(): string {
return this.id;
}
/**
* 스트림 상태 가져오기
*/
public getStatus(): StreamStatus {
return this.status;
}
/**
* 메시지 전송
* @param data 전송할 데이터
*/
public async send(data: T): Promise<void> {
// 스트림 상태 확인
Iif (
this.status !== StreamStatus.ACTIVE &&
this.status !== StreamStatus.INITIALIZING
) {
throw new Error(`Cannot send message: stream is ${this.status}`);
}
try {
// 메시지 전송
await this.sendMessage({
type: StreamMessageType.MESSAGE,
streamId: this.id,
data,
timestamp: Date.now(),
});
// 마지막 활동 시간 업데이트
this.updateActivity();
} catch (error) {
logger.error(`Error sending message to stream ${this.id}:`, error);
this.emit("error", error);
}
}
/**
* 메시지 수신 처리
* @param message 수신된 메시지
*/
public handleMessage(message: StreamMessage): void {
// 활동 시간 업데이트
this.updateActivity();
// 메시지 타입에 따른 처리
switch (message.type) {
case StreamMessageType.READY:
this.status = StreamStatus.ACTIVE;
this.emit("ready");
break;
case StreamMessageType.MESSAGE:
if (this.status === StreamStatus.ACTIVE) {
this.emit("message", message.data);
}
break;
case StreamMessageType.PAUSE:
this.status = StreamStatus.PAUSED;
this.emit("pause");
break;
case StreamMessageType.RESUME:
this.status = StreamStatus.ACTIVE;
this.emit("resume");
break;
case StreamMessageType.ERROR:
this.emit("error", new Error(message.error || "Unknown stream error"));
break;
case StreamMessageType.CLOSE:
this.close();
break;
default:
logger.debug(`Unknown stream message type: ${message.type}`);
}
}
/**
* 스트림 일시 중지
*/
public async pause(): Promise<void> {
Iif (this.status !== StreamStatus.ACTIVE) return;
try {
await this.sendMessage({
type: StreamMessageType.PAUSE,
streamId: this.id,
timestamp: Date.now(),
});
this.status = StreamStatus.PAUSED;
this.emit("pause");
} catch (error) {
logger.error(`Error pausing stream ${this.id}:`, error);
}
}
/**
* 스트림 재개
*/
public async resume(): Promise<void> {
Iif (this.status !== StreamStatus.PAUSED) return;
try {
await this.sendMessage({
type: StreamMessageType.RESUME,
streamId: this.id,
timestamp: Date.now(),
});
this.status = StreamStatus.ACTIVE;
this.emit("resume");
} catch (error) {
logger.error(`Error resuming stream ${this.id}:`, error);
}
}
/**
* 스트림 종료
*/
public async close(): Promise<void> {
Iif (this.status === StreamStatus.CLOSED) return;
try {
// 종료 메시지 전송
if (this.status !== StreamStatus.ERROR) {
await this.sendMessage({
type: StreamMessageType.CLOSE,
streamId: this.id,
timestamp: Date.now(),
});
}
// 상태 업데이트
this.status = StreamStatus.CLOSED;
// 타임아웃 타이머 정리
this.clearTimeoutTimer();
// 이벤트 발행 및 리스너 정리
this.emit("close");
if (this.options.autoCleanup) {
this.removeAllListeners();
}
logger.debug(`Stream ${this.id} closed`);
} catch (error) {
logger.error(`Error closing stream ${this.id}:`, error);
}
}
/**
* 워커 ID 설정
* @param workerId 워커 ID
*/
public setWorkerId(workerId: string): void {
this.workerId = workerId;
}
/**
* 활동 시간 업데이트
*/
private updateActivity(): void {
this.lastActivityTime = Date.now();
// 타임아웃 타이머 재설정
Iif (this.options.timeout > 0) {
this.clearTimeoutTimer();
this.setupTimeoutTimer();
}
}
/**
* 타임아웃 타이머 설정
*/
private setupTimeoutTimer(): void {
Iif (this.options.timeout <= 0) return;
this.timeoutTimer = setTimeout(() => {
const inactiveTime = Date.now() - this.lastActivityTime;
Iif (inactiveTime >= this.options.timeout) {
logger.warn(
`Stream ${this.id} timed out after ${inactiveTime}ms of inactivity`
);
this.close();
}
}, this.options.timeout);
}
/**
* 타임아웃 타이머 정리
*/
private clearTimeoutTimer(): void {
Iif (this.timeoutTimer) {
clearTimeout(this.timeoutTimer);
this.timeoutTimer = undefined;
}
}
}
|