/*
 * Coverage report: All files / rxmq.js/src / channel.js
 *
 * Statements: 96.55% (28/29)
 * Branches:   86.36% (19/22)
 * Functions:  100%   (3/3)
 * Lines:      96.43% (27/28)
 *
 * (The per-line number gutter and hit counts of the HTML report were
 * flattened during extraction and are omitted here.)
 */
import { AsyncSubject, Observable } from 'rxjs';
import { filter, mergeAll } from 'rxjs/operators';
import { EndlessReplaySubject, EndlessSubject } from './rx/index';
import { compareTopics, findSubjectByName } from './utils/index';
 
/**
 * Rxmq channel class.
 *
 * A channel keeps one subject per topic name. Subjects are created lazily by
 * `subject()` and pushed onto the channel bus, which `observe()` filters to
 * serve topic patterns.
 */
class Channel {
  /**
   * Represents a new Rxmq channel.
   * Normally you wouldn't need to instantiate it directly, you'd just work with existing instance.
   * @constructor
   * @param  {Array}   plugins  Array of plugins for new channel
   * @return {void}
   */
  constructor(plugins = []) {
    /**
     * Internal set of utilities
     * @type {Object}
     * @private
     */
    this.utils = {
      findSubjectByName,
      compareTopics,
    };

    /**
     * Instances of subjects
     * @type {Array}
     * @private
     */
    this.subjects = [];
    /**
     * Channel bus: every newly created topic subject is pushed here
     * @type {EndlessReplaySubject}
     * @private
     */
    this.channelBus = new EndlessReplaySubject();
    /**
     * Permanent channel bus stream as Observable
     * @type {Observable}
     * @private
     */
    this.channelStream = this.channelBus;

    // inject plugins (done last so plugins cannot displace the fields above)
    plugins.forEach(plugin => this.registerPlugin(plugin));
  }

  /**
   * Returns the subject representing given topic, creating it on first use.
   * A newly created subject gets its `name` set to the topic name, is stored
   * in the channel's subject list and is pushed onto the channel bus.
   * @param  {String}   name                Topic name
   * @param  {Object}   [options]           Creation options
   * @param  {Function} [options.Subject]   Constructor used when the topic does not
   *                                        exist yet, defaults to EndlessSubject
   * @return {EndlessSubject}               Subject representing given topic
   * @example
   * const channel = rxmq.channel('test');
   * const subject = channel.subject('test.topic');
   */
  subject(name, { Subject = EndlessSubject } = {}) {
    let s = this.utils.findSubjectByName(this.subjects, name);
    if (!s) {
      s = new Subject();
      s.name = name;
      this.subjects.push(s);
      this.channelBus.next(s);
    }
    return s;
  }

  /**
   * Get an Observable for specific set of topics.
   * A name containing neither '#' nor '*' is treated as a plain topic and the
   * topic's own subject is returned (it is created if missing). Any other name
   * is treated as a pattern: the result merges every subject from the channel
   * bus whose name matches the pattern according to `compareTopics`.
   * @param  {String}         name        Topic name / pattern
   * @return {Observable}                 Observable for given set of topics
   * @example
   * const channel = rxmq.channel('test');
   * channel.observe('test.topic')
   *        .subscribe((res) => { // default Observable subscription
   *            // handle results
   *        });
   */
  observe(name) {
    // create new topic if it's plain text
    if (name.indexOf('#') === -1 && name.indexOf('*') === -1) {
      return this.subject(name);
    }
    // return stream
    return this.channelStream.pipe(
      filter(obs => compareTopics(obs.name, name)),
      mergeAll()
    );
  }

  /**
   * Do a request that will be replied into returned subject.
   * Sends `{ replySubject, data }` to the subject of the given topic and
   * returns `replySubject`. When the topic has no subject yet, nothing is
   * sent and an Observable that never emits is returned instead.
   * @param  {Object}   options             Request options
   * @param  {String}   options.topic       Topic name
   * @param  {Any}      options.data        Request data
   * @param  {Function} [options.Subject]   Constructor of the reply subject, defaults to AsyncSubject
   * @return {AsyncSubject|Observable}      Subject that will dispatch the response, or a
   *                                        never-emitting Observable for an unknown topic
   * @example
   * const channel = rxmq.channel('test');
   * channel.request({
   *     topic: 'test.topic',
   *     data: 'test data',
   * }).subscribe((response) => { // default Observable subscription
   *     // handle response
   * });
   */
  request({ topic, data, Subject = AsyncSubject }) {
    const subj = this.utils.findSubjectByName(this.subjects, topic);
    if (!subj) {
      // No subject exists for this topic, so there is nothing to send to.
      // Built with the Observable constructor because the static
      // `Observable.never()` is not available without rxjs-compat.
      return new Observable(() => {});
    }

    // create reply subject
    const replySubject = new Subject();
    subj.next({ replySubject, data });
    return replySubject;
  }

  /**
   * Channel plugin registration.
   * Copies every enumerable property of the plugin (as visited by `for...in`)
   * onto this channel, skipping names the channel already has as own
   * properties. Only own properties are checked, so a plugin property can
   * shadow a method defined on the class prototype.
   * @param  {Object} plugin Plugin object to apply
   * @return {void}
   */
  registerPlugin(plugin) {
    for (const prop in plugin) {
      // Call through Object.prototype so the check keeps working even if an
      // earlier plugin installed its own `hasOwnProperty` key on the channel.
      if (!Object.prototype.hasOwnProperty.call(this, prop)) {
        /**
         * Hide from esdoc
         * @private
         */
        this[prop] = plugin[prop];
      }
    }
  }
}
 
/**
 * Default export of this module: the Channel class itself (not an instance).
 */
export default Channel;