* In the case where two events are simultaneous (i.e. both
* within the same transaction), the event from this will take precedence, and
* the event from s will be dropped.
* If you want to specify your own combining function, use {@link Stream#merge(Stream, Lambda2)}.
* s1.orElse(s2) is equivalent to s1.merge(s2, (l, r) => l).
*
* The name orElse() is used instead of merge() to make it really clear that care should
* be taken, because events can be dropped.
*/
orElse(s : Stream) : Stream {
    // Combining function for simultaneous events: keep the value from this
    // stream and discard the one from s.
    const keepLeft = (left : A, right: A) => left;
    return this.merge(s, keepLeft);
}
/**
* Merge two streams of the same type into one, so that events on either input appear
* on the returned stream.
*
* If the events are simultaneous (that is, one event from this and one from s
* occurring in the same transaction), combine them into one using the specified combining function
* so that the returned stream is guaranteed only ever to have one event per transaction.
* The event from this will appear at the left input of the combining function, and
* the event from s will appear at the right.
* @param f Function to combine the values. It may construct FRP logic or use
* {@link Cell#sample()}. Apart from this the function must be referentially transparent.
*/
merge(s : Stream, f : ((left : A, right : A) => A) | Lambda2) : Stream {
// Callable form of f (f may be a plain function or a Lambda2).
const ff = Lambda2_toFunction(f);
// Holds the value, and a "present" flag, for each side until the pump flushes them.
const mergeState = new MergeState();
// True while a flush has been scheduled but has not yet run; prevents the
// second input from scheduling a second flush.
let pumping = false;
const out = new StreamWithSend(null);
// Schedules a single prioritized callback that emits whatever has been recorded.
const pump = () => {
if (pumping) {
return;
}
pumping = true;
Transaction.currentTransaction.prioritized(out.getVertex__(), () => {
// Both sides fired: combine them. Only one side fired: pass it through unchanged.
if (mergeState.left_present && mergeState.right_present) {
out.send_(ff(mergeState.left, mergeState.right));
} else if (mergeState.left_present) {
out.send_(mergeState.left);
} else if (mergeState.right_present) {
out.send_(mergeState.right);
}
// Clear the recorded values and re-arm the pump for the next use.
mergeState.left = null;
mergeState.left_present = false;
mergeState.right = null;
mergeState.right_present = false;
pumping = false;
});
};
const vertex = new Vertex("merge", 0,
[
// Left input: events from this stream are recorded as the "left" value.
new Source(
this.vertex,
() => this.listen_(out.vertex, (a : A) => {
mergeState.left = a;
mergeState.left_present = true;
pump();
}, false)
),
// Right input: events from s are recorded as the "right" value.
new Source(
s.vertex,
() => s.listen_(out.vertex, (a : A) => {
mergeState.right = a;
mergeState.right_present = true;
pump();
}, false)
)
// Sources derived from the dependencies declared by f are appended as well.
].concat(toSources(Lambda2_deps(f)))
);
// The listener thunks above read out.vertex only when they are invoked,
// which is presumably after this assignment -- NOTE(review): confirm in Source/Vertex.
out.vertex = vertex;
return out;
}
/**
* Return a stream that only outputs events for which the predicate returns true.
*/
filter(f : ((a : A) => boolean) | Lambda1) : Stream {
    const out = new StreamWithSend(null);
    const predicate = Lambda1_toFunction(f);
    // Forward an event to the output only when the predicate accepts it.
    const forwardIfAccepted = (a : A) => {
        if (predicate(a)) {
            out.send_(a);
        }
    };
    // The registration thunk reads out.vertex lazily, i.e. when it is called.
    const upstream = new Source(
        this.vertex,
        () => this.listen_(out.vertex, forwardIfAccepted, false)
    );
    // Sources derived from the dependencies declared by f are appended too.
    const sources = [upstream].concat(toSources(Lambda1_deps(f)));
    out.vertex = new Vertex("filter", 0, sources);
    return out;
}
/**
* Return a stream that only outputs events that have present
* values, discarding null values.
*/
filterNotNull() : Stream {
    const out = new StreamWithSend(null);
    // Only strict null is dropped; every other value is forwarded.
    const forwardIfPresent = (a : A) => {
        if (a !== null) {
            out.send_(a);
        }
    };
    // The registration thunk reads out.vertex lazily, i.e. when it is called.
    const upstream = new Source(
        this.vertex,
        () => this.listen_(out.vertex, forwardIfPresent, false)
    );
    out.vertex = new Vertex("filterNotNull", 0, [upstream]);
    return out;
}
/**
* Return a stream that only outputs events from the input stream
* when the specified cell's value is true.
*/
gate(c : Cell
* There is an implicit delay: State updates caused by event firings being held with
* {@link Stream#hold(Object)} don't become visible as the cell's current value until
* the following transaction. To put this another way, {@link Stream#snapshot(Cell, Lambda2)}
* always sees the value of a cell as it was before any state changes from the current
* transaction.
*/
snapshot(b : Cell, f_ : ((a : A, b : B) => C) | Lambda2) : Stream
* There is an implicit delay: State updates caused by event firings being held with
* {@link Stream#hold(Object)} don't become visible as the cell's current value until
* the following transaction. To put this another way, snapshot()
* always sees the value of a cell as it was before any state changes from the current
* transaction.
*/
snapshot3(b : Cell, c : Cell
* There is an implicit delay: State updates caused by event firings being held with
* {@link Stream#hold(Object)} don't become visible as the cell's current value until
* the following transaction. To put this another way, snapshot()
* always sees the value of a cell as it was before any state changes from the current
* transaction.
*/
snapshot4(b : Cell, c : Cell
* There is an implicit delay: State updates caused by event firings being held with
* {@link Stream#hold(Object)} don't become visible as the cell's current value until
* the following transaction. To put this another way, snapshot()
* always sees the value of a cell as it was before any state changes from the current
* transaction.
*/
snapshot5(b : Cell, c : Cell
* There is an implicit delay: State updates caused by event firings being held with
* {@link Stream#hold(Object)} don't become visible as the cell's current value until
* the following transaction. To put this another way, snapshot()
* always sees the value of a cell as it was before any state changes from the current
* transaction.
*/
snapshot6(b : Cell, c : Cell
* There is an implicit delay: State updates caused by event firings don't become
* visible as the cell's current value as viewed by {@link Stream#snapshot(Cell, Lambda2)}
* until the following transaction. To put this another way,
* {@link Stream#snapshot(Cell, Lambda2)} always sees the value of a cell as it was before
* any state changes from the current transaction.
*/
hold(initValue : A) : Cell {
    // Build a Cell from the initial value and this stream.
    const held = new Cell(initValue, this);
    return held;
}
/**
* A variant of {@link hold(Object)} with an initial value captured by {@link Cell#sampleLazy()}.
*/
holdLazy(initValue : Lazy) : Cell {
    // Same as hold(), but the initial value is supplied as a Lazy and the
    // result is a LazyCell.
    const held = new LazyCell(initValue, this);
    return held;
}
/**
* Transform an event with a generalized state loop (a Mealy machine). The function
* is passed the input and the old state and returns the new state and output value.
* @param f Function to apply to update the state. It may construct FRP logic or use
* {@link Cell#sample()} in which case it is equivalent to {@link Stream#snapshot(Cell)}ing the
* cell. Apart from this the function must be referentially transparent.
*/
collect(initState : S, f : ((a : A, s : S) => Tuple2) | Lambda2) : Stream {
    // Eager convenience form: wrap the already-known initial state in a Lazy
    // and delegate to collectLazy().
    return this.collectLazy(new Lazy(() => { return initState; }), f);
}
/**
* A variant of {@link collect(Object, Lambda2)} that takes an initial state returned by
* {@link Cell#sampleLazy()}.
*/
collectLazy(initState : Lazy, f : ((a : A, s : S) => Tuple2) | Lambda2) : Stream {
    const ea = this;
    // A StreamLoop throws unless it is constructed inside a transaction, so
    // the whole feedback loop is built within Transaction.run().
    return Transaction.run(() => {
        // es  : forward reference to the stream of new states
        // s   : the state, held from es and starting at initState
        // ebs : result of applying f to each input event and the state
        const es = new StreamLoop(),
            s = es.holdLazy(initState),
            ebs = ea.snapshot(s, f),
            // The `a` component of each result is the value emitted to callers...
            eb = ebs.map((bs : Tuple2) => { return bs.a; }),
            // ...and the `b` component is fed back as the next state.
            es_out = ebs.map((bs : Tuple2) => { return bs.b; });
        es.loop(es_out);
        return eb;
    });
}
/**
* Accumulate on input event, outputting the new state each time.
* @param f Function to apply to update the state. It may construct FRP logic or use
* {@link Cell#sample()} in which case it is equivalent to {@link Stream#snapshot(Cell)}ing the
* cell. Apart from this the function must be referentially transparent.
*/
accum(initState : S, f : ((a : A, s : S) => S) | Lambda2) : Cell {
    // Eager convenience form: wrap the known initial state in a Lazy and
    // delegate to accumLazy().
    const lazyInit = new Lazy(() => initState);
    return this.accumLazy(lazyInit, f);
}
/**
* A variant of {@link accum(Object, Lambda2)} that takes an initial state returned by
* {@link Cell#sampleLazy()}.
*/
accumLazy(initState : Lazy, f : ((a : A, s : S) => S) | Lambda2) : Cell {
    // A StreamLoop throws unless it is constructed inside a transaction, so
    // the feedback loop is built within Transaction.run().
    return Transaction.run(() => {
        const stateLoop = new StreamLoop();
        // Current state, held from the looped stream and starting at initState.
        const current = stateLoop.holdLazy(initState);
        // Each input event combined with the current state gives the next state.
        const updates = this.snapshot(current, f);
        stateLoop.loop(updates);
        // The result is the stream of new states, held from the same initial value.
        return updates.holdLazy(initState);
    });
}
/**
* Return a stream that outputs only one value: the next event of the
* input stream, starting from the transaction in which once() was invoked.
*/
once() : Stream {
    /*
    return Transaction.run(() => {
        const ev = this,
            out = new StreamWithSend();
        let la : () => void = null;
        la = ev.listen_(out.vertex, (a : A) => {
            if (la !== null) {
                out.send_(a);
                la();
                la = null;
            }
        }, false);
        return out;
    });
    */
    // We can't use the implementation above, because deregistering
    // listeners triggers the exception
    // "send() was invoked before listeners were registered"
    // We can revisit this another time. For now we will use the less
    // efficient implementation below.
    return Transaction.run(() => {
        // A cell that starts as true and is set to false by every event of
        // this stream; it is used as the gate for this same stream.
        const notYetFired = this.mapTo(false).hold(true);
        return this.gate(notYetFired);
    });
}
listen(h : (a : A) => void) : () => void {
    // Register h against the NULL vertex inside a transaction; the returned
    // function is whatever listen_() hands back.
    const register = () => this.listen_(Vertex.NULL, h, false);
    return Transaction.run<() => void>(register);
}
listen_(target : Vertex,
        h : (a : A) => void,
        suppressEarlierFirings : boolean) : () => void {
    // Registering the target may require the current transaction to regenerate.
    const needsRegen = this.vertex.register(target);
    if (needsRegen) {
        Transaction.currentTransaction.requestRegen();
    }

    const registered = new Listener(h, target);
    this.listeners.push(registered);

    const mustReplay = !suppressEarlierFirings && this.firings.length !== 0;
    if (mustReplay) {
        // Copy now, so the replay sees exactly the firings present at listen time.
        const earlier = this.firings.slice();
        Transaction.currentTransaction.prioritized(target, () => {
            // Anything sent already in this transaction must be sent now so that
            // there's no order dependency between send and listen.
            for (const firing of earlier) {
                h(firing);
            }
        });
    }

    // Unlisten: remove this listener (first match only) and deregister the
    // target. Does nothing when the listener is no longer in the list.
    return () => {
        const index = this.listeners.indexOf(registered);
        if (index === -1) {
            return;
        }
        this.listeners.splice(index, 1);
        this.vertex.deregister(target);
    };
}
/**
* Fantasy-land Algebraic Data Type Compatibility.
* Stream satisfies the Functor and Monoid Categories (and hence Semigroup)
* @see {@link https://github.com/fantasyland/fantasy-land} for more info
*/
//map :: Functor f => f a ~> (a -> b) -> f b
'fantasy-land/map'(f : ((a : A) => B)) : Stream {
    // Functor map is a direct alias for Stream#map.
    const mapped = this.map(f);
    return mapped;
}
//concat :: Semigroup a => a ~> a -> a
'fantasy-land/concat'(a:Stream) : Stream {
    // Merge the two streams. For simultaneous events, concatenate the values
    // when the left one passes the Semigroup test; otherwise keep the left value.
    return this.merge(a, (left:any, right) => {
        if (Z.Semigroup.test(left)) {
            return Z.concat(left, right);
        }
        return left;
    });
}
//empty :: Monoid m => () -> m
'fantasy-land/empty'() : Stream {
    // Monoid identity: a freshly constructed Stream.
    const empty = new Stream();
    return empty;
}
}
/**
 * A Stream that exposes send_() so that values can be pushed into it.
 */
export class StreamWithSend extends Stream {
    constructor(vertex? : Vertex) {
        super(vertex);
    }

    setVertex__(vertex : Vertex) { // TO DO figure out how to hide this
        this.vertex = vertex;
    }

    /**
     * Record `a` as a firing and schedule delivery to every listener
     * registered at the time of the call.
     */
    send_(a : A) : void {
        // On the first firing, arrange for the firings list to be reset via
        // the current transaction's last() hook.
        const isFirstFiring = this.firings.length == 0;
        if (isFirstFiring) {
            Transaction.currentTransaction.last(() => {
                this.firings = [];
            });
        }
        this.firings.push(a);

        // Iterate over a copy so that listener changes made during delivery
        // do not affect this round.
        for (const listener of this.listeners.slice()) {
            const handler = listener.h;
            Transaction.currentTransaction.prioritized(listener.target, () => {
                // inCallback is incremented around the handler call and always
                // decremented afterwards, whether or not the handler throws.
                Transaction.currentTransaction.inCallback++;
                try {
                    handler(a);
                }
                finally {
                    Transaction.currentTransaction.inCallback--;
                }
            });
        }
    }
}
/**
* A forward reference for a {@link Stream} equivalent to the Stream that is referenced.
*/
export class StreamLoop extends StreamWithSend {
    assigned__ : boolean = false; // to do: Figure out how to hide this

    /**
     * Creates the forward reference. Throws if no transaction is current.
     */
    constructor()
    {
        super();
        this.vertex.name = "StreamLoop";
        const outsideTransaction = Transaction.currentTransaction === null;
        if (outsideTransaction) {
            throw new Error("StreamLoop/CellLoop must be used within an explicit transaction");
        }
    }

    /**
     * Resolve the loop to specify what the StreamLoop was a forward reference to. It
     * must be invoked inside the same transaction as the place where the StreamLoop is used.
     * This requires you to create an explicit transaction with {@link Transaction#run(Lambda0)}
     * or {@link Transaction#runVoid(Runnable)}.
     * Throws if the loop has already been resolved.
     */
    loop(sa_out : Stream) : void {
        if (this.assigned__) {
            throw new Error("StreamLoop looped more than once");
        }
        this.assigned__ = true;

        // Every event of sa_out is re-sent on this stream.
        const forward = (a : A) => {
            this.send_(a);
        };
        const upstream = new Source(
            sa_out.getVertex__(),
            () => sa_out.listen_(this.vertex, forward, false)
        );
        this.vertex.addSource(upstream);
    }
}