import { heads } from "./heads";
import { aborts } from "./aborts";
import { chunks } from "./chunks";
import { chunkBys } from "./chunkBys";
import { debounces } from "./debounces";
import { filters } from "./filters";
import { flatMaps } from "./flatMaps";
import { flats } from "./flats";
import type { FlowSource } from "./FlowSource";
import { chunkIntervals } from "./chunkIntervals";
import { joins } from "./joins";
import { mapAddFields } from "./mapAddFields";
import { maps } from "./maps";
import { nils } from "./nils";
import { peeks } from "./peeks";
import { forEachs } from "./forEachs";
import { pMaps } from "./pMaps";
import { reduces } from "./reduces";
import { skips } from "./skips";
import { slices } from "./slices";
import { streamAsyncIterator } from "./streamAsyncIterator";
import { tails } from "./tails";
import { throttles } from "./throttles";
import { unwinds } from "./unwinds";
import type { FieldPathByValue } from "react-hook-form";
import type { Awaitable } from "./Awaitable";
import { uniqBys, uniqs } from "./uniqs";
import { limits } from "./limits";
import type { Unwinded } from "./Unwinded";
import { tees } from "./tees";
import { throughs } from "./throughs";
import { wseFrom, wseToArray, wseToPromise } from "./wse";
import { logs } from "./logs";

// NOTE(review): the generic parameter / argument lists in this module were
// reconstructed from the operator names and from the runtime implementation
// below. Confirm the type-argument arity of each `typeof xs<...>` against the
// corresponding ./xs module before relying on them.

/**
 * Reducer callback: receives the accumulated state, the current item and its
 * index, and returns the next state (optionally asynchronously).
 */
export type Reducer<S, T> = (state: S, x: T, i: number) => Awaitable<S>;

/**
 * A `ReadableStream<T>` that is also async-iterable and carries chainable
 * stream operators. Extra members are added conditionally depending on `T`
 * (arrays get `flat`, records get `unwind` / `mapAddField`, strings and
 * `Uint8Array` get the `Response`-based readers).
 */
export type snoflow<T> = ReadableStream<T> &
  AsyncIterableIterator<T> & {
    // { [Symbol.asyncDispose]: () => Promise<void> } &
    _type: T;
    readable: ReadableStream<T>;
    // NOTE(review): declared here, but the factory below does not assign a
    // `writable` property (its getter is commented out).
    writable: WritableStream<T>;
    chunkBy(...args: Parameters<typeof chunkBys<T>>): snoflow<T[]>;
    /** @deprecated use chunk */
    buffer(...args: Parameters<typeof chunks<T>>): snoflow<T[]>;
    chunk(...args: Parameters<typeof chunks<T>>): snoflow<T[]>;
    abort(...args: Parameters<typeof aborts<T>>): snoflow<T>;
    through<R>(fn: (s: snoflow<T>) => snoflow<R>): snoflow<R>; // fn overload must come first
    through<R>(stream: TransformStream<T, R>): snoflow<R>;
    through(stream?: TransformStream<T, T>): snoflow<T>;
    /** @deprecated use chunkInterval */
    interval(...args: Parameters<typeof chunkIntervals<T>>): snoflow<T[]>;
    chunkInterval(...args: Parameters<typeof chunkIntervals<T>>): snoflow<T[]>;
    debounce(...args: Parameters<typeof debounces<T>>): snoflow<T>;
    done: (pipeTo?: WritableStream<T>) => Promise<void>;
    end: (pipeTo?: WritableStream<T>) => Promise<void>;
    filter(fn: (x: T, i: number) => Awaitable<any>): snoflow<T>; // fn overload must come first
    filter(): snoflow<NonNullable<T>>;
    flatMap<R>(...args: Parameters<typeof flatMaps<T, R>>): snoflow<R>;
    join(fn: (s: WritableStream<T>) => void | any): snoflow<T>;
    join(stream?: ReadableStream<T>): snoflow<T>;
    limit(...args: Parameters<typeof limits<T>>): snoflow<T>;
    head(...args: Parameters<typeof heads<T>>): snoflow<T>;
    map<R>(...args: Parameters<typeof maps<T, R>>): snoflow<R>;
    log(...args: Parameters<typeof logs<T>>): snoflow<T>;
    peek(...args: Parameters<typeof peeks<T>>): snoflow<T>;
    forEach(...args: Parameters<typeof forEachs<T>>): snoflow<T>;
    pMap<R>(fn: (x: T, i: number) => Awaitable<R>): snoflow<R>; // fn overload must come first
    pMap<R>(
      concurr: number,
      fn: (x: T, i: number) => Awaitable<R>
    ): snoflow<R>;
    reduce(
      fn: (state: T | null, x: T, i: number) => Awaitable<T>
    ): snoflow<T>; // fn overload must come first
    reduce<S>(state: S, fn: Reducer<S, T>): snoflow<S>;
    skip: (...args: Parameters<typeof skips<T>>) => snoflow<T>;
    slice: (...args: Parameters<typeof slices<T>>) => snoflow<T>;
    tail: (...args: Parameters<typeof tails<T>>) => snoflow<T>;
    uniq: (...args: Parameters<typeof uniqs<T>>) => snoflow<T>;
    uniqBy: <K>(...args: Parameters<typeof uniqBys<T, K>>) => snoflow<T>;
    tees(fn: (s: snoflow<T>) => void | any): snoflow<T>; // fn overload must come first
    tees(stream?: WritableStream<T>): snoflow<T>;
    throttle: (...args: Parameters<typeof throttles<T>>) => snoflow<T>;
    // prevents
    preventAbort: () => snoflow<T>;
    preventClose: () => snoflow<T>;
    preventCancel: () => snoflow<T>;
    // to promises
    toNil: () => Promise<void>;
    toArray: () => Promise<T[]>;
    toCount: () => Promise<number>;
    toFirst: () => Promise<T>;
    toLast: () => Promise<T>;
    toLog(...args: Parameters<typeof logs<T>>): Promise<void>;
  } & (T extends ReadonlyArray<any>
    ? {
        flat: (...args: Parameters<typeof flats<T>>) => snoflow<T[number]>;
      }
    : {}) &
  (T extends Record<string, any>
    ? {
        unwind<K extends FieldPathByValue<T, ReadonlyArray<any>>>(
          key: K
        ): snoflow<Unwinded<T, K>>;
        mapAddField: <K extends string, R>(
          ...args: Parameters<typeof mapAddFields<K, T, R>>
        ) => snoflow<
          Omit<T, K> & {
            [key in K]: R;
          }
        >;
      }
    : {}) &
  (T extends string | Uint8Array
    ? {
        // to response
        // `init` is optional and is forwarded to `new Response(stream, init)`
        // by the implementation, so zero-argument calls keep working.
        toResponse: (init?: ResponseInit) => Response;
        text: (init?: ResponseInit) => Promise<string>;
        json: (init?: ResponseInit) => Promise<any>;
        blob: (init?: ResponseInit) => Promise<Blob>;
        arrayBuffer: (init?: ResponseInit) => Promise<ArrayBuffer>;
      }
    : {});

/**
 * Creates a chainable flow from `src`.
 *
 * If `src` is already a `ReadableStream` it is used directly; otherwise it is
 * converted with `wseFrom`. The operators are attached to that same stream
 * object via `Object.assign`, so the returned value *is* the stream.
 * Every chaining operator pipes the stream through a transform and wraps the
 * result in a new `snoflow`.
 *
 * @param src - a `ReadableStream` or any other source accepted by `wseFrom`
 * @returns the source stream, augmented with the operator methods
 */
export const snoflow = <T>(src: FlowSource<T>): snoflow<T> => {
  const r: ReadableStream<T> =
    src instanceof ReadableStream
      ? src
      : // : isXMLHTTPRequestBodyInit(src)
        // ? new Response(src).body!
        wseFrom(src);
  // @ts-ignore todo
  return Object.assign(r, {
    _type: null as T,
    get readable() {
      return r;
    },
    // get writable() {
    //   DIE(new Error("WIP"));
    //   return new WritableStream();
    // },
    through: (...args: Parameters<typeof _throughs>) =>
      snoflow(r.pipeThrough(_throughs(...args))),
    mapAddField: (
      ...args: Parameters<typeof mapAddFields>
      // @ts-ignore
    ) => snoflow(r.pipeThrough(mapAddFields(...args))),
    chunkBy: (...args: Parameters<typeof chunkBys>) =>
      snoflow(r.pipeThrough(chunkBys(...args))),
    buffer: (...args: Parameters<typeof chunks>) =>
      snoflow(r.pipeThrough(chunks(...args))),
    chunk: (...args: Parameters<typeof chunks>) =>
      snoflow(r.pipeThrough(chunks(...args))),
    abort: (...args: Parameters<typeof aborts>) =>
      snoflow(r.pipeThrough(aborts(...args))),
    chunkInterval: (...args: Parameters<typeof chunkIntervals>) =>
      snoflow(r.pipeThrough(chunkIntervals(...args))),
    /** @deprecated */
    interval: (...args: Parameters<typeof chunkIntervals>) =>
      snoflow(r.pipeThrough(chunkIntervals(...args))),
    debounce: (...args: Parameters<typeof debounces>) =>
      snoflow(r.pipeThrough(debounces(...args))),
    // `done` / `end` drain the stream; without a destination it goes to nils()
    done: (dst = nils()) => r.pipeTo(dst),
    end: (dst = nils()) => r.pipeTo(dst),
    filter: (...args: Parameters<typeof filters>) =>
      snoflow(r.pipeThrough(filters(...args))),
    flatMap: (...args: Parameters<typeof flatMaps>) =>
      snoflow(r.pipeThrough(flatMaps(...args))),
    flat: (
      ...args: Parameters<typeof flats>
      // @ts-ignore
    ) => snoflow(r.pipeThrough(flats(...args))),
    join: (...args: Parameters<typeof joins>) =>
      snoflow(r.pipeThrough(joins(...args))),
    limit: (...args: Parameters<typeof limits>) =>
      snoflow(r.pipeThrough(limits(...args))),
    head: (...args: Parameters<typeof heads>) =>
      snoflow(r.pipeThrough(heads(...args))),
    map: (...args: Parameters<typeof maps>) =>
      snoflow(r.pipeThrough(maps(...args))),
    log: (...args: Parameters<typeof logs>) =>
      snoflow(r.pipeThrough(logs(...args))),
    uniq: (...args: Parameters<typeof uniqs>) =>
      snoflow(r.pipeThrough(uniqs(...args))),
    uniqBy: (...args: Parameters<typeof uniqBys>) =>
      snoflow(r.pipeThrough(uniqBys(...args))),
    unwind: (
      ...args: Parameters<typeof unwinds>
      // @ts-ignore
    ) => snoflow(r.pipeThrough(unwinds(...args))),
    pMap: (...args: Parameters<typeof pMaps>) =>
      snoflow(r.pipeThrough(pMaps(...args))),
    peek: (...args: Parameters<typeof peeks>) =>
      snoflow(r.pipeThrough(peeks(...args))),
    forEach: (...args: Parameters<typeof forEachs>) =>
      snoflow(r.pipeThrough(forEachs(...args))),
    reduce: (...args: Parameters<typeof reduces>) =>
      snoflow(r.pipeThrough(reduces(...args))),
    skip: (...args: Parameters<typeof skips>) =>
      snoflow(r.pipeThrough(skips(...args))),
    slice: (...args: Parameters<typeof slices>) =>
      snoflow(r.pipeThrough(slices(...args))),
    tail: (...args: Parameters<typeof tails>) =>
      snoflow(r.pipeThrough(tails(...args))),
    tees: (...args: Parameters<typeof _tees>) =>
      snoflow(r.pipeThrough(_tees(...args))),
    throttle: (...args: Parameters<typeof throttles>) =>
      snoflow(r.pipeThrough(throttles(...args))),
    /** prevent downstream abort, ignore downstream errors */
    preventAbort: () =>
      snoflow(r.pipeThrough(throughs(), { preventAbort: true })),
    /** prevent upstream close */
    preventClose: () =>
      snoflow(r.pipeThrough(throughs(), { preventClose: true })),
    /** prevent upstream cancel, ignore upstream errors */
    preventCancel: () =>
      snoflow(r.pipeThrough(throughs(), { preventCancel: true })),
    // to promises
    toNil: () => r.pipeTo(nils()),
    toArray: () => wseToArray(r),
    // collects every item into an array first, then reports its length
    toCount: async () => (await wseToArray(r)).length,
    toFirst: () => wseToPromise(snoflow(r).limit(1)),
    toLast: () => wseToPromise(snoflow(r).tail(1)),
    toLog: (...args: Parameters<typeof logs<T>>) =>
      snoflow(r.pipeThrough(logs(...args))).done(),
    // as response (only ReadableStream)
    toResponse: (init?: ResponseInit) => new Response(r, init),
    text: (init?: ResponseInit) => new Response(r, init).text(),
    json: (init?: ResponseInit) => new Response(r, init).json(),
    blob: (init?: ResponseInit) => new Response(r, init).blob(),
    arrayBuffer: (init?: ResponseInit) => new Response(r, init).arrayBuffer(),
    // as iterator
    // [Symbol.asyncDispose]: async () => await r.pipeTo(nils()),
    [Symbol.asyncIterator]: streamAsyncIterator,
  });
};

/**
 * Builds the transform used by `snoflow#tees`.
 *
 * - no argument: a plain pass-through `TransformStream`
 * - a `WritableStream`: delegates to `tees` with a callback that pipes the
 *   branch into that stream
 * - a function: the readable side is split with `tee()`; one branch is
 *   wrapped in a `snoflow` and handed to the function, the other branch is
 *   what continues downstream
 */
export const _tees: {
  <T>(fn: (s: snoflow<T>) => void | any): TransformStream<T, T>;
  <T>(stream?: WritableStream<T>): TransformStream<T, T>;
} = (arg) => {
  if (!arg) return new TransformStream();
  if (arg instanceof WritableStream) return tees((s) => s.pipeTo(arg));
  const fn = arg;
  const { writable, readable } = new TransformStream();
  const [a, b] = readable.tee();
  // @ts-ignore
  fn(snoflow(a));
  return { writable, readable: b };
};

/**
 * Builds the transform used by `snoflow#through`.
 *
 * - no argument: a plain pass-through `TransformStream`
 * - a non-function (a `TransformStream`): delegates to `throughs` with a
 *   callback that pipes through that stream
 * - a function: it receives a `snoflow` over the readable side of a fresh
 *   `TransformStream`; whatever flow source it returns is wrapped with
 *   `snoflow` and becomes the readable side of the resulting pair
 */
export const _throughs: {
  <T>(stream?: TransformStream<T, T>): TransformStream<T, T>;
  <T, R>(stream: TransformStream<T, R>): TransformStream<T, R>;
  <T, R>(fn: (s: snoflow<T>) => FlowSource<R>): TransformStream<T, R>;
} = (arg: any) => {
  if (!arg) return new TransformStream();
  if (typeof arg !== "function") return throughs((s) => s.pipeThrough(arg));
  const fn = arg;
  const { writable, readable } = new TransformStream();
  return { writable, readable: snoflow(fn(snoflow(readable))) };
};