import { Source } from "../activities/activity";
import { each } from "../activities/each";

// NOTE(review): `T` and `U` are referenced throughout this file, but no
// type-parameter lists are visible in this view. The type annotations below are
// kept exactly as found - confirm them against the original declarations.

/** Maps a row (and its index) to the value that should be handed to an observer. */
export type AccessorT = (row: T, currentIndex: number) => U;

/** A zero-argument function that returns an Observer. */
export interface ObserverFactory {
    (): Observer;
}

/**
 * Receives rows one at a time through `observe` and exposes a result
 * through `peek`.
 */
export interface Observer {
    observe(r: T, idx: number): void;
    peek(): U;
}

/**
 * Wraps the observer produced by `fof` so that every incoming row is first
 * passed through `accesor`; the wrapped observer sees the accessor's result
 * instead of the raw row.
 *
 * @param fof Factory that supplies the underlying observer.
 * @param accesor Function applied to each row (and its index) before observing.
 * @returns An observer that forwards accessed values and exposes the
 *          underlying observer's `peek` result.
 */
export function Accessor(fof: ObserverFactory, accesor: AccessorT): Observer {
    const inner = fof();
    return {
        observe: (row: T, i: number) => {
            inner.observe(accesor(row, i), i);
        },
        // Invoke `peek` on `inner` instead of exposing the bare method
        // reference: a detached `inner.peek` would run with `this` bound to
        // this wrapper object, which breaks observers that rely on `this`.
        peek: () => inner.peek()
    };
}

/**
 * Feeds every row of `source` to `s.observe`, together with its zero-based
 * position in the iteration.
 */
function observeAll(s: Observer, source: Source): void {
    let i = 0;
    for (const row of source) {
        s.observe(row, i++);
    }
}

/**
 * Pass-through activity so an Observer can be inserted into a pipeline:
 * builds an `each` activity whose callback forwards every row and index to
 * `s.observe`.
 */
export function sensor(s: Observer) {
    return each((r, i) => s.observe(r, i));
}

/**
 * Converts an Observer into an activity: the returned generator function
 * consumes the whole source, then yields the single value of `s.peek()`.
 */
export function activity(s: Observer) {
    return function* (source: Source) {
        observeAll(s, source);
        yield s.peek();
    };
}

/**
 * Converts an Observer into a scalar activity: the returned function
 * consumes the whole source, then returns `s.peek()`.
 */
export function scalar(s: Observer) {
    return function (source: Source) {
        observeAll(s, source);
        return s.peek();
    };
}