All files / centro/lib/server_extensions limit_active.js

100% Statements 39/39
100% Branches 8/8
100% Functions 16/16
100% Lines 39/39

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132                                1x   30x   30x     92x   120x     30x       90x   90x 90x       92x   22x   22x 22x       92x   92x   92x 92x                         1x   30x   30x     100x   90x   30x       60x   60x   60x 60x           60x 60x 60x     60x                       1x   30x   30x     60x   60x   60x     60x   30x            
/*eslint-env node */
 
/**
 * Centro extension for limiting the number of active subscriptions,
 * publications and connections.
 *
 * @module centro-js/lib/server_extensions/limit_active
 */
"use strict";
 
/**
 * Limit the total number of active subscriptions.
 *
 * @param {Object} config - Configuration options.
 * @param {integer} config.max_subscriptions - Maximum number of active subscriptions.
 */
exports.limit_active_subscriptions = function (config)
{
    var count = 0;
    
    return {
        pre_connect: function (info)
        {
            this.pipeline(info.mqserver, 'subscribe_requested', function (topic, cb, next)
            {
                if ((count >= config.max_subscriptions) &&
                    !this.subs.has(topic))
                {
                    return cb(new Error('subscription limit ' + config.max_subscriptions +
                                        ' already reached: ' + topic));
                }
 
                next(topic, function (err, n)
                {
                    count += n;
                    cb.apply(this, arguments);
                });
            });
 
            this.pipeline(info.mqserver, 'unsubscribe_requested', function (topic, cb, next)
            {
                next(topic, function (err, n)
                {
                    count -= n;
                    cb.apply(this, arguments);
                });
            });
 
            this.pipeline(info.mqserver, 'unsubscribe_all_requested', function (cb, next)
            {
                next(function (err, n)
                {
                    count -= n;
                    cb.apply(this, arguments);
                });
            });
        }
    };
};
 
/**
 * Limit the total number of active publications.
 *
 * @param {Object} config - Configuration options.
 * @param {integer} config.max_publications - Maximum number of active publications.
 */
exports.limit_active_publications = function (config)
{
    var count = 0;
 
    return {
        pre_connect: function (info)
        {
            this.pipeline(info.mqserver, 'publish_requested', function (topic, stream, options, cb, next)
            {
                if (count >= config.max_publications)
                {
                    return cb(new Error('publication limit ' + config.max_publications +
                                        ' already reached: ' + topic));
                }
 
                count += 1;
 
                var decrement = function ()
                {
                    count -= 1;
                    cb.apply(this, arguments);
                };
 
                /*jshint validthis: true */
                function cb2()
                {
                    var dec = decrement;
                    decrement = cb; // only decrement once
                    dec.apply(this, arguments);
                }
 
                next(topic, stream, options, cb2);
            });
        }
    };
};
 
/**
 * Limit the total number of active connections.
 *
 * @param {Object} config - Configuration options.
 * @param {integer} config.max_connections - Maximum number of active connections.
 */
exports.limit_active_connections = function (config)
{
    var count = 0;
 
    return {
        authz_start: function (cancel, onclose)
        {
            count += 1;
 
            onclose(function ()
            {
                count -= 1;
            });
 
            if (count > config.max_connections)
            {
                cancel(new Error('connection limit ' + config.max_connections +
                                 ' exceeded'));
            }
        }
    };
};