import { Layer } from "effect"; import * as Context from "effect/Context"; import * as Effect from "effect/Effect"; import type { Simplify } from "effect/Types"; import { App } from "./app.ts"; import type { AnyBinding, BindingService } from "./binding.ts"; import { type PlanStatusSession, type ScopedPlanStatusSession, CLI, } from "./cli/service.ts"; import type { ApplyStatus } from "./event.ts"; import { generateInstanceId, InstanceId } from "./instance-id.ts"; import * as Output from "./output.ts"; import { type Apply, type BindNode, type Delete, type DerivePlan, type IPlan, type Providers, plan, } from "./plan.ts"; import type { Instance } from "./policy.ts"; import { getProviderByType } from "./provider.ts"; import type { AnyResource, Resource } from "./resource.ts"; import type { AnyService } from "./service.ts"; import { type CreatedResourceState, type CreatingResourceState, type DeletingResourceState, type ReplacedResourceState, type ReplacingResourceState, type ResourceState, type UpdatedResourceState, type UpdatingReourceState, State, StateStoreError, } from "./state.ts"; import { asEffect } from "./util.ts"; export type ApplyEffect< P extends IPlan, Err = never, Req = never, > = Effect.Effect< { [k in keyof AppliedPlan

]: AppliedPlan

[k]; }, Err, Req >; export type AppliedPlan

= { [id in keyof P["resources"]]: P["resources"][id] extends | Delete | undefined | never ? never : Simplify; }; export const apply = < const Resources extends (AnyService | AnyResource)[] = never, >( ...resources: Resources ): ApplyEffect< DerivePlan>, never, State | Providers> // TODO(sam): don't cast to any > => plan(...resources).pipe( Effect.flatMap((p) => applyPlan(p as any as IPlan)), ) as any; export const applyPlan =

(plan: P) => Effect.gen(function* () { const cli = yield* CLI; const session = yield* cli.startApplySession(plan); // 1. expand the graph (create new resources, update existing and create replacements) const resources = yield* expandAndPivot(plan, session); // TODO(sam): support roll back to previous state if errors occur during expansion // -> RISK: some UPDATEs may not be reverisble (i.e. trigger replacements) // TODO(sam): should pivot be done separately? E.g shift traffic? // 2. delete orphans and replaced resources yield* collectGarbage(plan, session); yield* session.done(); if (Object.keys(plan.resources).length === 0) { // all resources are deleted, return undefined return undefined; } return resources as { [k in keyof AppliedPlan

]: AppliedPlan

[k]; }; }); const expandAndPivot = Effect.fnUntraced(function* ( plan: IPlan, session: PlanStatusSession, ) { const state = yield* State; const app = yield* App; const outputs = {} as Record>; const resolveUpstream = Effect.fn(function* (resourceId: string) { const upstreamNode = plan.resources[resourceId]; const upstreamAttr = upstreamNode ? yield* apply(upstreamNode) : yield* Effect.dieMessage(`Resource ${resourceId} not found`); return { resourceId, upstreamAttr, upstreamNode, }; }); const resolveBindingUpstream = Effect.fn(function* ({ node, }: { node: BindNode; resource: Resource; }) { const binding = node.binding as AnyBinding & { // smuggled property (because it interacts poorly with inference) Tag: Context.Tag; }; const provider = yield* binding.Tag; const resourceId: string = node.binding.capability.resource.id; const { upstreamAttr, upstreamNode } = yield* resolveUpstream(resourceId); return { resourceId, upstreamAttr, upstreamNode, provider, }; }); const attachBindings = ({ resource, bindings, target, }: { resource: Resource; bindings: BindNode[]; target: { id: string; props: any; attr: any; }; }) => Effect.all( bindings.map( Effect.fn(function* (node) { const { resourceId, upstreamAttr, upstreamNode, provider } = yield* resolveBindingUpstream({ node, resource }); const input = { source: { id: resourceId, attr: upstreamAttr, props: upstreamNode.resource.props, }, props: node.binding.props, attr: node.attr, target, } as const; if (node.action === "attach") { return yield* asEffect(provider.attach(input)); } else if (node.action === "reattach") { // reattach is optional, we fall back to attach if it's not available return yield* asEffect( (provider.reattach ? provider.reattach : provider.attach)(input), ); } else if (node.action === "detach" && provider.detach) { return yield* asEffect( provider.detach({ ...input, target, }), ); } return node.attr; }), ), ); const postAttachBindings = ({ bindings, bindingOutputs, resource, target, }: { bindings: BindNode[]; bindingOutputs: any[]; resource: Resource; target: { id: string; props: any; attr: any; }; }) => Effect.all( bindings.map( Effect.fn(function* (node, i) { const { resourceId, upstreamAttr, upstreamNode, provider } = yield* resolveBindingUpstream({ node, resource }); const oldBindingOutput = bindingOutputs[i]; if ( provider.postattach && (node.action === "attach" || node.action === "reattach") ) { const bindingOutput = yield* asEffect( provider.postattach({ source: { id: resourceId, attr: upstreamAttr, props: upstreamNode.resource.props, }, props: node.binding.props, attr: oldBindingOutput, target, } as const), ); return { ...oldBindingOutput, ...bindingOutput, }; } return oldBindingOutput; }), ), { concurrency: "unbounded" }, ); const apply: (node: Apply) => Effect.Effect = (node) => Effect.gen(function* () { const commit = (value: State) => state.set({ stack: app.name, stage: app.stage, resourceId: node.resource.id, value, }); const id = node.resource.id; const resource = node.resource; const scopedSession = { ...session, note: (note: string) => session.emit({ id, kind: "annotate", message: note, }), } satisfies ScopedPlanStatusSession; return yield* (outputs[id] ??= yield* Effect.cached( Effect.gen(function* () { const report = (status: ApplyStatus) => session.emit({ kind: "status-change", id, type: node.resource.type, status, }); if (node.action === "noop") { return node.state.attr; } // resolve upstream dependencies before committing any changes to state const upstream = Object.fromEntries( yield* Effect.all( Object.entries(Output.resolveUpstream(node.props)).map(([id]) => resolveUpstream(id).pipe( Effect.map(({ upstreamAttr }) => [id, upstreamAttr]), ), ), { concurrency: "unbounded" }, ), ); const instanceId = yield* Effect.gen(function* () { if (node.action === "create" && !node.state?.instanceId) { const instanceId = yield* generateInstanceId(); yield* commit({ status: "creating", instanceId, logicalId: id, downstream: node.downstream, props: node.props, providerVersion: node.provider.version ?? 0, resourceType: node.resource.type, bindings: node.bindings, }); return instanceId; } else if (node.action === "replace") { if ( node.state.status === "replaced" || node.state.status === "replacing" ) { // replace has already begun and we have the new instanceId, do not re-create it return node.state.instanceId; } const instanceId = yield* generateInstanceId(); yield* commit({ status: "replacing", instanceId, logicalId: id, downstream: node.downstream, props: node.props, providerVersion: node.provider.version ?? 0, resourceType: node.resource.type, bindings: node.bindings, old: node.state, deleteFirst: node.deleteFirst, }); return instanceId; } else if (node.state?.instanceId) { // we're in a create, update or delete state with a stable instanceId, use it return node.state.instanceId; } // this should never happen return yield* Effect.dieMessage( `Instance ID not found for resource '${id}' and action is '${node.action}'`, ); }); const apply = Effect.gen(function* () { if (node.action === "create") { const news = (yield* Output.evaluate( node.props, upstream, )) as Record; const checkpoint = (attr: any) => commit({ status: "creating", logicalId: id, instanceId, resourceType: node.resource.type, props: news, attr, providerVersion: node.provider.version ?? 0, bindings: node.bindings, downstream: node.downstream, }); if (!node.state) { yield* checkpoint(undefined); } let attr: any; if ( node.action === "create" && node.provider.precreate && // pre-create is only designed to ensure the resource exists, if we have state.attr, then it already exists and should be skipped node.state?.attr === undefined ) { yield* report("pre-creating"); // stub the resource prior to resolving upstream resources or bindings if a stub is available attr = yield* node.provider.precreate({ id, news: node.props, session: scopedSession, instanceId, }); yield* checkpoint(attr); } yield* report("attaching"); let bindingOutputs = yield* attachBindings({ resource, bindings: node.bindings, target: { id, props: news, attr, }, }); yield* report("creating"); attr = yield* node.provider.create({ id, news, instanceId, bindings: bindingOutputs, session: scopedSession, }); yield* checkpoint(attr); yield* report("post-attach"); bindingOutputs = yield* postAttachBindings({ resource, bindings: node.bindings, bindingOutputs, target: { id, props: news, attr, }, }); yield* commit({ status: "created", logicalId: id, instanceId, resourceType: node.resource.type, props: news, attr, bindings: node.bindings.map((binding, i) => ({ ...binding, attr: bindingOutputs[i], })), providerVersion: node.provider.version ?? 0, downstream: node.downstream, }); yield* report("created"); return attr; } else if (node.action === "update") { const upstream = Object.fromEntries( yield* Effect.all( Object.entries(Output.resolveUpstream(node.props)).map( ([id]) => resolveUpstream(id).pipe( Effect.map(({ upstreamAttr }) => [id, upstreamAttr]), ), ), { concurrency: "unbounded" }, ), ); const news = (yield* Output.evaluate( node.props, upstream, )) as Record; const checkpoint = (attr: any) => { if (node.state.status === "replaced") { return commit({ ...node.state, attr, props: news, }); } else { return commit({ status: "updating", logicalId: id, instanceId, resourceType: node.resource.type, props: news, attr, providerVersion: node.provider.version ?? 0, bindings: node.bindings, downstream: node.downstream, old: node.state.status === "updating" ? node.state.old : node.state, }); } }; yield* checkpoint(node.state.attr); yield* report("attaching"); let bindingOutputs = yield* attachBindings({ resource, bindings: node.bindings, target: { id, props: news, attr: node.state.attr, }, }); yield* report("updating"); const attr = yield* node.provider.update({ id, news, instanceId, bindings: bindingOutputs, session: scopedSession, olds: node.state.status === "created" || node.state.status === "updated" || node.state.status === "replaced" ? node.state.props : node.state.old.props, output: node.state.attr, }); yield* checkpoint(attr); yield* report("post-attach"); bindingOutputs = yield* postAttachBindings({ resource, bindings: node.bindings, bindingOutputs, target: { id, props: news, attr, }, }); if (node.state.status === "replaced") { yield* commit({ ...node.state, attr, props: news, }); } else { yield* commit({ status: "updated", logicalId: id, instanceId, resourceType: node.resource.type, props: news, attr, bindings: node.bindings.map((binding, i) => ({ ...binding, attr: bindingOutputs[i], })), providerVersion: node.provider.version ?? 0, downstream: node.downstream, }); } yield* report("updated"); return attr; } else if (node.action === "replace") { if (node.state.status === "replaced") { // we've already created the replacement resource, return the output return node.state.attr; } let state: ReplacingResourceState; if (node.state.status !== "replacing") { yield* commit( (state = { status: "replacing", logicalId: id, instanceId, resourceType: node.resource.type, props: node.props, attr: node.state.attr, providerVersion: node.provider.version ?? 0, deleteFirst: node.deleteFirst, old: node.state, downstream: node.downstream, }), ); } else { state = node.state; } const upstream = Object.fromEntries( yield* Effect.all( Object.entries(Output.resolveUpstream(node.props)).map( ([id]) => resolveUpstream(id).pipe( Effect.map(({ upstreamAttr }) => [id, upstreamAttr]), ), ), { concurrency: "unbounded" }, ), ); const news = (yield* Output.evaluate( node.props, upstream, )) as Record; const checkpoint = < S extends ReplacingResourceState | ReplacedResourceState, >({ status, attr, bindings, }: Pick) => commit({ status, logicalId: id, instanceId, resourceType: node.resource.type, props: news, attr, providerVersion: node.provider.version ?? 0, bindings: bindings ?? node.bindings, downstream: node.downstream, old: state.old, deleteFirst: node.deleteFirst, } as S); let attr: any; if ( node.provider.precreate && // pre-create is only designed to ensure the resource exists, if we have state.attr, then it already exists and should be skipped node.state?.attr === undefined ) { yield* report("pre-creating"); // stub the resource prior to resolving upstream resources or bindings if a stub is available attr = yield* node.provider.precreate({ id, news: node.props, session: scopedSession, instanceId, }); yield* checkpoint({ status: "replacing", attr, }); } yield* report("attaching"); let bindingOutputs = yield* attachBindings({ resource, bindings: node.bindings, target: { id, props: news, attr, }, }); yield* report("creating replacement"); attr = yield* node.provider.create({ id, news, instanceId, bindings: bindingOutputs, session: scopedSession, }); yield* checkpoint({ status: "replacing", attr, }); yield* report("post-attach"); bindingOutputs = yield* postAttachBindings({ resource, bindings: node.bindings, bindingOutputs, target: { id, props: news, attr, }, }); yield* checkpoint({ status: "replaced", attr, bindings: node.bindings.map((binding, i) => ({ ...binding, attr: bindingOutputs[i], })), }); yield* report("created"); return attr; } // @ts-expect-error return yield* Effect.dieMessage(`Unknown action: ${node.action}`); }); // provide the resource-specific context (InstanceId, etc.) return yield* apply.pipe( Effect.provide(Layer.succeed(InstanceId, instanceId)), ); }), )); }) as Effect.Effect; return Object.fromEntries( yield* Effect.all( Object.entries(plan.resources).map( Effect.fn(function* ([id, node]) { return [id, yield* apply(node)]; }), ), { concurrency: "unbounded" }, ), ); }); const collectGarbage = Effect.fnUntraced(function* ( plan: IPlan, session: PlanStatusSession, ) { const state = yield* State; const app = yield* App; const deletions: { [logicalId in string]: Effect.Effect; } = {}; // delete all replaced resources const replacedResources = yield* state.getReplacedResources({ stack: app.name, stage: app.stage, }); const deletionGraph = { ...plan.deletions, ...Object.fromEntries( replacedResources.map((replaced) => [replaced.logicalId, replaced]), ), }; const deleteResource: ( node: Delete | ReplacedResourceState, ) => Effect.Effect = Effect.fnUntraced( function* (node: Delete | ReplacedResourceState) { const isDeleteNode = ( node: Delete | ReplacedResourceState, ): node is Delete => "action" in node; const { logicalId, resourceType, instanceId, downstream, props, attr, provider, } = isDeleteNode(node) ? { logicalId: node.resource.id, resourceType: node.resource.type, instanceId: node.state.instanceId, downstream: node.downstream, props: node.state.props, attr: node.state.attr, provider: node.provider, } : { logicalId: node.logicalId, resourceType: node.old.resourceType, instanceId: node.old.instanceId, downstream: node.old.downstream, props: node.old.props, attr: node.old.attr, provider: yield* getProviderByType(node.old.resourceType), }; const commit = (value: State) => state.set({ stack: app.name, stage: app.stage, resourceId: logicalId, value, }); const report = (status: ApplyStatus) => session.emit({ kind: "status-change", id: logicalId, type: resourceType, status, }); const scopedSession = { ...session, note: (note: string) => session.emit({ id: logicalId, kind: "annotate", message: note, }), } satisfies ScopedPlanStatusSession; return yield* (deletions[logicalId] ??= yield* Effect.cached( Effect.gen(function* () { yield* Effect.all( downstream.map((dep) => dep in deletionGraph ? deleteResource(deletionGraph[dep] as Delete) : Effect.void, ), { concurrency: "unbounded" }, ); yield* report("deleting"); if (isDeleteNode(node)) { yield* commit({ status: "deleting", logicalId, instanceId, resourceType, props, attr, downstream, providerVersion: provider.version ?? 0, bindings: node.bindings, }); } if (attr !== undefined) { yield* provider.delete({ id: logicalId, instanceId, olds: props as never, output: attr, session: scopedSession, bindings: [], }); } if (isDeleteNode(node)) { // TODO(sam): should we commit a tombstone instead? and then clean up tombstones after all deletions are complete? yield* state.delete({ stack: app.name, stage: app.stage, resourceId: logicalId, }); yield* report("deleted"); } else { yield* commit({ status: "created", logicalId, instanceId, resourceType, props: node.props, attr: node.attr, providerVersion: provider.version ?? 0, downstream: node.downstream, bindings: node.bindings, }); yield* report("replaced"); } }).pipe(Effect.provide(Layer.succeed(InstanceId, instanceId))), )); }, ); yield* Effect.all( Object.values(deletionGraph) .filter((node) => node !== undefined) .map(deleteResource), { concurrency: "unbounded" }, ); });