import assert from "assert";
import { getLogger } from "@logtape/logtape";
import { type Logger } from "@logtape/logtape";
import { BackendPostgres, OpenWorkflow } from "@sonamu-kit/tasks";
import { type Worker } from "@sonamu-kit/tasks";
import { serializeRetryPolicy } from "@sonamu-kit/tasks/internal";
import {
type RunnableWorkflow,
type SchemaInput,
type SchemaOutput,
type StandardSchemaV1,
type StepApi,
type WorkflowRunHandle,
type WorkflowSpec,
} from "@sonamu-kit/tasks/internal";
import { type Knex } from "knex";
import { schedule as cronSchedule } from "node-cron";
import { type ScheduledTask } from "node-cron";
import { type ZodObject } from "zod";
import { type Context } from "../api/context";
import { Sonamu } from "../api/sonamu";
import { convertDomainToCategory } from "../logger/category";
import { Naite } from "../naite/naite";
import { createMockSSEFactory } from "../stream/sse";
import { type Executable } from "../types/types";
import { type WorkflowMetadata } from "./decorator";
import { StepWrapper } from "./step-wrapper";
/**
 * Worker options. `WorkflowManager.setupWorker` forwards this object
 * unchanged to the OpenWorkflow client when it creates the worker.
 */
export interface WorkflowOptions {
// Number of tasks the worker executes concurrently. Defaults to 1.
concurrency?: number;
// Whether the worker uses pub/sub. Defaults to true.
usePubSub?: boolean;
// Delay between receiving a task over pub/sub and starting the workflow run. Defaults to 500ms.
listenDelay?: number;
}
/**
 * Signature of a user-supplied workflow body.
 *
 * Differs from the workflow function type of `@sonamu-kit/tasks` in that the
 * `step` it receives is wrapped once (a `StepWrapper`), and a `logger` is
 * supplied alongside the input.
 *
 * @typeParam Input - Type of the `input` value handed to the workflow.
 * @typeParam Output - Type the workflow resolves to; it may be returned
 *   synchronously or through a promise.
 */
export type WorkflowFunction<Input, Output> = (
  params: Readonly<{
    logger: Logger;
    input: Input;
    step: StepWrapper;
    version: string | null;
  }>,
) => Promise<Output> | Output;
/**
 * Options accepted when registering a workflow.
 *
 * This is the underlying `WorkflowSpec` with its `version` replaced by an
 * explicit `string | null`, plus the workflow `id` and the `function` that
 * implements it.
 *
 * NOTE(review): the arguments given to `SchemaOutput` / `SchemaInput` are
 * `<TSchema, Input>` (schema first, fallback second) — confirm against the
 * `@sonamu-kit/tasks/internal` declarations.
 *
 * @typeParam Input - Input type used when no schema is supplied.
 * @typeParam Output - Result type of the workflow.
 * @typeParam TSchema - Optional Standard Schema describing the input.
 */
export type WorkflowCreateOptions<
  Input,
  Output,
  TSchema extends StandardSchemaV1 | undefined = undefined,
> = Omit<
  WorkflowSpec<SchemaOutput<TSchema, Input>, Output, SchemaInput<TSchema, Input>>,
  "version"
> & {
  id: string;
  version: string | null;
  function: WorkflowFunction<SchemaOutput<TSchema, Input>, Output>;
};
/**
 * Owns the OpenWorkflow client, its Postgres backend, an optional worker and
 * the cron tasks that trigger scheduled workflows.
 *
 * Cron scheduling is only active on nodes where a worker has been set up
 * (see {@link WorkflowManager.setupWorker}); on other nodes `scheduleTask`
 * and `unscheduleTask` are no-ops.
 */
export class WorkflowManager {
  /** Backend instance. */
  readonly #backend: BackendPostgres;
  /** OpenWorkflow instance. */
  readonly #ow: OpenWorkflow;
  /**
   * Worker instance. May be absent, in which case this node is assumed to
   * only publish to the DB in a distributed server environment.
   */
  #worker: Worker | null;
  /** File path -> metadata of the workflows defined in that file. */
  #workflowsMap: Map<string, WorkflowMetadata[]>;
  /** Schedule name -> cron task, its input source and the owning workflow id. */
  #scheduledTasks: Map<
    string,
    {
      task: ScheduledTask;
      inputFn: WorkflowMetadata["schedules"][number]["input"];
      workflowId: string;
    }
  >;

  /**
   * @param dbConf - Knex configuration (not a Knex connection): the Postgres
   *   backend handles some things itself and therefore needs the config.
   * @param runMigrations - Forwarded to the backend as its `runMigrations`
   *   option. Defaults to `true`.
   */
  constructor(dbConf: Knex.Config, runMigrations: boolean = true) {
    const backend = new BackendPostgres(dbConf, { runMigrations });
    this.#backend = backend;
    this.#ow = new OpenWorkflow({ backend });
    this.#worker = null;
    this.#workflowsMap = new Map();
    this.#scheduledTasks = new Map();
  }

  /** Used by the Sonamu UI API to list workflow runs, cancel them, etc. */
  get backend(): BackendPostgres {
    return this.#backend;
  }

  /**
   * Flat summary of every workflow currently tracked by the manager: id,
   * name, version, schedule names/expressions and the serialized retry policy
   * (`undefined` when the workflow has none).
   */
  get workflowDefinitions() {
    return Array.from(this.#workflowsMap.values())
      .flat()
      .map((wf) => ({
        id: wf.id,
        name: wf.name,
        version: wf.version,
        schedules: wf.schedules.map((s) => ({
          name: s.name,
          expression: s.expression,
        })),
        retryPolicy: wf.retryPolicy ? serializeRetryPolicy(wf.retryPolicy) : undefined,
      }));
  }

  /**
   * Synchronizes the registered workflows and their scheduled tasks with
   * `workflowMap`, then stores `workflowMap` as the new current state.
   *
   * Three phases run in order:
   *  1. workflows of files that are no longer present are removed;
   *  2. workflows of files that are new are added;
   *  3. for files present on both sides, workflows are diffed by `id`:
   *     ids that disappeared are removed first, then ids that appeared are added.
   *
   * Rejections inside a phase are collected with `Promise.allSettled` and do
   * not abort the synchronization.
   *
   * NOTE(review): in phase 3 a workflow whose `id` exists on both sides is
   * left untouched even if its function, version or schedules changed —
   * confirm this is intended.
   *
   * @param workflowMap - File path -> workflow metadata defined in that file.
   */
  async synchronize(workflowMap: Map<string, WorkflowMetadata[]>) {
    // 1. Remove everything that belonged to deleted files.
    await Promise.allSettled(
      Array.from(this.#workflowsMap.entries())
        .filter(([file]) => !workflowMap.has(file))
        .flatMap(([, workflows]) => workflows.map((workflow) => this.#removeWorkflow(workflow))),
    );

    // 2. Register everything that belongs to newly added files.
    await Promise.allSettled(
      Array.from(workflowMap.entries())
        .filter(([file]) => !this.#workflowsMap.has(file))
        .flatMap(([, workflows]) => workflows.map((workflow) => this.#addWorkflow(workflow))),
    );

    // 3. For files known on both sides, diff by workflow id.
    await Promise.allSettled(
      Array.from(workflowMap.entries())
        .filter(([file]) => this.#workflowsMap.has(file))
        .map<[string, [WorkflowMetadata[], WorkflowMetadata[]]]>(([file, newWorkflows]) => {
          const previousWorkflows = this.#workflowsMap.get(file);
          assert(previousWorkflows, "previous workflows not found");
          return [file, [previousWorkflows, newWorkflows]];
        })
        .map(async ([, [previousWorkflows, newWorkflows]]) => {
          const previousIds = new Set(previousWorkflows.map((item) => item.id));
          const newIds = new Set(newWorkflows.map((item) => item.id));

          // Removal has to finish before anything new is registered.
          await Promise.allSettled(
            previousWorkflows
              .filter((prevItem) => !newIds.has(prevItem.id))
              .map((prevItem) => this.#removeWorkflow(prevItem)),
          );

          // Register the workflows that were added. The async wrapper keeps a
          // failing registration from affecting its siblings.
          await Promise.allSettled(
            newWorkflows
              .filter((newItem) => !previousIds.has(newItem.id))
              .map(async (newItem) => this.#addWorkflow(newItem)),
          );
        }),
    );

    this.#workflowsMap = workflowMap;
  }

  /**
   * Registers `workflow` and schedules all of its cron tasks.
   * Registration happens synchronously; the returned promise settles once
   * every schedule attempt has settled.
   */
  #addWorkflow(workflow: WorkflowMetadata) {
    this.registerWorkflow({
      id: workflow.id,
      name: workflow.name,
      version: workflow.version ?? null,
      schema: workflow.schema,
      function: workflow.fn,
      retryPolicy: workflow.retryPolicy,
    });
    return Promise.allSettled(
      workflow.schedules.map((schedule) => this.scheduleTask(workflow, schedule)),
    );
  }

  /** Unschedules every cron task of `workflow` and unregisters the workflow itself. */
  #removeWorkflow(workflow: WorkflowMetadata) {
    return Promise.allSettled([
      ...workflow.schedules.map((schedule) => this.unscheduleTask(schedule.name)),
      this.unregisterWorkflow(workflow),
    ]);
  }

  /**
   * Runs a workflow identified by `options.name` / `options.version`.
   *
   * @param options - Workflow options; only `name` and `version` are used here
   *   (a `null` version is passed on as `undefined`).
   * @param input - Input handed to the workflow run.
   * @returns The run handle returned by the OpenWorkflow client.
   */
  run<Input, Output, TSchema extends StandardSchemaV1 | undefined = undefined>(
    options: Omit<WorkflowCreateOptions<Input, Output, TSchema>, "function" | "schema" | "id">,
    input: SchemaInput<TSchema, Input>,
  ): Promise<WorkflowRunHandle<Output>> {
    return this.#ow.runWorkflow(
      {
        name: options.name,
        version: options.version ?? undefined,
      },
      input,
    );
  }

  /**
   * Registers a cron task that runs `workflow` according to `schedule`.
   *
   * Does nothing on nodes without a worker. If a task is already registered
   * under `schedule.name`, it is stopped and destroyed so that it does not
   * keep firing as an untracked duplicate.
   *
   * @param workflow - Workflow to run on every tick.
   * @param schedule - Schedule entry (name, cron expression, input).
   */
  async scheduleTask(
    workflow: Pick<WorkflowMetadata, "id" | "name" | "version">,
    schedule: WorkflowMetadata["schedules"][number],
  ): Promise<void> {
    // Only handled on nodes where a worker is active.
    if (!this.#worker) {
      return;
    }

    const task = cronSchedule(
      schedule.expression,
      async () => {
        const { name, version } = workflow;
        const { input } = schedule;
        // `input` is either a value or a (possibly async) factory for it.
        const inputData = await (typeof input === "function" ? input() : input);
        return this.run({ name, version: version ?? null }, inputData);
      },
      {
        name: schedule.name,
        timezone: Sonamu.config.api.timezone,
        noOverlap: false,
      },
    );

    const replaced = this.#scheduledTasks.get(schedule.name);
    this.#scheduledTasks.set(schedule.name, {
      task,
      inputFn: schedule.input,
      workflowId: workflow.id,
    });

    if (replaced) {
      // The previous task is no longer reachable through the map, so it has
      // to be disposed of here or it would never be stopped.
      try {
        await replaced.task.stop();
        await replaced.task.destroy();
      } catch (error: unknown) {
        console.error("failed to dispose replaced scheduled task", schedule.name, error);
      }
    }

    await task.start();
  }

  /**
   * Stops and destroys the cron task registered under `name`.
   * Does nothing on nodes without a worker; logs an error when no such task
   * is registered.
   */
  async unscheduleTask(name: string): Promise<void> {
    // Only handled on nodes where a worker is active.
    if (!this.#worker) {
      return;
    }

    const taskItem = this.#scheduledTasks.get(name);
    if (!taskItem) {
      console.error("scheduled task not found", name);
      return;
    }

    this.#scheduledTasks.delete(name);
    await taskItem.task.stop();
    await taskItem.task.destroy();
  }

  /**
   * Registers a workflow with the OpenWorkflow client. Its schedules have to
   * be registered separately (see {@link WorkflowManager.scheduleTask}).
   *
   * The user function is wrapped so that each run:
   *  - receives a context built from a base object with null request/reply/
   *    user/session, optionally transformed by
   *    `Sonamu.config.tasks.contextProvider`;
   *  - executes inside `Sonamu.asyncLocalStorage` with that context;
   *  - receives a `StepWrapper` around the raw step API and a logger whose
   *    category is derived from the workflow name.
   *
   * @param options - Workflow definition and implementation.
   * @returns The runnable workflow returned by the client.
   */
  registerWorkflow<Input, Output, TSchema extends StandardSchemaV1 | undefined = undefined>(
    options: WorkflowCreateOptions<Input, Output, TSchema>,
  ): RunnableWorkflow<SchemaOutput<TSchema, Input>, Output, SchemaInput<TSchema, Input>> {
    const fn = async (
      params: Readonly<{
        input: SchemaOutput<TSchema, Input>;
        step: StepApi;
        version: string | null;
      }>,
    ) => {
      const baseContext = {
        transport: "http" as const,
        request: null,
        reply: null,
        headers: {},
        createSSE: (schema: ZodObject) => createMockSSEFactory(schema),
        naiteStore: Naite.createStore(),
        locale: "",
        user: null,
        session: null,
      } as unknown as Context;

      const contextProvider = Sonamu.config.tasks?.contextProvider;
      const context: Context = contextProvider
        ? await contextProvider(baseContext)
        : baseContext;

      const step = new StepWrapper(params.step);

      return Sonamu.asyncLocalStorage.run({ context }, () =>
        options.function({
          input: params.input,
          step,
          version: params.version,
          logger: getLogger(convertDomainToCategory(options.name, "workflow")),
        }),
      );
    };

    return this.#ow.defineWorkflow(
      {
        name: options.name,
        version: options.version ?? undefined,
        schema: options.schema,
        retryPolicy: options.retryPolicy,
      },
      fn,
    );
  }

  /**
   * Unregisters a workflow from the OpenWorkflow client. Its schedules have
   * to be removed separately (see {@link WorkflowManager.unscheduleTask}).
   */
  async unregisterWorkflow(workflow: Pick<WorkflowMetadata, "name" | "version">): Promise<void> {
    this.#ow.unregisterWorkflow(workflow.name, workflow.version ?? null);
  }

  /** Creates the worker with `options`; it is started by {@link WorkflowManager.startWorker}. */
  setupWorker(options: WorkflowOptions) {
    this.#worker = this.#ow.newWorker(options);
  }

  /** Initializes the backend and, if a worker has been set up, starts it as well. */
  async startWorker(): Promise<void> {
    await this.#backend.initialize();
    await this.#worker?.start();
  }

  /** Stops the worker, if there is one. */
  async stopWorker(): Promise<void> {
    await this.#worker?.stop();
  }

  /** Stops every registered cron task; individual failures are ignored. */
  async stopSchedules(): Promise<void> {
    await Promise.allSettled(
      Array.from(this.#scheduledTasks.values()).map(({ task }) => Promise.resolve(task.stop())),
    );
  }

  /** Destroys every registered cron task and clears the registry; individual failures are ignored. */
  async destroySchedules(): Promise<void> {
    await Promise.allSettled(
      Array.from(this.#scheduledTasks.values()).map(({ task }) => Promise.resolve(task.destroy())),
    );
    this.#scheduledTasks.clear();
  }

  /**
   * Shuts everything down: stops schedules, stops the worker, destroys the
   * schedules and stops the backend.
   *
   * The cleanup steps are chained with `finally` so that a failure while
   * stopping the worker still destroys the cron tasks and stops the backend;
   * the error is rethrown afterwards.
   */
  async destroy(): Promise<void> {
    try {
      await this.stopSchedules();
      await this.stopWorker();
    } finally {
      try {
        await this.destroySchedules();
      } finally {
        await this.#backend.stop();
      }
    }
  }

  /** Enables `await using`; equivalent to calling {@link WorkflowManager.destroy}. */
  [Symbol.asyncDispose]() {
    return this.destroy();
  }
}