# gent-hub

> 基于RxJS@5，集中管理数据流。

## 特性

- 概念简单，小巧、灵活、方便
- 数据统一管理，不分框架，一处定义多处使用
- 无缝结合`RxJS 5`
- 数据分层，逻辑清晰，便于单元测试
- 管道+流+中间件，便于调试、数据变换

## 安装

```sh
npm install gent-hub rxjs -S
```

```js
import Hub from 'gent-hub';

// or use es6 source
import Hub from 'gent-hub/index';
```

## 数据流 `flow`

数据流 `flow` 表示一个数据从开始到结束的整个流程，比如下面是一个简单的数据流:

```
点击按钮，flow 开始（事件参数）
    ▼
封装请求数据（事件参数 => 请求参数）
    ▼
请求数据（请求参数 => 服务器返回数据）
    ▼
返回数据处理（服务器返回数据 => 封装后的数据）
    ▼
到底终点（展示或者存储数据）
```

> 一个数据流可能只有一个数据（从起点流到终点），也可以有一系列数据（分别各自按照一定的顺序从起点流向终点）。

## 管道 `pipe`。

上面的数据流`flow`中，每一行可以理解为数据流中一段，称为一个`管道 pipe`。每个`pipe`有一个数据流入，和一个数据流出。

一个管道是一个`纯函数`，接受一个数据`data`, 返回一个可以作为[Rx.Observable.form](http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#static-method-from)的参数的值。

- 参数 `data`: 上一个管道流入这个管道的`值`。
- 返回值 `result`: 可以作为`Rx.Observable.form`的参数: 数组、Promise 实例，Rx.Observable实例等。

相关管道例子：

```js
// pipes/test.js

import Rx from 'rxjs/Rx';

// 返回一个固定的值
export function fromValue(data) {
  return [data];
}

// 返回一个promise对象
export function fromPromise(data) {
  return Promise.resolve(data);
}

// 返回一个 Observable 流
export function fromObservable(data) {
  return Rx.Observable.of(data);
}

// 数据变换
export function transform(data) {
  data.test += '-123456';
  return [data];
}

// 中间件
export function middleware(data, opts) {
  data.test += '-' + opts.type + '-' + opts.pipeName;
  return [data];
}

// 产生异常
export function throwError() {
  throw new Error('pipe error');
}
```

## 创建数据板实例 `hub`, 并添加管道、中间件

`hub`用来挂载和方便地获取`管道 pipe`, 并给管道添加前置和后置中间件。

实例Hub，`const hub = new Hub(options)`, options 参数如下：

- `options.Observable`: Rx.Observable，详见[Observable](http://reactivex.io/rxjs/manual/overview.html#observable) 和 [RxJS Operators](http://reactivex.io/rxjs/manual/overview.html#operators)。
- `options.beforeMiddlewares`: 前置中间件列表。
- `options.afterMiddlewares`: 后置中间件列表。

```js
// hubs/test.js

import Rx from 'rxjs/Rx';
import Hub from 'gent-hub';
import * as pipes from '../pipes/test';

const hub = new Hub({
  Observable: Rx.Observable,
  beforeMiddlewares: [Hub.logMiddleware],
  afterMiddlewares: [Hub.logMiddleware]
});

hub.addPipes('test', pipes);

export default hub;
```

## Hub静态方法或属性

- `Hub.logMiddleware`: 内置的中间件, 用于打印每一个管道的数据流动情况。

## Hub实例`hub`的方法或属性

- `hub.form`: 等同于`Rx.Observable.from`，创建一个`Rx.Observable`实例。
- `hub.addPipe(name, pipe)`: 添加一个管道。
- `hub.addPipes(scope, pipes)`: 添加一批管道。`scope`定义这些管道的上下文，`pipes`是`{name: pipe}`的格式。
- `hub.getPipe(name)`: 获取一个安装在`hub`上的管道。
- `hub.addMiddleware(type, middleware)`: 添加中间件，`type`可取值`before`和`after`。
- `hub.removeMiddleware(type, middleware)`: 移除中间件，`type`可取值`before`和`after`。
- `hub.flow(observableSource)`: 开始一个数据流`Flow`，返回`Flow`实例。
- `flowAll(args=[])`: 合并多个数据源开始一个数据流`Flow`，返回`Flow`实例。

## 数据流`Flow`的实例`flow`

- `flow.use(pipe, operator='concatMap')`: 流经下一个管道，返回一个新的`flow`。pipe 如果是`string`则取当前hub上得管道:`hub.getPipe(pipe)`， 否则`pipe`为一个实际的管道。`operator`为[RxJS Operators](http://reactivex.io/rxjs/manual/overview.html#operators)。
- `flow.subscribe(...args)`: 订阅。`let subscription = flow.subscribe(next, error, complete)`或者`let subscription = flow.subscribe({next => {}, error => {}, complete => {}})`。 可以调用`subscription.unsubscribe()`取消订阅。
- `flow.toObservable()`: 返回`Rx.Observable`实例。
- `flow.operator(operatorName)`: 返回一个类[RxJS Operator](http://reactivex.io/rxjs/manual/overview.html#operators)，执行这个`Operator`返回一个新的`flow`。

## 中间件

中间件也是一种管道，添加在`hub`上得每一个`pipe`的入口和出口，用以统一监控、变换、调试数据。

## 实例

```js
import hub from '../hus/test';

// 开始数据流并订阅
let addUserSubs;

// 添加数据
function addUser() {
  // 放弃 上次订阅
  if (addUserSubs) addUserSubs.unsubscribe();
  // 开始新的数据流
  let addUserSubs = hub.flow([1])
    .use('test.fromObservable')
    .use('test.transform')
    .use(function customPipe(data) {
      return [data];
    })
    .subscribe(
      data => {
        console.log(data);
      },
      error => {
        console.log(error);
      }
    );
}

// 合并多个流
let ggSubs;
function getUserAndGetFood() {
  // 放弃 上次订阅
  if (ggSubs) ggSubs.unsubscribe();

  let observableUser = hub.flow([userId])
    .use('xxx.xxx')
    .use('xxx.xxx');
  let observableFood = hub.flow([foodId])
    .use('xxx.xxx')
    .use('xxx.xxx');;

  // 合并多个数据流
  let ggSubs = hub.flowAll([
    observableUser, 
    observableFood
  ])
    .use('test.fromObservable')
    .use('test.fromPromise')
    .use(function customPipe(data) {
      return [data];
    })
    .subscribe(
      ([user, food]) => {
        console.log(user, food);
      },
      error => {
        console.log(error);
      }
    );
}

// test operator
hub.flow([1, 2, 3, 4, 5])
.operator('filter')(item => {
  return item > 3
})
.use((data) => {
  return [{
    test: data
  }]
})
.subscribe(
  (data) => {}, 
  (error) => {}
);

// 可以取消订阅
subscription.unsubscribe();
```