VimUnDoWyn,=ajkS]"O*VnGVw N`4_N_Pp5_ N_Pp function limit(source, number) {5_   GvGN_P qH * Internal utility function that takes a `source` function and optional5_  GvGN_Prq5_  GvGN_PrEInternal utility function that takes a `source` function and optional5_  GvGN_Pr*/5_FvFN_P rG * `number` arguments and returns a function that calls `source` with a5_ FvFN_P  * 5_ FvFN_P /**5_   FvFN_P  * 5_   FvFN_Po */5_ F  FvFN_PoH `number` arguments and returns a function that calls `source` with a*/5_    FvFN_P pG * given arguments only first `number` of times it's called. Useful for5_   FvFN_Pp */5_   ?v?N_P q@ * wrapping callbacks that must be called only ones for example.5_  ?v?N_Pq */5_   ?v?N_P  * 5_   ?v?N_P  * 5_   ?v?N_P  */5_  ?v?N_Pn5_"vN_Po function normalize(source) {& return function stream(next, stop) {' var stopped = false, reading = true( source(function onElement(element) {( if (!stopped && false !== reading)( return (reading = next(element)) }, function onStop(reason) {; return stopped ? nil : (stopped = true, stop(reason)) }) }}5_vN_P5_vN_P5_vN_QTcK * Utility function that returns streams of elements of given `array`. ThisI * function is dangerous as array mutations will have side effects on theJ * returned stream so it should be used with a care, reader of such streamL * **MUST NOT** mutate source array, this is also reason why we don't export * this function.5_vN_QU/**5_vN_QU * 5_vN_QV */5_ vN_QX]\ function streamArray(elements) {5_vN_Q[b * this function.b * a5_vN_Q_a * this function.*/5_ vN_QaaL * **MUST NOT** mutate source array, this is also reason why we don't export5_! vN_QcaJ * returned stream so it should be used with a care, reader of such stream5_ "!vN_QeaI * function is dangerous as array mutations will have side effects on the5_!#"vN_QgaHUtility function that returns streams of elements of given `array`. This5_"$#vN_Qia5_#%$vN_Ql b& return function stream(next, stop) {5_$&%vN_Qnb this function.*/5_%'&vN_QobJ **MUST NOT** mutate source array, this is also reason why we don't export5_&('vN_QpbH returned stream so it should be used with a care, reader of such stream5_')(vN_QqbG function is dangerous as array mutations will have side effects on the5_(*)**.vN_Q|)+b/**$ * Creates stream of given elements. * @examples# * list('a', 2, {})(console.log) */5_)+****vN_Q})*5_*,+***vN_Q*0^ ),]function list() {5_+-,+**vN_Q)+bfunction list() {/***,b /**5_,.-***vN_Q)+afunction list() {/**5_-/.+**vN_Q*,a$ * Creates stream of given elements.5_.0/,**vN_Q+-a * @examples5_/10-**vN_Q,.a# * list('a', 2, {})(console.log)5_021+**vN_Q*,a" Creates stream of given elements.5_132,**vN_Q+-a @examples5_243-**vN_Q,.a list('a', 2, {})(console.log)5_354.**vN_Q.0a5_465.**vN_Q,.b! list('a', 2, {})(console.log)*/-/b */5_576-!**vN_Q,/a# list('a', 2, {})(console.log)*/5_6875557v7N_Q46b8 * Creates empty stream. This is equivalent of `list()`.5_7984557v7N_Q34/*5_8:94447v7N_Q34 * 5_9;:4447v7N_Q34 */5_:<;4447v7N_Q46_5_;=<4447v7N_Q 46a 36`function empty() {5_<>=;;MvN_Q:<a/**# * Returns stream of mapped values. * @param {Function} lambda# * function that maps each value * @param {Function} input * source stream to be mapped * @examples: * var stream = list({ name: 'foo' }, { name: 'bar' })D * var names = map(function(value) { return value.name }, stream) * names(console.log) * // 'foo' * // 'bar'! * var numbers = list(1, 2, 3)N * var mapped = map(function onEach(number) { return number * 2 }, numbers) * mapped(console.log) * // 2 * // 4 * // 6 */5_=?>;;;vN_Q:;5_>@?;;;vN_Q;OO ;=N5_?A@<;;vN_Q:<afunction map(lambda, source) {/**;=a /**5_@BA;!;;vN_Q:<`!function map(lambda, source) {/**5_ACB<;;vN_Q:=` function map(lambda, source) {/*"* Returns stream of mapped values.;=`# * Returns stream of mapped values.5_BDC=;;vN_Q<>` * @param {Function} lambda5_CED>;;vN_Q=?`# * function that maps each value5_DFE?;;vN_Q>@` * @param {Function} input5_EGF@;;vN_Q?A` * source stream to be mapped5_FHGA;;vN_Q@B` * @examples5_GIHB;;vN_QAC`: * var stream = list({ name: 'foo' }, { name: 'bar' })5_HJIC;;vN_QBD`D * var names = map(function(value) { return value.name }, stream)5_IKJD;;vN_QCE` * names(console.log)5_JLKE;;vN_QDF` * // 'foo'5_KMLF;;vN_QEG` * // 'bar'5_LNMG;;vN_QFH`! * var numbers = list(1, 2, 3)5_MONH;;vN_QGI`N * var mapped = map(function onEach(number) { return number * 2 }, numbers)5_NPOI;;vN_QHJ` * mapped(console.log)5_OQPJ;;vN_QIK` * // 25_PRQK;;vN_QJL` * // 45_QSRL;;vN_QKM` * // 65_RTSM;;vN_Q MO`5_SUTM;;vN_Q KMa // 6*/LNa */5_TVU  vN_R `G Internal utility function that takes a `source` function and optionalF `number` arguments and returns a function that calls `source` with aF given arguments only first `number` of times it's called. Useful forA wrapping callbacks that must be called only ones for example.*/5_UWVvN_R`J Utility function that returns streams of elements of given `array`. ThisH function is dangerous as array mutations will have side effects on theI returned stream so it should be used with a care, reader of such streamK **MUST NOT** mutate source array, this is also reason why we don't export this function.*/5_VXW++-vN_R*.`# Creates stream of given elements. @examples! list('a', 2, {})(console.log)5_W_X.+-vN_R-/` */5_X`Y_5+-vN_R;46`9 Creates empty stream. This is equivalent of `list()`.*/5__a`<<LvN_R@;M`" Returns stream of mapped values. @param {Function} lambda" function that maps each value @param {Function} input source stream to be mapped @examples9 var stream = list({ name: 'foo' }, { name: 'bar' })C var names = map(function(value) { return value.name }, stream) names(console.log) // 'foo' // 'bar' var numbers = list(1, 2, 3)M var mapped = map(function onEach(number) { return number * 2 }, numbers) mapped(console.log) // 2 // 4 // 6*/5_`baL<LvN_REKN` // 6*/5_acbWWevN_RNVXa/**% * Returns stream of filtered values. * @param {Function} lambda" * function that filters values * @param {Function} source" * source stream to be filtered * @examples* * var numbers = list(10, 23, 2, 7, 17)+ * var digits = filter(function(value) {' * return value >= 0 && value <= 9 * }, numbers) * digits(console.log) * // 2 * // 7 */5_bdcWWWvN_ROVW5_cedWWWvN_RPWgS WYR5_dfefWWvN_RRega */5_egfXWWvN_RWWYa /**5_fhgXWWvN_RWWYa/**5_gihYWWvN_RYXZa% * Returns stream of filtered values.5_hjiZWWvN_R[Y[a * @param {Function} lambda5_ikj[WWvN_R]Y\a @param {Function} lambdafunction that filters valuesZ\a" * function that filters values5_jlk\WWvN_Rb[]a * @param {Function} source5_kmlZWWvN_ReY[a @param {Function} lambda5_lnm[WWvN_RfZ\a function that filters values5_mon\WWvN_Rh[]a @param {Function} source5_npo]WWvN_Rj\^a" * source stream to be filtered5_oqp^WWvN_Rm]_a * @examples5_prq_WWvN_Rp^`a* * var numbers = list(10, 23, 2, 7, 17)5_qsr`WWvN_Rw_aa+ * var digits = filter(function(value) {5_rtsaWWvN_Ry`ba' * return value >= 0 && value <= 95_sutbWWvN_R{aca * }, numbers)5_tvuaWWvN_R`ba! return value >= 0 && value <= 95_uwvcWWvN_Rbda * digits(console.log)5_vxwdWWvN_Rcea * // 25_wyxeWWvN_Rdfa * // 75_xzyfWWvN_R fha5_y{zppvN_Roqb/**# * Returns stream of reduced values * @param {Function} source * stream to reduce. * @param {Function} reducer * reducer function * @param initial * initial value * @examples! * var numbers = list(2, 3, 8)> * var sum = reduce(function onElement(previous, current) {( * return (previous || 0) + current * }, numbers) * sum(console.log) * // 13 */5_z|{pppvN_Rop5_{}|pppvN_RpS prR5_|~}qppvN_Rprb /**5_}~qppvN_Rprb/**5_~qppvN_Roqb+function reduce(reducer, source, initial) {/*prb/*5_XppvN_RVXa!function filter(lambda, source) {/*WYa/*5_poovN_Roq`# * Returns stream of reduced values5_qoovN_Rpr` * @param {Function} source5_roovN_Rqs` * stream to reduce.5_soovN_Rrt` * @param {Function} reducer5_toovN_Rsu` * reducer function5_uoovN_Rtv` * @param initial5_voovN_Ruw` * initial value5_woovN_Rvx` * @examples5_xoovN_Rwy`! * var numbers = list(2, 3, 8)5_yoovN_Rxz`> * var sum = reduce(function onElement(previous, current) {5_zoovN_Ry{`( * return (previous || 0) + current5_{oovN_Rz|` * }, numbers)5_|oovN_R{}` * sum(console.log)5_}oovN_R|~` * // 135_~oovN_R}` */5_~oovN_R}`5_~oovN_R ~`5_vN_Ra/**L * This function returns stream of tuples, where the n-th tuple contains theI * n-th element from each of the argument streams. The returned stream isE * truncated in length to the length of the shortest argument stream. * @params {Function}" * source steams to be combined * @examples% * var a = list([ 'a', 'b', 'c' ])" * var b = list([ 1, 2, 3, 4 ])/ * var c = list([ '!', '@', '#', '$', '%' ]) * var abc = zip(a, b, c) * abs(console.log) * // [ 'a', 1, '!' ] * // [ 'b', 2, '@' ] * // [ 'c', 3, '#' ] */5_vN_R5_vN_RR Q5_vN_Ra5_vN_Rbvar zip = (function Zip() {/**b /**5_vN_Ravar zip = (function Zip() {/**5_vN_RaL * This function returns stream of tuples, where the n-th tuple contains the5_vN_RaI * n-th element from each of the argument streams. The returned stream is5_vN_RaE * truncated in length to the length of the shortest argument stream.5_vN_Ra * @params {Function}5_vN_Ra" * source steams to be combined5_vN_Ra * @examples5_vN_Ra% * var a = list([ 'a', 'b', 'c' ])5_vN_Ra" * var b = list([ 1, 2, 3, 4 ])5_vN_Ra/ * var c = list([ '!', '@', '#', '$', '%' ])5_vN_Ra * var abc = zip(a, b, c)5_vN_Ra * abs(console.log)5_vN_Ra * // [ 'a', 1, '!' ]5_vN_Ra * // [ 'b', 2, '@' ]5_vN_Ra * // [ 'c', 3, '#' ]5_vN_Ra */5_vN_Ra5_v N_Ra/**K * Returns stream containing first `n` elements of given `source`, on which * given lambda returns `true`. * * @param {Function} lambda# * function that takes elements. * @param {Function} source* * source stream to take elements from. * @examples* * var numbers = list(10, 23, 2, 7, 17)) * var digits = take(function(value) { * return value >= 10 * }, numbers) * digits(console.log) * // 10 * // 23 */function take(lambda, source) {5_v N_RQ P5_v N_Rbfunction take(lambda, source) {/**b /**5_"v N_Ra"function take(lambda, source) {/**5_v N_RaK * Returns stream containing first `n` elements of given `source`, on which5_v N_Ra * given lambda returns `true`.5_v N_Ra *5_v N_Ra * @param {Function} lambda5_v N_Ra# * function that takes elements.5_v N_Ra * @param {Function} source5_v N_Ra* * source stream to take elements from.5_v N_Ra * @examples5_v N_Ra* * var numbers = list(10, 23, 2, 7, 17)5_v N_Ra) * var digits = take(function(value) {5_v N_Ra * return value >= 105_v N_Ra * }, numbers)5_v N_Ra * digits(console.log)5_v N_Ra * // 105_v N_Ra * // 235_v N_Ra */5_vN_S a/**M * Returns a stream consisting of the given `source` stream elements startingL * form the `start` zero-based index till `end` zero-based index element. If5 * `end` is not passed all elements will be included. */$function slice(source, start, end) {5_vN_S \5_vN_S ]5_vN_Sb$function slice(source, start, end) {/**b/**5_'vN_Sa'function slice(source, start, end) {/**5_vN_SaM * Returns a stream consisting of the given `source` stream elements starting5_vN_SaL * form the `start` zero-based index till `end` zero-based index element. If5_vN_Sa5 * `end` is not passed all elements will be included.5_vN_Sa */5_vN_Sa5_vN_S#a /**K * Returns a stream containing only first `number` of elements of the givenM * `source` stream or all elements, if `source` stream has less than `number`= * of elements. If `number` is not passed it defaults to `1`. * @param {Function} source * source stream * @param {Number} number=1, * number of elements to take from stream */5_vN_S%5_vN_S&Y X5_vN_S)a /**5_vN_S*a/**5_vN_S.afunction head(source, number) {/*a/*5_vN_S0`K * Returns a stream containing only first `number` of elements of the given5_vN_S2`M * `source` stream or all elements, if `source` stream has less than `number`5_vN_S5`= * of elements. If `number` is not passed it defaults to `1`.5_vN_S6` * @param {Function} source5_vN_S6` * source stream5_vN_S7` * @param {Number} number=15_vN_S7`, * number of elements to take from stream5_vN_S8` */5_vN_S:`5_vN_S<`5_vN_S\aexports.head = head5_*vN_Sc)+aexports.tail = tail5_&v N_Sia /**N * Returns a stream equivalent to given `source` stream, except that the firstN * `number` of elements are omitted. If `source` stream has less than `number`O * of elements, then empty stream is returned. `number` defaults to `1` if it's * not passed. * @param {Function} source& * source stream to return tail of. * @param {Number} number=1. * Number of elements that will be omitted. */function tail(source, number) {5_v N_Sj)X W5_v N_Snbfunction tail(source, number) {/**b /**5_"v N_Soa"function tail(source, number) {/**5_v N_SqaN * Returns a stream equivalent to given `source` stream, except that the first5_v N_Ss aN * `number` of elements are omitted. If `source` stream has less than `number`5_ v N_Su!aO * of elements, then empty stream is returned. `number` defaults to `1` if it's5_!v N_Su "a * not passed.5_"v N_Sv!#a * @param {Function} source5_#v N_Sv"$a& * source stream to return tail of.5_$v N_Sw#%a * @param {Number} number=15_%v N_Sw$&a. * Number of elements that will be omitted.5_&v N_Sy%'a */5_,,8v N_S+-a/**O * Returns a stream that contains all elements of each stream in the order theyL * appear in the original streams. If any of the `source` streams is stoppedM * with an error than it propagates to the resulting stream and it also get's * stopped. * @examples5 * var stream = append(list(1, 2), list('a', 'b')) * stream(console.log) * // 1 * // 2 * // 'a' * // 'b' */function append() {5_,,,v N_S,;U ,.T5_-,,v N_S+-bfunction append() {/**,.b /**5_,,,v N_S+-afunction append() {/**5_-,,v N_S,.aO * Returns a stream that contains all elements of each stream in the order they5_.,,v N_S-/aL * appear in the original streams. If any of the `source` streams is stopped5_/,,v N_S.0aM * with an error than it propagates to the resulting stream and it also get's5_0,,v N_S/1a * stopped.5_1,,v N_S02a * @examples5_2,,v N_S13a5 * var stream = append(list(1, 2), list('a', 'b'))5_3,,v N_S24a * stream(console.log)5_4,,v N_S35a * // 15_5,,v N_S46a * // 25_6,,v N_S57a * // 'a'5_7,,v N_S68a * // 'b'5_8,,v N_S79a */5_8,,v N_S79a5_G^GvN_SFHa/**J * Returns a stream that contains all elements of each stream of the givenP * source stream. `source` is stream of streams whose elements will be containedK * by the resulting stream. Any error from any stream will propagate to theP * resulting stream. Stream is stopped when all streams from `source` and sourceI * itself is ended. Elements of the stream are position in order they areL * delivered so it could happen that elements from second stream will appear2 * before or between elements of the first stream. * @param {Function} sourceL * Stream of streams whose elements will be contained by resulting stream * @examples" * function async(next, stop) { * setTimeout(function() { * next('async') * stop() * }, 10) * }4 * var stream = merge(list(async, list(1, 2, 3))) * stream(console.log) * // 1 * // 2 * // 3 * // 'async' */5_GGGvN_SFG5_GGGvN_SG`J GII5_HGGvN_SGIa /**5_HGGvN_SGIa/**5_HGGvN_SFHafunction merge(source) {/*GIa/*5_HGGvN_SGI`J * Returns a stream that contains all elements of each stream of the given5_IGGvN_SHJ`P * source stream. `source` is stream of streams whose elements will be contained5_JGGvN_SIK`K * by the resulting stream. Any error from any stream will propagate to the5_KGGvN_SJL`P * resulting stream. Stream is stopped when all streams from `source` and source5_LGGvN_SKM`I * itself is ended. Elements of the stream are position in order they are5_MGGvN_SLN`L * delivered so it could happen that elements from second stream will appear5_NGGvN_SMO`2 * before or between elements of the first stream.5_OGGvN_SNP` * @param {Function} source5_ PGGvN_SOQ`L * Stream of streams whose elements will be contained by resulting stream5_  QGGvN_SPR` * @examples5_   RGGvN_SQS`" * function async(next, stop) {5_   SGGvN_SRT` * setTimeout(function() {5_   TGGvN_SSU` * next('async')5_  UGGvN_STV` * stop()5_ VGGvN_SUW` * }, 10)5_WGGvN_SVX` * }5_XGGvN_SWY`4 * var stream = merge(list(async, list(1, 2, 3)))5_YGGvN_SXZ` * stream(console.log)5_ZGGvN_SY[` * // 15_[GGvN_SZ\` * // 25_\GGvN_S[]` * // 35_]GGvN_S\^` * // 'async'5_^GGvN_S]_` */5_^GGvN_S^``5_ssyvN_Srta/**% * Utility function to print streams. * @param {Function} stream * stream to print * @examples# * print(list('Hello', 'world')) */function print(stream) {5_sssvN_Ss|[ suZ5_tssvN_Srtbfunction print(stream) {/**sub /**5_sssvN_Srtafunction print(stream) {/**5_tssvN_Ssua% * Utility function to print streams.5_ussvN_Stva * @param {Function} stream5_vssvN_Suwa * stream to print5_ wssvN_Svxa * @examples5_! xssvN_Swya# * print(list('Hello', 'world'))5_ %!yssvN_Sxza */5_!&"%vN_T a@/**H * Returns a stream equivalent to a given `source`, with difference thatK * all the consumers will start reading it from the point it's at the givenM * moment. This is useful with streams such as user generated events (clicks,M * keypress, etc..) where multiple stream readers might need to read from the, * same source. In other words, this is yourK * [pub / sub](http://en.wikipedia.org/wiki/Publish/subscribe) for streams. * @param {Function} source; * Stream whose elements get published to a subscribers. * @return {Function}5 * Stream that multiple subscribers can read from. * @examples" * function range(start, end) {, * return function stream(next, stop) { * var number = start - 1( * setTimeout(function onNext() { * if (++number >= end) * return stop()' * if (false !== next(number))# * setTimeout(onNext, 2) * }, 2) * } * } * function printer(index) {' * return function print(stream) {> * stream(console.log.bind(console, '#' + index + '>'),9 * console.log.bind(console, '<#' + index)) * } * } * var numbers = range(1, 5) * printer(1)(numbers)8 * setTimeout(function () { printer(2)(numbers) }, 5) *. * // Output will look something like this: * #1> 1 * #1> 2 * #1> 3 * #2> 1 * #1> 4 * #2> 2 * <#1 * #2> 3 * #2> 4 * <#2 *L * // If you noticed second print started form the first `1` element. Now* * // lets do similar thing with a hub. *$ * var numbers = hub(range(1, 5)) * printer(1)(numbers)8 * setTimeout(function () { printer(2)(numbers) }, 5) */ * // In this case output will be different: * * #1> 1 * #1> 2 * #1> 3 * #1> 4 * #2> 4 * <#1 * <#2 *L * // Notice this time second print only printed only following elements. */5_%'&vN_T# "5_&('vN_Tbfunction hub(source) {/**b /**5_')(vN_Tafunction hub(source) {/**5_(*)vN_TaH * Returns a stream equivalent to a given `source`, with difference that5_)+*vN_TaK * all the consumers will start reading it from the point it's at the given5_*,+vN_TaM * moment. This is useful with streams such as user generated events (clicks,5_+-,vN_TaM * keypress, etc..) where multiple stream readers might need to read from the5_,.-vN_Ta, * same source. In other words, this is your5_-/.vN_TaK * [pub / sub](http://en.wikipedia.org/wiki/Publish/subscribe) for streams.5_.0/vN_Ta * @param {Function} source5_/10vN_Ta; * Stream whose elements get published to a subscribers.5_021vN_Ta * @return {Function}5_132vN_Ta5 * Stream that multiple subscribers can read from.5_243vN_T a * @examples5_354vN_T a" * function range(start, end) {5_465vN_T!a, * return function stream(next, stop) {5_576vN_T!a * var number = start - 15_687vN_T"a( * setTimeout(function onNext() {5_798vN_T"a * if (++number >= end)5_8:9vN_T#a * return stop()5_9;:vN_T#a' * if (false !== next(number))5_:<;vN_T$a# * setTimeout(onNext, 2)5_;=<vN_T$a * }, 2)5_<>=vN_T%a * }5_=?>vN_T%a * }5_>@?vN_T&a * function printer(index) {5_?A@vN_T&a' * return function print(stream) {5_@BAvN_T'a> * stream(console.log.bind(console, '#' + index + '>'),5_ACBvN_T'a9 * console.log.bind(console, '<#' + index))5_BDCvN_T(a * }5_CEDvN_T(a * }5_DFEvN_T)a * var numbers = range(1, 5)5_EGFvN_T*a * printer(1)(numbers)5_FHGvN_T*a8 * setTimeout(function () { printer(2)(numbers) }, 5)5_GIHvN_T+a *5_HJIvN_T+a. * // Output will look something like this:5_IKJvN_T,a * #1> 15_JLKvN_T,a * #1> 25_KMLvN_T-a * #1> 35_LNMvN_T-a * #2> 15_MONvN_T.a * #1> 45_NPOvN_T.a * #2> 25_OQPvN_T/a * <#15_PRQvN_T/a * #2> 35_QSRvN_T0a * #2> 45_RTSvN_T0a * <#25_SUTvN_T1a *5_TVUvN_T1aL * // If you noticed second print started form the first `1` element. Now5_UWVvN_T2a* * // lets do similar thing with a hub.5_VXWvN_T2a *5_WYXvN_T3a$ * var numbers = hub(range(1, 5))5_XZYvN_T4a * printer(1)(numbers)5_Y[ZvN_T4a8 * setTimeout(function () { printer(2)(numbers) }, 5)5_Z\[vN_T5a *5_[]\vN_T5a/ * // In this case output will be different:5_\^]vN_T6a *5_]_^vN_T6a * #1> 15_^`_vN_T7a * #1> 25__a`vN_T7a * #1> 35_`bavN_T8a * #1> 45_acbvN_T8a * #2> 45_bdcvN_T9a * <#15_cedvN_T9a * <#25_dfevN_T:a *5_egfvN_T;aL * // Notice this time second print only printed only following elements.5_fhgvN_T;a */5_gihvN_T>a5_hjivN_T@a*l5_ikjvN_TUb/**O * Returns a stream equivalent to a given `source` by caching all it's elementsN * into memory for faster reads. This is useful with `source` streams that areL * expensive to compute (requires access to the network for example). Use itD * carefully though, do not cache infinite streams and be aware thatB * asynchronous stream when cached will yield some or all elementsL * synchronously. Also be aware, that unlike other functions, this is greedyF * which means that it will start reading `source` stream immediately. * @param {Function} source * source stream to cache. * @returns {Function}B * cached equivalent of source that can be read multiple times. */function cache(source) {5_jlkvN_TVV U5_kmlvN_TYc /**5_lnmvN_T\cfunction cache(source) {/**c/**5_monvN_T]bfunction cache(source) {/**5_npovN_T^bO * Returns a stream equivalent to a given `source` by caching all it's elements5_oqpvN_TabN * into memory for faster reads. This is useful with `source` streams that are5_prqvN_TabL * expensive to compute (requires access to the network for example). Use it5_qsrvN_TbbD * carefully though, do not cache infinite streams and be aware that5_rtsvN_TbbB * asynchronous stream when cached will yield some or all elements5_sutvN_TcbL * synchronously. Also be aware, that unlike other functions, this is greedy5_tvuvN_TcbF * which means that it will start reading `source` stream immediately.5_uwvvN_Tdb * @param {Function} source5_vxwvN_Tdb * source stream to cache.5_wyxvN_Teb * @returns {Function}5_xzyvN_TebB * cached equivalent of source that can be read multiple times.5_y{zvN_Tfb */5_z|{vN_Tgb5_{}|vN_Trb /**N * Returns stream equivalent to a given `source` with a difference that it hasL * a state, meaning that if reader stops reading on n-th element next readerM * will continue reading from n-th element. Purpose is to wrap any other typeL * of stream, such that each element of `source` can be read only once. ThisO * allow element distribution across different consumers with a help of `head`, * `tail` and similars. * @param {Function} source0 * source stream to create stack stream from. * @returns {Function}< * stack equivalent of source that can be read only once. */5_|~}vN_Ts5_}~vN_Tt W V5_~vN_Twb /**5_vN_Twb/**5_vN_T|bfunction stack(source) {/*b/*5_vN_TaN * Returns stream equivalent to a given `source` with a difference that it has5_vN_TaL * a state, meaning that if reader stops reading on n-th element next reader5_vN_TaM * will continue reading from n-th element. Purpose is to wrap any other type5_vN_TaL * of stream, such that each element of `source` can be read only once. This5_vN_TaO * allow element distribution across different consumers with a help of `head`,5_vN_Ta * `tail` and similars.5_vN_Ta * @param {Function} source5_vN_Ta0 * source stream to create stack stream from.5_vN_Ta * @returns {Function}5_vN_Ta< * stack equivalent of source that can be read only once.5_vN_T a */5_vN_T a5_//EvN_T.0b/**J * Returns a stream that contains all elements of each stream of the givenP * source stream. `source` is stream of streams whose elements will be containedK * by the resulting stream. Any error from any stream will propagate to theP * resulting stream. Stream is stopped when all streams from `source` and sourceO * itself are ended. Elements of the stream are position in order, all elements6 * of first stream, all elements of second stream etc. * @param {Function} sourceL * Stream of streams whose elements will be contained by resulting stream * @examples" * function async(next, stop) { * setTimeout(function() { * next('async') * stop() * }, 10) * }3 * var stream = join(list(async, list(1, 2, 3))) * stream(console.log) * // 'async' * // 1 * // 2 * // 3 */function join(source) {5_///vN_T/HL /1K5_0//vN_T.0cfunction join(source) {/**/1c /**5_///vN_T.0bfunction join(source) {/**5_0//vN_T/1bJ * Returns a stream that contains all elements of each stream of the given5_1//vN_T02bP * source stream. `source` is stream of streams whose elements will be contained5_2//vN_T13bK * by the resulting stream. Any error from any stream will propagate to the5_3//vN_T24bP * resulting stream. Stream is stopped when all streams from `source` and source5_4//vN_T35bO * itself are ended. Elements of the stream are position in order, all elements5_5//vN_T46b6 * of first stream, all elements of second stream etc.5_6//vN_T57b * @param {Function} source5_7//vN_T68bL * Stream of streams whose elements will be contained by resulting stream5_8//vN_T79b * @examples5_9//vN_T8:b" * function async(next, stop) {5_://vN_T9;b * setTimeout(function() {5_;//vN_T:<b * next('async')5_<//vN_T;=b * stop()5_=//vN_T<>b * }, 10)5_>//vN_T=?b * }5_?//vN_T>@b3 * var stream = join(list(async, list(1, 2, 3)))5_@//vN_T?Ab * stream(console.log)5_A//vN_T@Bb * // 'async'5_B//vN_TACb * // 15_C//vN_TBDb * // 25_D//vN_TCEb * // 35_E//vN_TDFb */5_E//vN_TDFb5_b//vN_Tab5_%vN_U &aKReturns a stream equivalent to given `source` stream, except that the firstK`number` of elements are omitted. If `source` stream has less than `number`Lof elements, then empty stream is returned. `number` defaults to `1` if it's not passed.@param {Function} source# source stream to return tail of.@param {Number} number=1+ Number of elements that will be omitted.5_&%vN_U $&a- Number of elements that will be omitted.*/%'a*/5_,,7vN_U+8` LReturns a stream that contains all elements of each stream in the order theyIappear in the original streams. If any of the `source` streams is stoppedJwith an error than it propagates to the resulting stream and it also get'sstopped. @examples2 var stream = append(list(1, 2), list('a', 'b')) stream(console.log) // 1 // 2 // 'a' // 'b'*/5_GG]vN_UF^`GReturns a stream that contains all elements of each stream of the givenMsource stream. `source` is stream of streams whose elements will be containedHby the resulting stream. Any error from any stream will propagate to theMresulting stream. Stream is stopped when all streams from `source` and sourceFitself is ended. Elements of the stream are position in order they areIdelivered so it could happen that elements from second stream will appear/before or between elements of the first stream.@param {Function} sourceI Stream of streams whose elements will be contained by resulting stream @examples function async(next, stop) { setTimeout(function() { next('async') stop() }, 10) }1 var stream = merge(list(async, list(1, 2, 3))) stream(console.log) // 1 // 2 // 3 // 'async'*/5_FG]vN_UeEH`function merge(source) {/*5_GH^vN_UgFHa /*5_^H^vN_Uk]_a */5_^H^vN_Um]_a **/5_GH^vN_UqFHa /**5_+H^vN_Uw*-afunction append() {/*5_,I_vN_Uy+-b /*5_8I_vN_U}79b */5_I_vN_Ub!function tail(source, number) {/*5_J`vN_Uc /*5_&-J`vN_U%(c/ Number of elements that will be omitted.*/5_KavN_Ud!function head(source, number) {/*5_LbvN_Ue /*5_vN_UeHReturns a stream containing only first `number` of elements of the givenJ`source` stream or all elements, if `source` stream has less than `number`:of elements. If `number` is not passed it defaults to `1`.@param {Function} source source stream@param {Number} number=1) number of elements to take from stream*/5_vN_Ue */5_/vN_U.0e /***5_;vN_U:<e ***/5_(vN_U')e ***/5_vN_U e /***5_vN_Ue ***/5_vN_Ue /***5_$vN_Ue&function slice(source, start, end) {/*5_vN_Uf /*5_vN_UfJReturns a stream consisting of the given `source` stream elements starting5_vN_UfIform the `start` zero-based index till `end` zero-based index element. If5_vN_Uf2`end` is not passed all elements will be included.5_vN_Uf*/5_vN_Uf!function take(lambda, source) {/*5_vN_Ug /*5_vN_UgHReturns stream containing first `n` elements of given `source`, on whichgiven lambda returns `true`.@param {Function} lambda function that takes elements.@param {Function} source' source stream to take elements from. @examples' var numbers = list(10, 23, 2, 7, 17)& var digits = take(function(value) { return value >= 10 }, numbers) digits(console.log) // 10 // 235_vN_Ug*/5_vN_UgIThis function returns stream of tuples, where the n-th tuple contains theFn-th element from each of the argument streams. The returned stream isBtruncated in length to the length of the shortest argument stream.@params {Function} source steams to be combined @examples" var a = list([ 'a', 'b', 'c' ]) var b = list([ 1, 2, 3, 4 ]), var c = list([ '!', '@', '#', '$', '%' ]) var abc = zip(a, b, c) abs(console.log) // [ 'a', 1, '!' ] // [ 'b', 2, '@' ] // [ 'c', 3, '#' ]5_vN_Ugvar zip = (function Zip() {/*5_vN_Uh /*5_vN_Uh*/5_o+vN_Unqh-function reduce(reducer, source, initial) {/*5_pvN_Uoqi /*5_qq~vN_Upi Returns stream of reduced values@param {Function} source stream to reduce.@param {Function} reducer reducer function@param initial initial value @examples var numbers = list(2, 3, 8); var sum = reduce(function onElement(previous, current) {% return (previous || 0) + current }, numbers) sum(console.log) // 135_q~vN_U ~i*/5_W!q~vN_UVYi#function filter(lambda, source) {/*5_XrvN_VWYj /*5_YrvN_VXZj"Returns stream of filtered values.5_ZZevN_VYfj @param {Function} lambda function that filters values@param {Function} source source stream to be filtered @examples& var numbers = list(10, 23, 2, 7, 17)' var digits = filter(function(value) {# return value >= 0 && value <= 9 }, numbers) digits(console.log) // 2 // 75_fZevN_V !egj*/5_;ZevN_V:=j function map(lambda, source) {/*5_<[fvN_V;=k /*5_==NvN_V<Ok Returns stream of mapped values.@param {Function} lambda function that maps each value@param {Function} input source stream to be mapped @examples7 var stream = list({ name: 'foo' }, { name: 'bar' })A var names = map(function(value) { return value.name }, stream) names(console.log) // 'foo' // 'bar' var numbers = list(1, 2, 3)K var mapped = map(function onEach(number) { return number * 2 }, numbers) mapped(console.log) // 2 // 4 // 6*/5_N=NvN_V""MOk */5_*=NvN_V'),kfunction list() {/*5_+>OvN_V(*,l /*5_,,.vN_V*+/l!Creates stream of given elements. @examples list('a', 2, {})(console.log)5_/,.vN_V,#.0l*/5_5,.vN_V147lfunction empty() {/*5_6,.vN_V257m /*5_7,.vN_V468m7Creates empty stream. This is equivalent of `list()`.*/5_7,.vN_V479n 79m5_79,.vN_V8$68n9 Creates empty stream. This is equivalent of `list()`.*/5_ ,.vN_V?n"function streamArray(elements) {/*5_-/vN_V@o /*5_vN_VCoHUtility function that returns streams of elements of given `array`. ThisFfunction is dangerous as array mutations will have side effects on theGreturned stream so it should be used with a care, reader of such streamI**MUST NOT** mutate source array, this is also reason why we don't exportthis function.*/5_vN_VH% o this function.*/5_ vN_VN p"function limit(source, number) {/*5_ vN_VO q /*5_vN_VR qEInternal utility function that takes a `source` function and optionalD`number` arguments and returns a function that calls `source` with aDgiven arguments only first `number` of times it's called. Useful for?wrapping callbacks that must be called only ones for example.*/5_?vN_VT&qA wrapping callbacks that must be called only ones for example.*/5_ovN_Vnnpr ***/5_XvN_VqWYr /***5_vN_Vurfunction print(stream) {/*5_vN_Vvs /*5_vN_Vxs"Utility function to print streams.@param {Function} stream stream to print @examples print(list('Hello', 'world'))5_vN_V{'s*/5_vN_Vsfunction hub(source) {/*5_vN_Vt /*5_vN_Vt>EReturns a stream equivalent to a given `source`, with difference thatHall the consumers will start reading it from the point it's at the givenJmoment. This is useful with streams such as user generated events (clicks,Jkeypress, etc..) where multiple stream readers might need to read from the)same source. In other words, this is yourH[pub / sub](http://en.wikipedia.org/wiki/Publish/subscribe) for streams.@param {Function} source8 Stream whose elements get published to a subscribers.@return {Function}2 Stream that multiple subscribers can read from. @examples function range(start, end) {) return function stream(next, stop) { var number = start - 1% setTimeout(function onNext() { if (++number >= end) return stop()$ if (false !== next(number)) setTimeout(onNext, 2) }, 2) } } function printer(index) {$ return function print(stream) {; stream(console.log.bind(console, '#' + index + '>'),6 console.log.bind(console, '<#' + index)) } } var numbers = range(1, 5) printer(1)(numbers)5 setTimeout(function () { printer(2)(numbers) }, 5)+ // Output will look something like this: #1> 1 #1> 2 #1> 3 #2> 1 #1> 4 #2> 2 <#1 #2> 3 #2> 4 <#2I // If you noticed second print started form the first `1` element. Now' // lets do similar thing with a hub.! var numbers = hub(range(1, 5)) printer(1)(numbers)5 setTimeout(function () { printer(2)(numbers) }, 5), // In this case output will be different: #1> 1 #1> 2 #1> 3 #1> 4 #2> 4 <#1 <#2I // Notice this time second print only printed only following elements.5_vN_V(t*/5_vN_Vtfunction cache(source) {/*5_vN_Vu /*5_vN_Vu LReturns a stream equivalent to a given `source` by caching all it's elementsKinto memory for faster reads. This is useful with `source` streams that areIexpensive to compute (requires access to the network for example). Use itAcarefully though, do not cache infinite streams and be aware that?asynchronous stream when cached will yield some or all elementsIsynchronously. Also be aware, that unlike other functions, this is greedyCwhich means that it will start reading `source` stream immediately.@param {Function} source source stream to cache.@returns {Function}? cached equivalent of source that can be read multiple times.5_vN_V)u*/5_vN_Vufunction stack(source) {/*5_vN_Vv /*5_vN_Vv KReturns stream equivalent to a given `source` with a difference that it hasIa state, meaning that if reader stops reading on n-th element next readerJwill continue reading from n-th element. Purpose is to wrap any other typeIof stream, such that each element of `source` can be read only once. ThisLallow element distribution across different consumers with a help of `head`,`tail` and similars.@param {Function} source- source stream to create stack stream from.@returns {Function}9 stack equivalent of source that can be read only once.5_vN_Vv*/5_DvN_VCFvfunction join(source) {/*5_EvN_VDFw /*5_FFZvN_VE[wGReturns a stream that contains all elements of each stream of the givenMsource stream. `source` is stream of streams whose elements will be containedHby the resulting stream. Any error from any stream will propagate to theMresulting stream. Stream is stopped when all streams from `source` and sourceLitself are ended. Elements of the stream are position in order, all elements3of first stream, all elements of second stream etc.@param {Function} sourceI Stream of streams whose elements will be contained by resulting stream @examples function async(next, stop) { setTimeout(function() { next('async') stop() }, 10) }0 var stream = join(list(async, list(1, 2, 3))) stream(console.log) // 'async' // 1 // 2 // 35_[FZvN_V*Z\w*/5_ FZvN_Xw'/* vim:set ts=2 sw=2 sts=2 expandtab */?/*jshint asi: true undef: true es5: true node: true devel: true' forin: false latedef: false *//*global define: true */!(typeof(define) !== "function" ? function($){ $(typeof(require) !== 'function' ? (function() { throw Error('require unsupported'); }) : require, typeof(exports) === 'undefined' ? this : exports); } : define)(function(require, exports) { 'use strict';"var nil // Shortcut for undefined. function limit(source, number) { /**G Internal utility function that takes a `source` function and optionalF `number` arguments and returns a function that calls `source` with aF given arguments only first `number` of times it's called. Useful for? wrapping callbacks that must be called only ones for example. **/ number = number || 1 return function limited() {G return number-- > 0 && source ? source.apply(this, arguments) : nil }} function streamArray(elements) { /**J Utility function that returns streams of elements of given `array`. ThisH function is dangerous as array mutations will have side effects on theI returned stream so it should be used with a care, reader of such streamK **MUST NOT** mutate source array, this is also reason why we don't export this function. **/& return function stream(next, stop) { var index = 0# while (index < elements.length)< // If stream reader interrupts reading we just return.9 if (false === next(elements[index++])) return falseK // Once all elements were yielded we stop a stream if such callback was // passed. if (stop) stop() }}function list() { /**# Creates stream of given elements. @examples! list('a', 2, {})(console.log) **/; return streamArray(Array.prototype.slice.call(arguments))}exports.list = listfunction empty() { /**7 Creates empty stream. This is equivalent of `list()`. **/> return function stream(next, stop) { return stop && stop() }}exports.empty = emptyfunction map(lambda, source) { /**" Returns stream of mapped values. @param {Function} lambda" function that maps each value @param {Function} input source stream to be mapped @examples9 var stream = list({ name: 'foo' }, { name: 'bar' })C var names = map(function(value) { return value.name }, stream) names(console.log) // 'foo' // 'bar' var numbers = list(1, 2, 3)M var mapped = map(function onEach(number) { return number * 2 }, numbers) mapped(console.log) // 2 // 4 // 6 **/& return function stream(next, stop) {( source(function onElement(element) {" return next(lambda(element)) }, stop) }}exports.map = map!function filter(lambda, source) { /**$ Returns stream of filtered values. @param {Function} lambda function that filters values @param {Function} source source stream to be filtered @examples( var numbers = list(10, 23, 2, 7, 17)) var digits = filter(function(value) {% return value >= 0 && value <= 9 }, numbers) digits(console.log) // 2 // 7 **/& return function stream(next, stop) {( source(function onElement(element) {3 return lambda(element) ? next(element) : true }, stop) }}exports.filter = filter+function reduce(reducer, source, initial) { /**" Returns stream of reduced values @param {Function} source stream to reduce. @param {Function} reducer reducer function @param initial initial value @examples var numbers = list(2, 3, 8)= var sum = reduce(function onElement(previous, current) {' return (previous || 0) + current }, numbers) sum(console.log) // 13 **/& return function stream(next, stop) { var value = initial( source(function onElement(element) {% value = reducer(value, element) }, function onStop(error) {# if (error) return stop(error) next(value) if (stop) stop() }) }}exports.reduce = reducevar zip = (function Zip() { /**K This function returns stream of tuples, where the n-th tuple contains theH n-th element from each of the argument streams. The returned stream isD truncated in length to the length of the shortest argument stream. @params {Function}! source steams to be combined @examples$ var a = list([ 'a', 'b', 'c' ])! var b = list([ 1, 2, 3, 4 ]). var c = list([ '!', '@', '#', '$', '%' ]) var abc = zip(a, b, c) abs(console.log) // [ 'a', 1, '!' ] // [ 'b', 2, '@' ] // [ 'c', 3, '#' ] **/+ // Returns weather array is empty or not.2 function isEmpty(array) { return !array.length }G // Utility function that check if each array in given array of arraysA // has at least one element (in which case we do have a tuple).: function hasTuple(array) { return !array.some(isEmpty) }F // Utility function that creates tuple by shifting element from each // array of arrays. function shiftTuple(array) {( var index = array.length, tuple = []< while (0 <= --index) tuple.unshift(array[index].shift()) return tuple } return function zip() {7 var sources = Array.prototype.slice.call(arguments)( return function stream(next, stop) {? var buffers = [], id, reason, isStopped = false, shortest' function onElement(id, element) {N // If resulting stream is already stopped (we are in truncate mode) orP // if this stream is stopped (we deal with badly implemented stream that> // yields value after it's stopped) we ignore element." if (isStopped) return null* // Otherwise we buffer an element.! buffers[id].push(element)) // If tuple is ready we yield it.C return hasTuple(buffers) ? next(shiftTuple(buffers)) : true }" function onStop(id, error) {N // If shortest stream was already stopped then we are in truncate mode@ // which means we ignore all the following stream stops." if (isStopped) return nullL // If stream being stopped is the first one to be stopped or if it'sK // shorter then the shortest one stopped, we update stop reason and- // shortest stopped stream reference.@ if (!shortest || shortest.length > buffers[id].length) { shortest = buffers[id] reason = error }P // If shortest stream has no buffered elements, we stop resulting stream // & do some clean up. if (!shortest.length) {' // Marking stream as stopped. isStopped = true // Stopping a stream. stop(reason)K // Setting all closure captured elements to `null` so that gc can // collect them.# buffers = shortest = null } } // Initializing buffers. id = sources.length( while (0 <= --id) buffers.push([]) // Start reading streams. id = sources.length while (0 <= --id)D sources[id](onElement.bind(null, id), onStop.bind(null, id)) } }})()exports.zip = zipfunction take(lambda, source) { /**J Returns stream containing first `n` elements of given `source`, on which given lambda returns `true`. @param {Function} lambda" function that takes elements. @param {Function} source) source stream to take elements from. @examples) var numbers = list(10, 23, 2, 7, 17)( var digits = take(function(value) { return value >= 10 }, numbers) digits(console.log) // 10 // 23 **/& return function stream(next, stop) {( source(function onElement(element) {> return lambda(element) ? next(element) : stop() && false }, stop = limit(stop)) }}exports.take = take$function slice(source, start, end) { /**L Returns a stream consisting of the given `source` stream elements startingK form the `start` zero-based index till `end` zero-based index element. If4 `end` is not passed all elements will be included. **/3 // Zero-based index at which to begin extraction. start = start || 00 // Zero-based index at which to end extraction end = end || Infinity& return function stream(next, stop) { var index = 0 filter(function(element) { return start <= index ++, }, source)(function onElement(element) {A return next(element), index >= end ? (stop(), false) : true }, stop = limit(stop)) }}exports.slice = slicefunction head(source, number) { /**J Returns a stream containing only first `number` of elements of the givenL `source` stream or all elements, if `source` stream has less than `number`< of elements. If `number` is not passed it defaults to `1`. @param {Function} source source stream @param {Number} number=1+ number of elements to take from stream **/= return slice(source, 0, number && number >= 0 ? number : 1)}#exports.head = exports.first = headfunction tail(source, number) { /**M Returns a stream equivalent to given `source` stream, except that the firstM `number` of elements are omitted. If `source` stream has less than `number`N of elements, then empty stream is returned. `number` defaults to `1` if it's not passed. @param {Function} source% source stream to return tail of. @param {Number} number=1- Number of elements that will be omitted. **/: return slice(source, number && number >= 0 ? number : 1)}"exports.tail = exports.rest = tailfunction append() { /**N Returns a stream that contains all elements of each stream in the order theyK appear in the original streams. If any of the `source` streams is stoppedL with an error than it propagates to the resulting stream and it also get's stopped. @examples4 var stream = append(list(1, 2), list('a', 'b')) stream(console.log) // 1 // 2 // 'a' // 'b' **/8 var streams = Array.prototype.slice.call(arguments, 0)& return function stream(next, stop) {* var source, sources = streams.slice(0) function onStop(error) {+ if (error) return stop && stop(error): if ((source = sources.shift())) source(next, onStop) else return stop && stop() } onStop() }}exports.append = appendfunction merge(source) { /**I Returns a stream that contains all elements of each stream of the givenO source stream. `source` is stream of streams whose elements will be containedJ by the resulting stream. Any error from any stream will propagate to theO resulting stream. Stream is stopped when all streams from `source` and sourceH itself is ended. Elements of the stream are position in order they areK delivered so it could happen that elements from second stream will appear1 before or between elements of the first stream. @param {Function} sourceK Stream of streams whose elements will be contained by resulting stream @examples! function async(next, stop) { setTimeout(function() { next('async') stop() }, 10) }3 var stream = merge(list(async, list(1, 2, 3))) stream(console.log) // 1 // 2 // 3 // 'async' **/& return function stream(next, stop) { var open = 1, alive function onStop(error) {0 if (!open || false === alive) return false if (error) open = 0 else open -- if (!open) stop(error) }& source(function onStream(stream) { open ++% stream(function onNext(value) {D return open && false !== alive ? alive = next(value) : false }, onStop) }, onStop) }}exports.merge = mergefunction print(stream) { /**$ Utility function to print streams. @param {Function} stream stream to print @examples" print(list('Hello', 'world')) **/ console.log('>>')< stream(console.log.bind(console), function onStop(error) {# if (error) console.error(error) else console.log('<<') })}exports.print = printfunction hub(source) { /**G Returns a stream equivalent to a given `source`, with difference thatJ all the consumers will start reading it from the point it's at the givenL moment. This is useful with streams such as user generated events (clicks,L keypress, etc..) where multiple stream readers might need to read from the+ same source. In other words, this is yourJ [pub / sub](http://en.wikipedia.org/wiki/Publish/subscribe) for streams. @param {Function} source: Stream whose elements get published to a subscribers. @return {Function}4 Stream that multiple subscribers can read from. @examples! function range(start, end) {+ return function stream(next, stop) { var number = start - 1' setTimeout(function onNext() { if (++number >= end) return stop()& if (false !== next(number))" setTimeout(onNext, 2) }, 2) } } function printer(index) {& return function print(stream) {= stream(console.log.bind(console, '#' + index + '>'),8 console.log.bind(console, '<#' + index)) } } var numbers = range(1, 5) printer(1)(numbers)7 setTimeout(function () { printer(2)(numbers) }, 5)- // Output will look something like this: #1> 1 #1> 2 #1> 3 #2> 1 #1> 4 #2> 2 <#1 #2> 3 #2> 4 <#2K // If you noticed second print started form the first `1` element. Now) // lets do similar thing with a hub.# var numbers = hub(range(1, 5)) printer(1)(numbers)7 setTimeout(function () { printer(2)(numbers) }, 5). // In this case output will be different: #1> 1 #1> 2 #1> 3 #1> 4 #2> 4 <#1 <#2K // Notice this time second print only printed only following elements. **// var listeners = [], isStopped = false, reason# source(function onNext(element) { var index = 0E // Maybe it'd be better to just iterate on sliced array instead ?& while (index < listeners.length) {3 if (listeners[index++][0](element) === false)$ listeners.splice(--index, 1) } }, function onStop(error) { isStopped = true reason = error var listener, stop- while ((listener = listeners.shift())) {, if ((stop = listener[1])) stop(reason) } })& return function stream(next, stop) {: // If stream is already stopped, we notify a listener.' if (isStopped && stop) stop(reason)! // Else we register listener.' else listeners.push([ next, stop ]) }}exports.hub = hubfunction cache(source) { /**N Returns a stream equivalent to a given `source` by caching all it's elementsM into memory for faster reads. This is useful with `source` streams that areK expensive to compute (requires access to the network for example). Use itC carefully though, do not cache infinite streams and be aware thatA asynchronous stream when cached will yield some or all elementsK synchronously. Also be aware, that unlike other functions, this is greedyE which means that it will start reading `source` stream immediately. @param {Function} source source stream to cache. @returns {Function}A cached equivalent of source that can be read multiple times. **/ var buffer = []@ // Creating a stream that streams element of the buffer array. function cached(next, stop) { var index = 0! while (index < buffer.length)7 if (false === next(buffer[index++])) return false stop() }9 return append(cached, hub(function stream(next, stop) {( source(function onElement(element) { buffer.push(element) return next(element) }, stop) }));}exports.cache = cachefunction stack(source) { /**M Returns stream equivalent to a given `source` with a difference that it hasK a state, meaning that if reader stops reading on n-th element next readerL will continue reading from n-th element. Purpose is to wrap any other typeK of stream, such that each element of `source` can be read only once. ThisN allow element distribution across different consumers with a help of `head`, `tail` and similars. @param {Function} source/ source stream to create stack stream from. @returns {Function}; stack equivalent of source that can be read only once. **/N var readers = [], buffer = [], isStopped = false, isStarted = false, reason, updating = false function update() {A if (updating || !buffer.length || !readers.length) return nil updating = true. var resume = readers[0][0](buffer.shift())) if (false === resume) readers.shift() update(updating = false) } function onNext(element) { update(buffer.push(element)) } function onStop(error) { isStopped = true reason = error var reader = readers.shift()F return reader ? onStop(error, reader[1] && reader[1](error)) : nil }& return function stream(next, stop) {: // If stream is already stopped, we notify a listener.@ if (isStopped && !buffer.length) return stop && stop(reason) readers.push([ next, stop ]) update()B // If `source` stream is not yet being read, start reading it. if (isStarted) return nil isStarted = true source(onNext, onStop) }}exports.stack = stackfunction join(source) { /**I Returns a stream that contains all elements of each stream of the givenO source stream. `source` is stream of streams whose elements will be containedJ by the resulting stream. Any error from any stream will propagate to theO resulting stream. Stream is stopped when all streams from `source` and sourceN itself are ended. Elements of the stream are position in order, all elements5 of first stream, all elements of second stream etc. @param {Function} sourceK Stream of streams whose elements will be contained by resulting stream @examples! function async(next, stop) { setTimeout(function() { next('async') stop() }, 10) }2 var stream = join(list(async, list(1, 2, 3))) stream(console.log) // 'async' // 1 // 2 // 3 **/& return function stream(next, stop) {H var streams = stack(source), open = 2, alive = true, stopped = false function onStop(error) { open --F if (error && !stopped) open = false, stopped = true, stop(error)8 else if (!open && !stopped) stopped = true, stop() } !function recur(error) { onStop(error)/ head(streams)(function onStream(stream) {J // Increment number of open steams (twice since we decrement twice; // once on stream close and second on slice close). open += 2, stream(function onElement(element) {K // If steam is still open and read is not interrupted we pipe allD // elements to the reader. Otherwise we interrupt reading.6 return !stopped && open && false !== alive ?. alive = next(element) : false }, recur) }, onStop) }() }}exports.join = join});5_$vN_X+o function normalize(source) {& return function stream(next, stop) {' var stopped = false, reading = true( source(function onElement(element) {( if (!stopped && false !== reading)( return (reading = next(element)) }, function onStop(reason) {; return stopped ? nil : (stopped = true, stop(reason)) }) }}/**5_ vN_Xc'/* vim:set ts=2 sw=2 sts=2 expandtab */?/*jshint asi: true undef: true es5: true node: true devel: true' forin: false latedef: false *//*global define: true */!(typeof(define) !== "function" ? function($){ $(typeof(require) !== 'function' ? (function() { throw Error('require unsupported'); }) : require, typeof(exports) === 'undefined' ? this : exports); } : define)(function(require, exports) { 'use strict';"var nil // Shortcut for undefined./**H * Internal utility function that takes a `source` function and optionalG * `number` arguments and returns a function that calls `source` with aG * given arguments only first `number` of times it's called. Useful for@ * wrapping callbacks that must be called only ones for example. */ function limit(source, number) { number = number || 1 return function limited() {G return number-- > 0 && source ? source.apply(this, arguments) : nil }}/**K * Utility function that returns streams of elements of given `array`. ThisI * function is dangerous as array mutations will have side effects on theJ * returned stream so it should be used with a care, reader of such streamL * **MUST NOT** mutate source array, this is also reason why we don't export * this function. */ function streamArray(elements) {& return function stream(next, stop) { var index = 0# while (index < elements.length)< // If stream reader interrupts reading we just return.9 if (false === next(elements[index++])) return falseK // Once all elements were yielded we stop a stream if such callback was // passed. if (stop) stop() }}/**$ * Creates stream of given elements. * @examples# * list('a', 2, {})(console.log) */function list() {; return streamArray(Array.prototype.slice.call(arguments))}exports.list = list/*8 * Creates empty stream. This is equivalent of `list()`. */function empty() {> return function stream(next, stop) { return stop && stop() }}exports.empty = empty/**# * Returns stream of mapped values. * @param {Function} lambda# * function that maps each value * @param {Function} input * source stream to be mapped * @examples: * var stream = list({ name: 'foo' }, { name: 'bar' })D * var names = map(function(value) { return value.name }, stream) * names(console.log) * // 'foo' * // 'bar'! * var numbers = list(1, 2, 3)N * var mapped = map(function onEach(number) { return number * 2 }, numbers) * mapped(console.log) * // 2 * // 4 * // 6 */function map(lambda, source) {& return function stream(next, stop) {( source(function onElement(element) {" return next(lambda(element)) }, stop) }}exports.map = map/**% * Returns stream of filtered values. * @param {Function} lambda" * function that filters values * @param {Function} source" * source stream to be filtered * @examples* * var numbers = list(10, 23, 2, 7, 17)+ * var digits = filter(function(value) {' * return value >= 0 && value <= 9 * }, numbers) * digits(console.log) * // 2 * // 7 */!function filter(lambda, source) {& return function stream(next, stop) {( source(function onElement(element) {3 return lambda(element) ? next(element) : true }, stop) }}exports.filter = filter/**# * Returns stream of reduced values * @param {Function} source * stream to reduce. * @param {Function} reducer * reducer function * @param initial * initial value * @examples! * var numbers = list(2, 3, 8)> * var sum = reduce(function onElement(previous, current) {( * return (previous || 0) + current * }, numbers) * sum(console.log) * // 13 */+function reduce(reducer, source, initial) {& return function stream(next, stop) { var value = initial( source(function onElement(element) {% value = reducer(value, element) }, function onStop(error) {# if (error) return stop(error) next(value) if (stop) stop() }) }}exports.reduce = reduce/**L * This function returns stream of tuples, where the n-th tuple contains theI * n-th element from each of the argument streams. The returned stream isE * truncated in length to the length of the shortest argument stream. * @params {Function}" * source steams to be combined * @examples% * var a = list([ 'a', 'b', 'c' ])" * var b = list([ 1, 2, 3, 4 ])/ * var c = list([ '!', '@', '#', '$', '%' ]) * var abc = zip(a, b, c) * abs(console.log) * // [ 'a', 1, '!' ] * // [ 'b', 2, '@' ] * // [ 'c', 3, '#' ] */var zip = (function Zip() {+ // Returns weather array is empty or not.2 function isEmpty(array) { return !array.length }G // Utility function that check if each array in given array of arraysA // has at least one element (in which case we do have a tuple).: function hasTuple(array) { return !array.some(isEmpty) }F // Utility function that creates tuple by shifting element from each // array of arrays. function shiftTuple(array) {( var index = array.length, tuple = []< while (0 <= --index) tuple.unshift(array[index].shift()) return tuple } return function zip() {7 var sources = Array.prototype.slice.call(arguments)( return function stream(next, stop) {? var buffers = [], id, reason, isStopped = false, shortest' function onElement(id, element) {N // If resulting stream is already stopped (we are in truncate mode) orP // if this stream is stopped (we deal with badly implemented stream that> // yields value after it's stopped) we ignore element." if (isStopped) return null* // Otherwise we buffer an element.! buffers[id].push(element)) // If tuple is ready we yield it.C return hasTuple(buffers) ? next(shiftTuple(buffers)) : true }" function onStop(id, error) {N // If shortest stream was already stopped then we are in truncate mode@ // which means we ignore all the following stream stops." if (isStopped) return nullL // If stream being stopped is the first one to be stopped or if it'sK // shorter then the shortest one stopped, we update stop reason and- // shortest stopped stream reference.@ if (!shortest || shortest.length > buffers[id].length) { shortest = buffers[id] reason = error }P // If shortest stream has no buffered elements, we stop resulting stream // & do some clean up. if (!shortest.length) {' // Marking stream as stopped. isStopped = true // Stopping a stream. stop(reason)K // Setting all closure captured elements to `null` so that gc can // collect them.# buffers = shortest = null } } // Initializing buffers. id = sources.length( while (0 <= --id) buffers.push([]) // Start reading streams. id = sources.length while (0 <= --id)D sources[id](onElement.bind(null, id), onStop.bind(null, id)) } }})()exports.zip = zip/**K * Returns stream containing first `n` elements of given `source`, on which * given lambda returns `true`. * * @param {Function} lambda# * function that takes elements. * @param {Function} source* * source stream to take elements from. * @examples* * var numbers = list(10, 23, 2, 7, 17)) * var digits = take(function(value) { * return value >= 10 * }, numbers) * digits(console.log) * // 10 * // 23 */function take(lambda, source) {& return function stream(next, stop) {( source(function onElement(element) {> return lambda(element) ? next(element) : stop() && false }, stop = limit(stop)) }}exports.take = take/**M * Returns a stream consisting of the given `source` stream elements startingL * form the `start` zero-based index till `end` zero-based index element. If5 * `end` is not passed all elements will be included. */$function slice(source, start, end) {3 // Zero-based index at which to begin extraction. start = start || 00 // Zero-based index at which to end extraction end = end || Infinity& return function stream(next, stop) { var index = 0 filter(function(element) { return start <= index ++, }, source)(function onElement(element) {A return next(element), index >= end ? (stop(), false) : true }, stop = limit(stop)) }}exports.slice = slice/**K * Returns a stream containing only first `number` of elements of the givenM * `source` stream or all elements, if `source` stream has less than `number`= * of elements. If `number` is not passed it defaults to `1`. * @param {Function} source * source stream * @param {Number} number=1, * number of elements to take from stream */function head(source, number) {= return slice(source, 0, number && number >= 0 ? number : 1)}exports.head = head/**N * Returns a stream equivalent to given `source` stream, except that the firstN * `number` of elements are omitted. If `source` stream has less than `number`O * of elements, then empty stream is returned. `number` defaults to `1` if it's * not passed. * @param {Function} source& * source stream to return tail of. * @param {Number} number=1. * Number of elements that will be omitted. */function tail(source, number) {: return slice(source, number && number >= 0 ? number : 1)}exports.tail = tail/**O * Returns a stream that contains all elements of each stream in the order theyL * appear in the original streams. If any of the `source` streams is stoppedM * with an error than it propagates to the resulting stream and it also get's * stopped. * @examples5 * var stream = append(list(1, 2), list('a', 'b')) * stream(console.log) * // 1 * // 2 * // 'a' * // 'b' */function append() {8 var streams = Array.prototype.slice.call(arguments, 0)& return function stream(next, stop) {* var source, sources = streams.slice(0) function onStop(error) {+ if (error) return stop && stop(error): if ((source = sources.shift())) source(next, onStop) else return stop && stop() } onStop() }}exports.append = append/**J * Returns a stream that contains all elements of each stream of the givenP * source stream. `source` is stream of streams whose elements will be containedK * by the resulting stream. Any error from any stream will propagate to theP * resulting stream. Stream is stopped when all streams from `source` and sourceI * itself is ended. Elements of the stream are position in order they areL * delivered so it could happen that elements from second stream will appear2 * before or between elements of the first stream. * @param {Function} sourceL * Stream of streams whose elements will be contained by resulting stream * @examples" * function async(next, stop) { * setTimeout(function() { * next('async') * stop() * }, 10) * }4 * var stream = merge(list(async, list(1, 2, 3))) * stream(console.log) * // 1 * // 2 * // 3 * // 'async' */function merge(source) {& return function stream(next, stop) { var open = 1, alive function onStop(error) {0 if (!open || false === alive) return false if (error) open = 0 else open -- if (!open) stop(error) }& source(function onStream(stream) { open ++% stream(function onNext(value) {D return open && false !== alive ? alive = next(value) : false }, onStop) }, onStop) }}exports.merge = merge/**% * Utility function to print streams. * @param {Function} stream * stream to print * @examples# * print(list('Hello', 'world')) */function print(stream) { console.log('>>')< stream(console.log.bind(console), function onStop(error) {# if (error) console.error(error) else console.log('<<') })}exports.print = print/**H * Returns a stream equivalent to a given `source`, with difference thatK * all the consumers will start reading it from the point it's at the givenM * moment. This is useful with streams such as user generated events (clicks,M * keypress, etc..) where multiple stream readers might need to read from the, * same source. In other words, this is yourK * [pub / sub](http://en.wikipedia.org/wiki/Publish/subscribe) for streams. * @param {Function} source; * Stream whose elements get published to a subscribers. * @return {Function}5 * Stream that multiple subscribers can read from. * @examples" * function range(start, end) {, * return function stream(next, stop) { * var number = start - 1( * setTimeout(function onNext() { * if (++number >= end) * return stop()' * if (false !== next(number))# * setTimeout(onNext, 2) * }, 2) * } * } * function printer(index) {' * return function print(stream) {> * stream(console.log.bind(console, '#' + index + '>'),9 * console.log.bind(console, '<#' + index)) * } * } * var numbers = range(1, 5) * printer(1)(numbers)8 * setTimeout(function () { printer(2)(numbers) }, 5) *. * // Output will look something like this: * #1> 1 * #1> 2 * #1> 3 * #2> 1 * #1> 4 * #2> 2 * <#1 * #2> 3 * #2> 4 * <#2 *L * // If you noticed second print started form the first `1` element. Now* * // lets do similar thing with a hub. *$ * var numbers = hub(range(1, 5)) * printer(1)(numbers)8 * setTimeout(function () { printer(2)(numbers) }, 5) */ * // In this case output will be different: * * #1> 1 * #1> 2 * #1> 3 * #1> 4 * #2> 4 * <#1 * <#2 *L * // Notice this time second print only printed only following elements. */function hub(source) {/ var listeners = [], isStopped = false, reason# source(function onNext(element) { var index = 0E // Maybe it'd be better to just iterate on sliced array instead ?& while (index < listeners.length) {3 if (listeners[index++][0](element) === false)$ listeners.splice(--index, 1) } }, function onStop(error) { isStopped = true reason = error var listener, stop- while ((listener = listeners.shift())) {, if ((stop = listener[1])) stop(reason) } })& return function stream(next, stop) {: // If stream is already stopped, we notify a listener.' if (isStopped && stop) stop(reason)! // Else we register listener.' else listeners.push([ next, stop ]) }}exports.hub = hub/**O * Returns a stream equivalent to a given `source` by caching all it's elementsN * into memory for faster reads. This is useful with `source` streams that areL * expensive to compute (requires access to the network for example). Use itD * carefully though, do not cache infinite streams and be aware thatB * asynchronous stream when cached will yield some or all elementsL * synchronously. Also be aware, that unlike other functions, this is greedyF * which means that it will start reading `source` stream immediately. * @param {Function} source * source stream to cache. * @returns {Function}B * cached equivalent of source that can be read multiple times. */function cache(source) { var buffer = []@ // Creating a stream that streams element of the buffer array. function cached(next, stop) { var index = 0! while (index < buffer.length)7 if (false === next(buffer[index++])) return false stop() }9 return append(cached, hub(function stream(next, stop) {( source(function onElement(element) { buffer.push(element) return next(element) }, stop) }));}exports.cache = cache/**N * Returns stream equivalent to a given `source` with a difference that it hasL * a state, meaning that if reader stops reading on n-th element next readerM * will continue reading from n-th element. Purpose is to wrap any other typeL * of stream, such that each element of `source` can be read only once. ThisO * allow element distribution across different consumers with a help of `head`, * `tail` and similars. * @param {Function} source0 * source stream to create stack stream from. * @returns {Function}< * stack equivalent of source that can be read only once. */function stack(source) {N var readers = [], buffer = [], isStopped = false, isStarted = false, reason, updating = false function update() {A if (updating || !buffer.length || !readers.length) return nil updating = true. var resume = readers[0][0](buffer.shift())) if (false === resume) readers.shift() update(updating = false) } function onNext(element) { update(buffer.push(element)) } function onStop(error) { isStopped = true reason = error var reader = readers.shift()F return reader ? onStop(error, reader[1] && reader[1](error)) : nil }& return function stream(next, stop) {: // If stream is already stopped, we notify a listener.@ if (isStopped && !buffer.length) return stop && stop(reason) readers.push([ next, stop ]) update()B // If `source` stream is not yet being read, start reading it. if (isStarted) return nil isStarted = true source(onNext, onStop) }}exports.stack = stack/**J * Returns a stream that contains all elements of each stream of the givenP * source stream. `source` is stream of streams whose elements will be containedK * by the resulting stream. Any error from any stream will propagate to theP * resulting stream. Stream is stopped when all streams from `source` and sourceO * itself are ended. Elements of the stream are position in order, all elements6 * of first stream, all elements of second stream etc. * @param {Function} sourceL * Stream of streams whose elements will be contained by resulting stream * @examples" * function async(next, stop) { * setTimeout(function() { * next('async') * stop() * }, 10) * }3 * var stream = join(list(async, list(1, 2, 3))) * stream(console.log) * // 'async' * // 1 * // 2 * // 3 */function join(source) {& return function stream(next, stop) {H var streams = stack(source), open = 2, alive = true, stopped = false function onStop(error) { open --F if (error && !stopped) open = false, stopped = true, stop(error)8 else if (!open && !stopped) stopped = true, stop() } !function recur(error) { onStop(error)/ head(streams)(function onStream(stream) {J // Increment number of open steams (twice since we decrement twice; // once on stream close and second on slice close). open += 2, stream(function onElement(element) {K // If steam is still open and read is not interrupted we pipe allD // elements to the reader. Otherwise we interrupt reading.6 return !stopped && open && false !== alive ?. alive = next(element) : false }, recur) }, onStop) }() }}exports.join = join});5_ )vN_Z(*w#exports.head = exports.first = head5_  )vN_Z,(*wexports.head = head5_   9vN_Z-8:w"exports.tail = exports.rest = tail5_   )vN_\g.(*wexports.head = head5_   9vN_\z08:wexports.tail = tail5_  )-vN_]g1(*w2exports.head = exports.first = exports.peek = head5_ )vN_^(*w#exports.head = exports.first = head5_)+vN_^3(*w0exports.head = exports.first = exports.peex head5_vN`w /**K This function returns stream of tuples, where the n-th tuple contains theH n-th element from each of the argument streams. The returned stream isD truncated in length to the length of the shortest argument stream. @params {Function}! source steams to be combined @examples$ var a = list([ 'a', 'b', 'c' ])! var b = list([ 1, 2, 3, 4 ]). var c = list([ '!', '@', '#', '$', '%' ]) var abc = zip(a, b, c) abs(console.log) // [ 'a', 1, '!' ] // [ 'b', 2, '@' ] // [ 'c', 3, '#' ] **/5_vN` 5_vN`g f5_vN`4wK This function returns stream of tuples, where the n-th tuple contains theH n-th element from each of the argument streams. The returned stream isD truncated in length to the length of the shortest argument stream. @params {Function}! source steams to be combined @examples$ var a = list([ 'a', 'b', 'c' ])! var b = list([ 1, 2, 3, 4 ]). var c = list([ '!', '@', '#', '$', '%' ]) var abc = zip(a, b, c) abs(console.log) // [ 'a', 1, '!' ] // [ 'b', 2, '@' ] // [ 'c', 3, '#' ] **/5_!#%"vN_Ta5_"$#vN_T5_#$vN_T! " /**% * Utility function to print streams. * @param {Function} stream * stream to print * @examples# * print(list('Hello', 'world')) */5_vN_Ra5_vN_S5_vN_S\ ] 5_vN_S]5_XZ_Y ,.vN_R ` function limit(source, number) { /*5_Y[Z ,.vN_R a/*5_Z\[=-/vN_Ra=wrapping callbacks that must be called only ones for example.*/5_[]\ .0vN_R$b function streamArray(elements) { /*5_\^].0vN_R%c/*5_]^ /1vN_R(!cthis function.*/5