import type { Has } from '@principia/base/Has' import type { UQueue } from '@principia/base/Queue' import { pipe } from '@principia/base/function' import { tag } from '@principia/base/Has' import * as I from '@principia/base/IO' import * as L from '@principia/base/Layer' import * as M from '@principia/base/Managed' import * as Q from '@principia/base/Queue' import * as Ref from '@principia/base/Ref' import * as SRef from '@principia/base/SRef' import * as http from 'http' import { HttpConnection } from './HttpConnection' export interface HttpServerConfig { readonly host: string readonly port: number } export const HttpServerConfig = tag() export function serverConfig(config: HttpServerConfig): L.Layer> { return L.succeed(HttpServerConfig)(config) } export interface HttpServer { readonly server: http.Server readonly queue: UQueue } export const HttpServerTag = tag() export function HttpServer({ host, port }: HttpServerConfig): L.Layer> { return L.fromRawManaged( pipe( I.gen(function* (_) { const queue = yield* _(Q.makeUnbounded()) const runtime = yield* _(I.runtime()) const server = yield* _( I.succeedLazy(() => { return http.createServer((req, res) => { runtime.run_( I.gen(function* (_) { const reqRef = yield* _(Ref.make(req)) const resRef = yield* _(SRef.make(res)) yield* _(Q.offer_(queue, new HttpConnection(reqRef, resRef))) }) ) }) }) ) yield* _( I.async((k) => { function clean() { server.removeListener('error', onError) server.removeListener('listening', onDone) } function onError(error: Error) { clean() k(I.halt(error)) } function onDone() { clean() k(I.unit()) } server.listen(port, host).once('error', onError).once('listening', onDone) }) ) return { queue, server } }), M.bracket(({ queue, server }) => pipe( I.async((k) => { server.close((error) => { if (error) { k(I.halt(error)) } else { k(I.unit()) } }) }), I.apSecond(Q.shutdown(queue)) ) ), M.map(HttpServerTag.of) ) ) }