All files / centro/lib/server_extensions timeout.js

100% Statements 27/27
100% Branches 7/7
100% Functions 15/15
100% Lines 27/27

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104                  1x               1x   30x     38x   30x   30x     30x   30x 30x                         1x   30x     46x   30x   30x     30x   30x   30x   30x     30x                           1x   8x     16x   16x   8x   16x   8x   8x                
/*eslint-env node */
 
/**
 * Centro extension for limiting how long message streams can be open.
 *
 * @module centro-js/lib/server_extensions/timeout
 */
"use strict";
 
var url = require('url');
 
/**
 * Limit how long a client can take to publish a message.
 *
 * @param {Object} config - Configuration options.
 * @param {integer} config.timeout - Time limit in milliseconds.
 */
exports.timeout_publish_streams = function (config)
{
    return {
        pre_connect: function (info)
        {
            this.pipeline(info.mqserver, 'publish_requested', function (topic, duplex, options, cb, next)
            {
                var t = setTimeout(function ()
                {
                    duplex.emit('error', new Error('timeout'));
                }, config.timeout);
 
                next(topic, duplex, options, function ()
                {
                    clearTimeout(t);
                    cb.apply(this, arguments);
                });
            });
        }
    };
};
 
/**
 * Limit how long a client can take to read a message.
 *
 * @param {Object} config - Configuration options.
 * @param {integer} config.timeout - Time limit in milliseconds.
 */
exports.timeout_message_streams = function (config)
{
    return {
        pre_connect: function (info)
        {
            this.pipeline(info.mqserver, 'message', function (stream, info, multiplex, cb, next)
            {
                var t = setTimeout(function ()
                {
                    stream.emit('error', new Error('timeout'));
                }, config.timeout);
 
                next(stream, info, function ()
                {
                    var duplex = multiplex.apply(this, arguments);
 
                    duplex.on('finish', function ()
                    {
                        clearTimeout(t);
                    });
 
                    return duplex;
                }, cb);
            });
        }
    };
};
 
/**
 * Limit how long the HTTP transport allows a client to take to publish a
 * message.
 *
 * @param {Object} config - Configuration options.
 * @param {integer} config.http_publish_timeout - Time limit in milliseconds.
 */
exports.timeout_http_publish_requests = function (config)
{
    return {
        transport_ready: function (tconfig, ops)
        {
            var cfg = Object.assign({}, config, tconfig);
 
            if (cfg.http_publish_timeout && ops.server && ops.pub_pathname)
            {
                ops.server.on('request', function (req)
                {
                    if (url.parse(req.url).pathname === ops.pub_pathname)
                    {
                        req.setTimeout(cfg.http_publish_timeout, function ()
                        {
                            req.destroy(new Error('timeout'));
                        });
                    }
                });
            }
        }
    };
};