import {BaseSelector} from './baseSelector';
import {ucfirst} from './core';
if (process.env.RXJS) {
const Rx = require('rxjs');
const RxComponent = require('./rxComponent');
const SUBSCRIBE_NS = 'subscribe.';
class RxSelector extends BaseSelector{
constructor(){
super();
this.$collectionMap_ = {};
}
createEvent(event, isPolling){
let eventObj = this.$collectionMap_[event];
const emitevent = `${SUBSCRIBE_NS}${event}`;
if(!eventObj){
this.$collectionMap_[event] = eventObj = {eventName: event};
eventObj.action = (...args) => {
let onceCallback = args[args.length - 1];
if(typeof onceCallback === 'function'){
if(isPolling){
const pollingCallback = (...args) => {
if(onceCallback(...args) === false){
this.removeListener(event, pollingCallback);
}
}
this.on(event, pollingCallback);
}else{
this.once(event, onceCallback);
}
}
this.emit(emitevent, ...args);
};
eventObj.stream = Rx.Observable.fromEvent(this, emitevent);
}else{
eventObj.subscription && eventObj.subscription.unsubscribe();
}
return eventObj;
}
|
addPureSubscribe(event, callback, immediate){
const eventObj = this.createEvent(event);
const _callback = callback;
callback = () => {
return eventObj.stream.flatMap(_callback);
}
return this.addSubscribe(event, callback, immediate);
}
addSubscribe(event, callback, immediate){
const eventObj = this.createEvent(event);
|
const source = callback(eventObj.stream, eventObj.action, eventObj.polling);
if(source){
eventObj.source = source.cache(1);
this.addSubscription(eventObj);
}
if(immediate){
let timer = setTimeout(() => {
clearTimeout(timer);
eventObj.action();
});
}
return eventObj;
}
pollingSubscribe(event, callback, immediate){
const eventObj = this.createEvent(event, 1);
const stopFlag = {};
const startFlag = {};
eventObj.polling = (opt) => {
const option = Object.assign({
delay: 5000,
data: startFlag,
checkSuccess: (res) => true,
action: null
}, opt);
if(!option.delay){
option.delay = 5000;
}
if(!option.action){
return Rx.Observable.of(stopFlag);
}else{
return Rx.Observable.of(option.data)
.expand(res => {
if(res === startFlag || option.checkSuccess(res)){
const stream = option.action(res, stopFlag);
if(stream instanceof Rx.Observable){
return stream;
}else{
return Rx.Observable.from(stream).delay(option.delay);
}
}else{
return Rx.Observable.of(stopFlag);
}
})
.takeWhile(res => res != stopFlag)
}
}
eventObj.polling.stopFlag = stopFlag;
eventObj.polling.startFlag = startFlag;
return this.addSubscribe(event, callback, immediate);
}
addSubscription(eventObj){
let stream = eventObj.source
.catch(err => {
this.emit(eventObj.eventName, err);
|
return stream;
});
eventObj.subscription = stream.subscribe((...args) => {
this.emit(eventObj.eventName, null, ...args);
}, err => {
this.emit(eventObj.eventName, err);
});
}
getAction(event){
return this.$collectionMap_[event] && this.$collectionMap_[event].action;
}
getActions(){
let actions = {};
for(let key in this.$collectionMap_){
actions[`do${ucfirst(key)}`] = this.$collectionMap_[key].action;
}
return actions;
}
removeSubscription(eventObj){
eventObj.subscription.unsubscribe();
}
getEvent(event){
return this.$collectionMap_[event];
}
select(){
const args = Array.prototype.slice.call(arguments, 0);
if(args[args.length - 1] === true){
return super.select(args[0]);
}else{
const store = this.getAppStore();
if(store.liftedStore && store.liftedStore.select){
return store.liftedStore.select(...args);
}else if(store.select){
return store.select(...args);
}else{
throw new Error('store类型不为Observable,需要更新redux为rxjs-redux');
}
}
}
selectable() {
return RxComponent.selectable(this.select.apply(this, arguments));
}
destroy(){
super.destroy();
for(let key in this.$collectionMap_){
this.$collectionMap_[key].subscription && this.$collectionMap_[key].subscription.unsubscribe();
}
}
}
module.exports = RxSelector;
}
|