/** * Full-stack stream test exercising the entire wrapper: * resources (TaggedRequestFor) * → controllers (Router(...)({ effect })) * → router (makeRouter / matchAll) * → api ClientFactory (ApiClientFactory.makeFor) * * Server runs over real HTTP (NodeHttpServer on a loopback port). Client uses * FetchHttpClient through ApiClientFactory. This covers the wrapper-level * `Stream` request constructor end-to-end. */ import { NodeHttpServer } from "@effect/platform-node" import { expect, it } from "@effect/vitest" import { ApiClientFactory, makeRpcClient } from "effect-app/client" import { HttpRouter, HttpServer } from "effect-app/http" import { DefaultGenericMiddlewares } from "effect-app/middleware" import { MiddlewareMaker } from "effect-app/rpc" import * as S from "effect-app/Schema" import { TaggedErrorClass } from "effect-app/Schema" import * as Effect from "effect/Effect" import * as Exit from "effect/Exit" import * as Layer from "effect/Layer" import * as Option from "effect/Option" import * as Stream from "effect/Stream" import { FetchHttpClient } from "effect/unstable/http" import { RpcSerialization } from "effect/unstable/rpc" import { createServer } from "http" import { makeRouter } from "../src/api/routing.js" import { DefaultGenericMiddlewaresLive } from "../src/api/routing/middleware.js" import { AllowAnonymous, AllowAnonymousLive, RequestContextMap, RequireRoles, RequireRolesLive, SomeElseMiddleware, SomeElseMiddlewareLive, SomeService, Test, TestLive } from "./fixtures.js" // --------------------------------------------------------------------------- // Middleware (mirrors the boilerplate AppMiddleware shape). // --------------------------------------------------------------------------- class AppMiddleware extends MiddlewareMaker .Tag()("AppMiddleware", RequestContextMap) .middleware(RequireRoles, Test) .middleware(AllowAnonymous) .middleware(SomeElseMiddleware) .middleware(...DefaultGenericMiddlewares) { static Default = this.layer.pipe( Layer.provide( [ RequireRolesLive.pipe(Layer.provide(SomeService.Default)), AllowAnonymousLive, TestLive, SomeElseMiddlewareLive, DefaultGenericMiddlewaresLive ] as const ) ) } const { Router, matchAll } = makeRouter(AppMiddleware.Default) // --------------------------------------------------------------------------- // Resources — Stream with and without payload. // --------------------------------------------------------------------------- const { TaggedRequestFor } = makeRpcClient(AppMiddleware) const Req = TaggedRequestFor("Streamy") class StreamTicks extends Req.Command()("StreamTicks", {}, { stream: true, allowAnonymous: true, success: S.Number }) {} class StreamCountTo extends Req.Command()("StreamCountTo", { to: S.Number }, { stream: true, allowAnonymous: true, success: S.Number }) {} class StreamRealtime extends Req.Command()("StreamRealtime", {}, { stream: true, allowAnonymous: true, success: S.Number }) {} class StreamBoom extends TaggedErrorClass()("StreamBoom", { reason: S.String }) {} class StreamFailStream extends Req.Command()("StreamFailStream", {}, { stream: true, allowAnonymous: true, success: S.Number, error: StreamBoom }) {} class StreamNoSuccess extends Req.Command()("StreamNoSuccess", {}, { stream: true, allowAnonymous: true }) {} // Defaults to allowAnonymous: false → AllowAnonymous middleware fails with NotLoggedInError // when the request lacks the `x-user` header. class StreamRequiresAuth extends Req.Command()("StreamRequiresAuth", {}, { stream: true, success: S.Number }) {} class CommandRequiresAuth extends Req.Command()("CommandRequiresAuth", {}, { success: S.Number }) {} class QueryRequiresAuth extends Req.Query()("QueryRequiresAuth", {}, { success: S.Number }) {} const StreamyRsc = { StreamTicks, StreamCountTo, StreamRealtime, StreamFailStream, StreamNoSuccess, StreamRequiresAuth, CommandRequiresAuth, QueryRequiresAuth } // --------------------------------------------------------------------------- // Controllers / router — Stream impls returned from the match callback. // --------------------------------------------------------------------------- const router = Router(StreamyRsc)({ *effect(match) { return match({ StreamTicks: () => Stream.fromIterable([10, 20, 30]), StreamCountTo: ({ to }: { readonly to: number }) => Effect .gen(function*() { return Stream.range(1, to) }) .pipe(Stream.unwrap), // emits 3 values 100ms apart so the test can prove element-by-element // delivery rather than a single batched response StreamRealtime: () => Stream.fromIterable([1, 2, 3]).pipe( Stream.mapEffect((n) => Effect.sleep("100 millis").pipe(Effect.as(n))) ), StreamFailStream: () => Stream.fail(new StreamBoom({ reason: "from-stream" })), StreamNoSuccess: () => Stream.empty, // handlers below are unreachable when middleware-auth fails; bodies exist // only so the resource type-checks StreamRequiresAuth: () => Stream.fromIterable([1, 2, 3]), CommandRequiresAuth: () => Effect.succeed(1), QueryRequiresAuth: () => Effect.succeed(1) }) } }) const RpcRouterLayer = matchAll({ router }) // --------------------------------------------------------------------------- // HTTP wiring — real server on a loopback port + FetchHttpClient on the client. // --------------------------------------------------------------------------- // Server binds an ephemeral port (port: 0). The actual URL is read from the // `HttpServer` service after binding, then fed into `ApiClientFactory.layer` so // each `it.live` scope gets a fresh server without colliding on a fixed port. const NodeServerLayer = NodeHttpServer.layer(() => createServer(), { port: 0 }) const ServerLayer = HttpRouter .serve(RpcRouterLayer) .pipe( Layer.provide(NodeServerLayer), Layer.provide(RpcSerialization.layerNdjson) ) const ClientLayer = Layer .unwrap( Effect.gen(function*() { const server = yield* HttpServer.HttpServer const addr = server.address if (addr._tag !== "TcpAddress") return yield* Effect.die(new Error("expected TcpAddress")) const host = addr.hostname === "0.0.0.0" ? "127.0.0.1" : addr.hostname const url = `http://${host}:${addr.port}` return ApiClientFactory .layer({ url, headers: Option.none() }) .pipe(Layer.provide(FetchHttpClient.layer)) }) ) .pipe(Layer.provide(NodeServerLayer)) const TestLayer = Layer.mergeAll(ServerLayer, ClientLayer) // --------------------------------------------------------------------------- // Tests // --------------------------------------------------------------------------- it.live( "stream resource without input: ApiClientFactory client emits all values", Effect.fnUntraced(function*() { const client = yield* ApiClientFactory.makeFor(Layer.empty)(StreamyRsc) const values = yield* Stream.runCollect(client.StreamTicks.handler()) expect(values).toStrictEqual([10, 20, 30]) }, Effect.provide(TestLayer)), { timeout: 10_000 } ) it.live( "stream resource with input: payload drives the emitted values", Effect.fnUntraced(function*() { const client = yield* ApiClientFactory.makeFor(Layer.empty)(StreamyRsc) const values = yield* Stream.runCollect(client.StreamCountTo.handler({ to: 4 })) expect(values).toStrictEqual([1, 2, 3, 4]) }, Effect.provide(TestLayer)), { timeout: 10_000 } ) it.live( "stream resource is delivered element-by-element in real time (not batched)", Effect.fnUntraced(function*() { const client = yield* ApiClientFactory.makeFor(Layer.empty)(StreamyRsc) const start = Date.now() const arrivals = yield* Stream.runCollect( client.StreamRealtime.handler().pipe( Stream.map((n) => ({ n, at: Date.now() - start })) ) ) expect(arrivals.map((_) => _.n)).toStrictEqual([1, 2, 3]) // server emits each value 100ms after the previous one. If the response // were batched, deltas would be ~0ms. Allow generous slack for CI jitter // but require clear separation between consecutive arrivals. const delta1 = arrivals[1]!.at - arrivals[0]!.at const delta2 = arrivals[2]!.at - arrivals[1]!.at expect(delta1).toBeGreaterThan(50) expect(delta2).toBeGreaterThan(50) // first element should not be withheld until the whole stream completes expect(arrivals[0]!.at).toBeLessThan(arrivals[2]!.at - 50) }, Effect.provide(TestLayer)), { timeout: 10_000 } ) it.live( "stream handler returning Stream.fail surfaces as failing stream on client", Effect.fnUntraced(function*() { const client = yield* ApiClientFactory.makeFor(Layer.empty)(StreamyRsc) const exit = yield* Stream.runCollect(client.StreamFailStream.handler()).pipe(Effect.exit) expect(Exit.isFailure(exit)).toBe(true) if (Exit.isFailure(exit)) { const failures = (exit.cause as any).reasons as ReadonlyArray<{ _tag: "Fail"; error: StreamBoom }> expect(failures.length).toBeGreaterThan(0) expect(failures[0]!.error._tag).toBe("StreamBoom") expect(failures[0]!.error.reason).toBe("from-stream") } }, Effect.provide(TestLayer)), { timeout: 10_000 } ) it.live( "stream resource without `success` exposes handler as a Stream on the client", Effect.fnUntraced(function*() { const client = yield* ApiClientFactory.makeFor(Layer.empty)(StreamyRsc) const exit = yield* Stream.runCollect(client.StreamNoSuccess.handler()).pipe(Effect.exit) expect(Exit.isSuccess(exit)).toBe(true) }, Effect.provide(TestLayer)), { timeout: 10_000 } ) const expectNotLoggedIn = (exit: Exit.Exit) => { expect(Exit.isFailure(exit)).toBe(true) if (!Exit.isFailure(exit)) return const failures = (exit.cause as any).reasons as ReadonlyArray<{ _tag: "Fail"; error: any }> expect(failures.length).toBeGreaterThan(0) // The bug surfaces here as a SchemaError ("Expected never | { _tag: 'error', ... }") // because the middleware-thrown NotLoggedInError doesn't match the // wire StreamFailureChunk / CommandFailureWithMetaData wrapping. expect(failures[0]!.error?._tag).toBe("NotLoggedInError") } it.live( "stream resource: middleware-emitted NotLoggedInError surfaces cleanly on the client", Effect.fnUntraced(function*() { const client = yield* ApiClientFactory.makeFor(Layer.empty)(StreamyRsc) const exit = yield* Stream.runCollect(client.StreamRequiresAuth.handler()).pipe(Effect.exit) expectNotLoggedIn(exit) }, Effect.provide(TestLayer)), { timeout: 10_000 } ) it.live( "command resource: middleware-emitted NotLoggedInError surfaces cleanly on the client", Effect.fnUntraced(function*() { const client = yield* ApiClientFactory.makeFor(Layer.empty)(StreamyRsc) const exit = yield* client.CommandRequiresAuth.handler().pipe(Effect.exit) expectNotLoggedIn(exit) }, Effect.provide(TestLayer)), { timeout: 10_000 } ) it.live( "query resource: middleware-emitted NotLoggedInError surfaces cleanly on the client", Effect.fnUntraced(function*() { const client = yield* ApiClientFactory.makeFor(Layer.empty)(StreamyRsc) const exit = yield* client.QueryRequiresAuth.handler().pipe(Effect.exit) expectNotLoggedIn(exit) }, Effect.provide(TestLayer)), { timeout: 10_000 } )