/// import util = require('./util') import types = require('./types') import PRE = require('./PendingResultError') import MonitorCollector = require('./MonitorCollector') import R = require('reactivity') import B = types.Block import reactivity_util = require('./reactivity_util') var serial = 0 function LOG( msg: string ){ console.log( msg ) } /* Executors allow you to modify the blocking control flow. By default every pending error will stop execution. This module allows you to intercept blocks, catch batches of errors and consolidate them in different ways to modify concurrency. */ class Executor { type(){ return 'base'} _id: number = 0 path(){ return this.parent.path() + '.' + this.id() } id(){ return this.type() + '(' + this._id + ')' } log( msg: string ){ LOG( this.path() + ' -> ' + msg ) } constructor( public parent: Executor ){ this._id = serial++ this.log( 'constructor()' ) } run( f: B ): T { this.log('run()') return this.parent._run_child( () => this._run( f ) ) } _run_child( f: B ): T { this.log('_run_child()') return f() } _run( f: B ): T { this.log('_run()') return f() } } // Root executor has no parent class RootExecutor extends Executor { constructor( ){ super( undefined ) } run( f: B ): T { return f() } path(){ return this.id() + '' } } class SequenceExecutor extends Executor { type(){ return 'seq'} private pending = false _run( f:B ): T { this.pending = false return f() } _run_child( f:B ): T { this.log('_run_child??!') if ( ! this.pending ){ var res = R.run( f ) if ( PRE.status( res.error ) === PRE.Status.Pending ) this.pending = true; return reactivity_util.bubbleResult( res ) } else { this.log('not running. pending') } } } class ParallelExecutor extends Executor { type(){ return 'par'} private monitors = new MonitorCollector.MonitorCollector() private pending_count = 0 run( f: B ): T { this.log('run()') return super.run( f ) } _run( f: B ): T { this.log( '_run()' ) // run in reactive context var res = R.run( f ) // by now our children executed as well // and we should have caught all their monitors // the only one missing is our own this.monitors.push( res.monitor ) // we join all of them this.monitors.join() // should we throw an error? this.log( '_run().pending_count ' + this.pending_count ) if ( this.pending_count > 0 ) throw new PRE() if ( util.exists( res.error ) ) throw res.error // ok. cool. lets return the result return res.result } _run_child( f:B ): T { this.log( '_run_child()' ) var res = R.run( f ) this.monitors.push( res.monitor ) switch( PRE.status( res.error ) ) { case PRE.Status.Ready : return res.result case PRE.Status.Error : throw res.error case PRE.Status.Pending : this.pending_count++ } } } var current = new RootExecutor() var withExecutor = ( build: ( p: Executor ) => Executor ) => ( f: B ): B => util.around( () => { current = build( current ) }, // push () => { current = current.parent }, // pop () => { // run var a = arguments ; return current.run( () => f.apply( null, a ) ) } ) // combinators export var withSequence = withExecutor( (p: Executor) => new SequenceExecutor( p ) ) export var withParallel = withExecutor( (p: Executor) => new ParallelExecutor( p ) ) // applicators export function parallel( f: types.Block ): T { return withParallel( f )() } export function sequence( f: types.Block ): T { return withSequence( f )() }