/*!
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to you under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
import { IMonad, IValueHolder, Optional } from "./Monad";
import { ICollector, IStreamDataSource, ITERATION_STATUS } from "./SourcesCollectors";
import { DomQuery } from "./DomQuery";
import { Config } from "./Config";

/**
 * Maps a single element onto a data source of further elements
 * (one of the two function shapes accepted by IStream.flatMap).
 */
export type StreamMapper<T> = (data: T) => IStreamDataSource<any>;

/**
 * Maps a single element onto an array of further elements
 * (the other function shape accepted by IStream.flatMap).
 */
export type ArrayMapper<T> = (data: T) => Array<any>;

/**
 * Callback invoked per element, optionally with the element position.
 * Returning false stops further processing (see IStream.onElem / IStream.each).
 */
export type IteratableConsumer<T> = (data: T, pos?: number) => void | boolean;

/**
 * Reduction step: takes the value reduced so far (or the first element)
 * plus the next element and returns the newly reduced value.
 */
export type Reducable<T, V> = (val1: T | V, val2: T) => V;

/**
 * Predicate over a single element.
 */
export type Matchable<T> = (data: T) => boolean;

/**
 * Transformation of a single element of type T into a value of type R.
 */
export type Mappable<T, R> = (data: T) => R;

/**
 * Ordering function over two elements returning a number.
 * NOTE(review): presumably follows the Array.prototype.sort sign convention - confirm in the implementation.
 */
export type Comparator<T> = (el1: T, el2: T) => number;

/**
 * Data source used for flatMap, deals with element -> stream mappings
 *
 * T is the element type of the incoming data source,
 * S the element type this data source hands out.
 */
export declare class FlatMapStreamDataSource<T, S> implements IStreamDataSource<S> {
    mapFunc: StreamMapper<T>;
    inputDataSource: IStreamDataSource<T>;
    /**
     * the currently active stream
     * coming from an incoming element
     * once the end of this one is reached
     * it is swapped out by another one
     * from the next element
     */
    activeDataSource: IStreamDataSource<S> | null;
    walkedDataSources: Array<IStreamDataSource<S>>;
    _currPos: number;
    /**
     * @param func the mapping function, stored as mapFunc
     * @param parent the incoming data source, stored as inputDataSource
     */
    constructor(func: StreamMapper<T>, parent: IStreamDataSource<T>);
    hasNext(): boolean;
    private resolveActiveHasNext;
    lookAhead(cnt?: number): ITERATION_STATUS | S;
    private toDatasource;
    private resolveNextHasNext;
    next(): S | ITERATION_STATUS;
    reset(): void;
    current(): S | ITERATION_STATUS;
}

/**
 * Generic interface defining a stream
 */
export interface IStream<T> {
    /**
     * Perform the operation fn on a single element in the stream at a time
     * then pass the stream over for further processing
     * This is basically an intermediate point in the stream
     * with further processing happening later, do not use
     * this method to gather data or iterate over all data for processing
     * (for the second case each has to be used)
     *
     * @param fn the processing function, if it returns false, further processing is stopped
     */
    onElem(fn: IteratableConsumer<T>): IStream<T>;
    /**
     * Iterate over all elements in the stream and do some processing via fn
     *
     * @param fn takes a single element and if it returns false
     * then further processing is stopped
     */
    each(fn: IteratableConsumer<T>): void;
    /**
     * maps a single element into another via fn
     * @param fn function which takes one element in and returns another
     */
    map<R>(fn?: Mappable<T, R>): IStream<R>;
    /**
     * Takes an element in and returns a set of something
     * the set then is flattened into a single stream to be further processed
     *
     * @param fn the element -> data source or element -> array mapping
     */
    flatMap<R>(fn?: StreamMapper<T> | ArrayMapper<T>): IStream<R>;
    /**
     * filtering, takes an element in and is processed by fn.
     * If it returns false then further processing on this element is skipped
     * if it returns true it is passed down the chain.
     *
     * @param fn the filter predicate
     */
    filter(fn?: Matchable<T>): IStream<T>;
    /**
     * functional reduce... takes two elements in the stream and reduces to
     * one from left to right
     *
     * @param fn the reduction function for instance (val1,val2) => val1+val2
     * @param startVal an optional starting value, if provided the processing starts with this element
     * and further goes down into the stream, if not, then the first two elements are taken as reduction starting point
     */
    reduce<V>(fn: Reducable<T, V>, startVal?: T | V | null): Optional<T | V>;
    /**
     * returns the first element in the stream as Optional
     */
    first(): Optional<T>;
    /**
     * Returns the last stream element (note in endless streams without filtering and limiting you will never reach that
     * point hence producing an endless loop)
     */
    last(): Optional<T>;
    /**
     * returns true if there is at least one element where a call fn(element) produces true
     *
     * @param fn the predicate to test with
     */
    anyMatch(fn: Matchable<T>): boolean;
    /**
     * returns true if all elements produce true on a call to fn(element)
     *
     * @param fn the predicate to test with
     */
    allMatch(fn: Matchable<T>): boolean;
    /**
     * returns true if no elements produce true on a call to fn(element)
     *
     * @param fn the predicate to test with
     */
    noneMatch(fn: Matchable<T>): boolean;
    /**
     * Collect the elements with a collector given
     * There are a number of collectors provided
     *
     * @param collector the collector receiving the elements
     */
    collect(collector: ICollector<T, any>): any;
    /**
     * sort on the stream, this is a special case
     * of an endpoint, so your data which is fed in needs
     * to be limited otherwise it will fail
     * it still returns a stream for further processing
     *
     * @param comparator the ordering function
     */
    sort(comparator: Comparator<T>): IStream<T>;
    /**
     * Limits the stream to a certain number of elements
     *
     * @param end the limit of the stream
     */
    limits(end: number): IStream<T>;
    /**
     * concat for streams, so that you can concat several streams together
     *
     * @param toAppend the streams to append
     */
    concat(...toAppend: Array<IStream<T>>): IStream<T>;
    /**
     * returns the stream collected into an array (90% use-case abbreviation)
     */
    value: Array<T>;
    /**
     * returns the element currently selected in the stream
     */
    current(): T | ITERATION_STATUS;
    /**
     * returns an iterator over the elements of the given stream
     */
    [Symbol.iterator](): Iterator<T>;
}

/**
 * A simple typescript based reimplementation of streams
 *
 * This is the early eval version
 * for a lazy eval version check, LazyStream, which is api compatible
 * to this implementation, however with the benefit of being able
 * to provide infinite data sources and generic data providers, the downside
 * is, it might be a tad slower in some situations
 */
export declare class Stream<T> implements IMonad<T, Stream<any>>, IValueHolder<Array<T>>, IStream<T>, IStreamDataSource<T> {
    value: Array<T>;
    _limits: number;
    private pos;
    constructor(...value: T[]);
    static of<T>(...data: Array<T>): Stream<T>;
    /**
     * chunk-safe factory, takes the backing array directly instead of
     * spreading it into the call (Stream.of(...largeArray) overflows the
     * argument stack on large arrays)
     *
     * @param data the array to stream over
     */
    static ofArr<T>(data: Array<T>): Stream<T>;
    static ofAssoc<T>(data: {
        [key: string]: T;
    }): Stream<[string, T]>;
    static ofDataSource<T>(dataSource: IStreamDataSource<T>): Stream<T>;
    static ofDomQuery(value: DomQuery): Stream<DomQuery>;
    static ofConfig(value: Config): Stream<[string, any]>;
    current(): T | ITERATION_STATUS;
    limits(end: number): Stream<T>;
    /**
     * concat for streams, so that you can concat two streams together
     * @param toAppend the streams to append
     */
    concat(...toAppend: Array<IStream<T>>): Stream<T>;
    onElem(fn: (data: T, pos?: number) => void | boolean): Stream<T>;
    each(fn: (data: T, pos?: number) => void | boolean): void;
    map<R>(fn?: (data: T) => R): Stream<R>;
    flatMap<R>(fn?: (data: T) => R | Array<any>): Stream<any>;
    filter(fn?: (data: T) => boolean): Stream<T>;
    reduce<V>(fn: Reducable<T, V>, startVal?: V | null): Optional<T | V>;
    first(): Optional<T>;
    last(): Optional<T>;
    anyMatch(fn: Matchable<T>): boolean;
    allMatch(fn: Matchable<T>): boolean;
    noneMatch(fn: Matchable<T>): boolean;
    sort(comparator: Comparator<T>): IStream<T>;
    collect(collector: ICollector<T, any>): any;
    hasNext(): boolean;
    next(): T | ITERATION_STATUS;
    lookAhead(cnt?: number): T | ITERATION_STATUS;
    [Symbol.iterator](): Iterator<T>;
    reset(): void;
}

/**
 * Lazy implementation of a Stream
 * The idea is to connect the intermediate
 * streams as datasources like a linked list
 * with reverse referencing and for special
 * operations like filtering flatmapping
 * have intermediate datasources in the list
 * with specialized functions.
 *
 * Sort of a modified pipe valve pattern
 * the streams are the pipes the intermediate
 * data sources are the valves
 *
 * We then can use passed in functions to control
 * the flow in the valves
 *
 * That way we can have a lazy evaluating stream
 *
 * So if an endpoint requests data
 * a callback trace goes back the stream list
 * which triggers an operation upwards
 * which sends data down the drain which then is processed
 * and filtered until one element hits the endpoint.
 *
 * That is repeated, until all elements are processed
 * or an internal limit is hit.
 *
 */
export declare class LazyStream<T> implements IStreamDataSource<T>, IStream<T>, IMonad<T, LazyStream<any>> {
    protected dataSource: IStreamDataSource<T>;
    _limits: number;
    pos: number;
    static of<T>(...values: Array<T>): LazyStream<T>;
    /**
     * chunk-safe factory, takes the backing array directly instead of
     * spreading it into the call (LazyStream.of(...largeArray) overflows
     * the argument stack on large arrays)
     *
     * @param data the array to stream over
     */
    static ofArr<T>(data: Array<T>): LazyStream<T>;
    static ofAssoc<T>(data: {
        [key: string]: T;
    }): LazyStream<[string, T]>;
    static ofStreamDataSource<T>(value: IStreamDataSource<T>): LazyStream<T>;
    static ofDomQuery(value: DomQuery): LazyStream<DomQuery>;
    static ofConfig(value: Config): LazyStream<[string, any]>;
    constructor(parent: IStreamDataSource<T>);
    hasNext(): boolean;
    next(): T | ITERATION_STATUS;
    lookAhead(cnt?: number): ITERATION_STATUS | T;
    current(): T | ITERATION_STATUS;
    reset(): void;
    /**
     * concat for streams, so that you can concat two streams together
     * @param toAppend the streams to append
     */
    concat(...toAppend: Array<IStream<T>>): LazyStream<T>;
    nextFilter(fn: Matchable<T>): T | ITERATION_STATUS;
    limits(max: number): LazyStream<T>;
    collect(collector: ICollector<T, any>): any;
    onElem(fn: IteratableConsumer<T>): LazyStream<T>;
    filter(fn?: Matchable<T>): LazyStream<T>;
    map<R>(fn?: Mappable<T, R>): LazyStream<R>;
    flatMap<R>(fn?: ((data: T) => R | Array<any>)): LazyStream<any>;
    each(fn: IteratableConsumer<T>): void;
    reduce<V>(fn: Reducable<T, V>, startVal?: T | V | null): Optional<T | V>;
    last(): Optional<T>;
    first(): Optional<T>;
    anyMatch(fn: Matchable<T>): boolean;
    allMatch(fn: Matchable<T>): boolean;
    noneMatch(fn: Matchable<T>): boolean;
    sort(comparator: Comparator<T>): IStream<T>;
    get value(): Array<T>;
    [Symbol.iterator](): Iterator<T>;
    private stop;
    private isOverLimits;
}