4.0.0
Pub-sub and work queue server. Wildcards, streams, back-pressure, multi-transport. Just Node and a filesystem required.
Re-exports all exports of centro-js/lib/client and centro-js/lib/server
Centro client functions
Get authorization data for transports which use HTTP Basic Authentication (currently Primus) and then initiate communication with a server on a stream you supply.
(Object?)
Configuration options. This supports all the options supported by MQlobberClient as well as the following:
| Name | Description |
|---|---|
| config.token (string \| Array<string>)? | JSON Web Token(s) to present to the server. |
| config.max_subscriptions integer? | If the server returns pre-subscription data (see the ready event), emit an error event if there are more entries than this maximum. If not specified, no limit is applied. |
| config.max_topic_length integer? | If the server returns pre-subscription data (see the ready event), emit an error event if one of the topics exceeds this length. If not specified, no limit is applied. |
(centro-js/lib/client.authzCallback)
Called with authorization data.
Authorize with a server on a stream, for transports which send authorization data as a stream header (currently all except Primus and HTTP).
(stream.Duplex)
Connection you've already made to the server.
(Object?)
Configuration options. This supports all the options supported by MQlobberClient as well as the following:
| Name | Description |
|---|---|
| config.token (string \| Array<string>)? | JSON Web Token(s) to present to the server. |
| config.max_subscriptions integer? | If the server returns pre-subscription data (see the ready event), emit an error event if there are more entries than this maximum. If not specified, no limit is applied. |
| config.max_topic_length integer? | If the server returns pre-subscription data (see the ready event), emit an error event if one of the topics exceeds this length. If not specified, no limit is applied. |
MQlobberClient:
Object you can use for publishing and subscribing to messages. See the mqlobber documentation.
Callback type for HTTP Basic Authentication data.
Type: Function
(Error?)
Error, if one occurred.
(string)
Authentication data in the form centro:<tokens> where <tokens> is a comma-separated list of the tokens you passed to centro-js/lib/client.separate_auth.
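As a rough illustration of the format described above, the sketch below builds the `centro:<tokens>` string from one or more tokens and wraps it in a standard HTTP Basic `Authorization` header value. The helper name `basicAuthHeader` and the token values are hypothetical, and whether you need to build the header yourself depends on how your transport consumes the authorization data.

```javascript
// Hypothetical helper (not part of centro-js): build the "centro:<tokens>"
// string described above and encode it as an HTTP Basic header value.
function basicAuthHeader(tokens) {
    // Accept a single token or an array of tokens, like config.token.
    const list = Array.isArray(tokens) ? tokens : [tokens];
    // <tokens> is a comma-separated list of the tokens.
    const userpass = 'centro:' + list.join(',');
    // HTTP Basic authentication base64-encodes the "user:password" string.
    return 'Basic ' + Buffer.from(userpass, 'utf8').toString('base64');
}

// Placeholder token values, for illustration only.
const header = basicAuthHeader(['tokenA', 'tokenB']);
console.log(header);
```

Decoding the base64 part of the header gives back the original `centro:tokenA,tokenB` string, which is a quick way to check what a client is actually sending.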
(streamCallback)
Call this when you've made a connection to the server.
Callback type for connection to server.
Type: Function
(stream.Duplex)
Connection you've made to the server.
MQlobberClient:
Object you can use for publishing and subscribing to messages. See the mqlobber documentation.
Ready event. This is an extra event added to MQlobberClient and is emitted when the server has authorized the client and a connection is established.
(Array<Object<string, boolean>>?)
For each authorization token you supplied to centro-js/lib/client.separate_auth or centro-js/lib/client.stream_auth, a map containing the topics to which the client has been pre-subscribed. Each topic maps to whether the client will receive existing messages for the topic (true) or just new ones (false). If no tokens specified any pre-subscriptions then this will be undefined.
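To make the shape of the ready event's argument concrete, here is a minimal sketch that flattens it into one row per pre-subscribed topic. The function name `listPreSubscriptions` and the sample topics are made up for illustration; only the `Array<Object<string, boolean>>` shape and the `undefined` case come from the description above.

```javascript
// Flatten the ready event's argument into one row per pre-subscribed topic.
function listPreSubscriptions(subscriptions) {
    // The argument is undefined when no token specified any pre-subscriptions.
    if (!subscriptions) { return []; }
    const rows = [];
    subscriptions.forEach((topics, tokenIndex) => {
        // Each entry maps topic -> whether existing messages are delivered too.
        for (const [topic, existing] of Object.entries(topics || {})) {
            rows.push({ tokenIndex, topic, existing });
        }
    });
    return rows;
}

// Hypothetical data in the documented shape: one map per token presented.
const rows = listPreSubscriptions([
    { 'foo.bar': true },
    { 'news.#': false, 'alerts.*': true }
]);
console.log(rows);

// With a real client this might be attached as:
// client.on('ready', subs => console.log(listPreSubscriptions(subs)));
```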
Centro server class
Ready event. Emitted when the server is ready to accept connections.
Transport ready event. Emitted when an individual transport has been loaded and is ready to accept connections.
(Object)
Operations you can perform on the transport, including:
(centro-js/lib/server.closeCallback)
Close the transport.
Authorization start event. This is the first event emitted when a client connects.
(centro-js/lib/server.cancelCallback)
Call this to cancel authorization.
(centro-js/lib/server.oncloseCallback)
Call this to register a handler when the connection is closed.
(Object)
What's being authorized. This is either an HTTP request (with a url property) or a stream.
Authorization end event. Emitted when a client has been accepted or rejected.
(Array<{payload: Object, uri: string, rev: rev}>?)
For each authorization token the client presented, its payload, issuer URI and public key revision (see authorize-jwt).
(Array<{string}>?)
The raw authorization tokens.
Pre-connect event. Emitted after authz_end but before connect. The client connection has been authorized but handshake data has not yet been received.
(Object)
Information about the connection.
| Name | Description |
|---|---|
| info.mqserver MQlobberServer | MQlobberServer object managing the connection. |
| info.access_control AccessControl | AccessControl object enforcing access control on the connection. |
| info.connid string | Unique ID for the connection. |
| info.prefixes Array<string> | For each authorization token the client presented, a prefix which info.access_control enforces on topics. The prefix is a hash of the token's issuer ID, thus preventing clients with tokens from different issuers sending and receiving messages to each other. That is, messages are 'scoped' by token issuer. |
| info.destroy centro-js/lib/server.destroyCallback | Call this to close the connection. |
| info.onclose centro-js/lib/server.oncloseCallback | Call this to register a handler when the connection is closed. |
| info.token_infos Array<{payload: Object, uri: string, rev: rev}> | For each authorization token the client presented, its payload, issuer URI and public key revision (see authorize-jwt). |
Connect event. Emitted when handshake data has been received on a connection.
(Buffer)
Handshake received from the client.
Disconnect event. Emitted when a client disconnects.
(string)
Unique ID for the connection.
Expired event. Emitted when a token presented by a client expires whilst it is connected. After the event is emitted, the connection is destroyed.
(string)
Unique ID for the connection.
Empty event. Emitted when a client disconnects and there are no other clients connected.
Close event. Emitted when the server has been closed (see close).
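The connection lifecycle above can be observed with ordinary event handlers, since the server extends events.EventEmitter. The sketch below keeps a map of open connections keyed by connection ID. The event names `pre_connect` and `disconnect` and the handler arguments are assumptions inferred from the descriptions above, and a plain `EventEmitter` stands in for a real server so the example runs on its own.

```javascript
const { EventEmitter } = require('events');

// Track open connections by their unique ID.
// Assumed event names: 'pre_connect' (handler receives the info object) and
// 'disconnect' (handler receives the connection ID).
function trackConnections(server) {
    const open = new Map();
    server.on('pre_connect', info => open.set(info.connid, info));
    server.on('disconnect', connid => open.delete(connid));
    return open;
}

// Stand-in for a server object; the info objects are cut-down placeholders.
const fakeServer = new EventEmitter();
const open = trackConnections(fakeServer);
fakeServer.emit('pre_connect', { connid: 'c1', prefixes: ['abc.'] });
fakeServer.emit('pre_connect', { connid: 'c2', prefixes: ['def.'] });
fakeServer.emit('disconnect', 'c1');
console.log([...open.keys()]);
```

Because the map holds the whole info object, a handler could later call `info.destroy` on a tracked connection, for example when shutting down.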
Primus transport. This allows messages to be sent over Primus connections.
(Object)
Configuration options. This supports all the options supported by Primus.createServer and PrimusDuplex as well as the following:
| Name | Description |
|---|---|
| config.server Primus? | If you want to supply your own Primus server object. Otherwise, Primus.createServer will be called to create one. |
| config.pathname string (default /centro/v2/primus) | Pathname prefix on which Primus listens for connections. |
| config.primus_transport Object? | Primus transformer-specific configuration. |
| config.primus Object? | If present then this is used in preference to config. |
TCP transport. This allows messages to be sent over TCP connections.
(Object)
Configuration options. This supports all the options supported by net.Server#listen, net.createServer and tls.createServer as well as the following:
| Name | Description |
|---|---|
| config.server (net.Server \| tls.Server)? | If you want to supply your own TCP or TLS server object. Otherwise, net.createServer or tls.createServer will be called to create one. |
| config.noDelay boolean? | If present then setNoDelay is called on every connection with this as the argument. |
| config.tcp Object? | If present then this is used in preference to config. |
HTTP transport. This allows messages to be published using HTTP POST requests and received using HTTP Server-Sent Events.
Publish messages using POST requests of the form:
/centro/v2/publish?authz_token=XXX&topic=YYY
XXX is a JSON Web Token allowing access to the server. See here for more information about Centro authorization tokens.
YYY is the message's topic. Topics should be in AMQP format: . delimits words.
Subscribe to messages using GET requests of the form:
/centro/v2/subscribe?authz_token=XXX&topic=YYY&topic=ZZZ
XXX is the authorization token allowing access to the server.
YYY is the message topic to which to subscribe. Topics should be in AMQP format: . delimits words, * matches exactly one word and # matches zero or more words.
ZZZ is a further topic to subscribe to, in the same format as YYY above.
Messages you've subscribed to are delivered using server-sent events. Each message begins with a start event, continues with multiple data events and finishes with an end event.
For example, consider a message with topic foo.bar and body wow. You
could receive the following events:
type: start
data: {"id":0,"single":false,"existing":false,"expires":1498375188,"size":3,"topic":"foo.bar"}
type: data
data: {"id":0,"data":"wow"}
type: end
data: {"id":0}
For larger messages you may receive multiple data events. The event
data for each event is JSON-encoded and always contains a message id
(in this case 0). If you're receiving multiple messages at the same time,
their events may be interleaved so you need to use the id to tell which
event is for which message. The id identifies the message for this
connection only - it isn't globally unique and the same message may have
different ids on different connections.
In the start event, you'll receive whether the message is being delivered
to a single subscriber, whether it's an existing message (i.e. published
before you subscribed), when it expires (in seconds since 1970-01-01),
the size of its body data and its topic.
In the data event, you'll receive (part of) the message's body (wow in
this case). Since this is JSON-encoded, the data will be a UTF-8 encoded
string, even for binary data. If you need to get the raw binary data,
encode the string as latin-1 and use the bytes in the result (the server
decodes the bytes as latin-1 before JSON encoding the result).
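The event handling described above can be sketched as a small reassembler that groups start, data and end events by message id and recovers the raw bytes by encoding each data string as latin-1. The class name `MessageAssembler` and the second sample message are hypothetical; the sketch assumes you already have each event's type and JSON data string, for example from an EventSource or your own SSE parser.

```javascript
// Reassemble messages from start/data/end events, keyed by message id.
class MessageAssembler {
    constructor(onMessage) {
        this.pending = new Map();   // id -> { info, chunks }
        this.onMessage = onMessage; // called with (startInfo, Buffer) per message
    }

    // type is 'start', 'data' or 'end'; json is the event's data field.
    handle(type, json) {
        const ev = JSON.parse(json);
        if (type === 'start') {
            this.pending.set(ev.id, { info: ev, chunks: [] });
            return;
        }
        const msg = this.pending.get(ev.id);
        if (!msg) { return; } // ignore events for a message we never saw start
        if (type === 'data') {
            // Encoding the string as latin-1 recovers the original bytes.
            msg.chunks.push(Buffer.from(ev.data, 'latin1'));
        } else if (type === 'end') {
            this.pending.delete(ev.id);
            this.onMessage(msg.info, Buffer.concat(msg.chunks));
        }
    }
}

const received = [];
const assembler = new MessageAssembler((info, body) => {
    received.push({ topic: info.topic, body });
});

// Two interleaved messages: id 0 is text, id 1 carries the bytes 0xff 0x00.
assembler.handle('start', '{"id":0,"single":false,"existing":false,"expires":1498375188,"size":3,"topic":"foo.bar"}');
assembler.handle('start', '{"id":1,"single":false,"existing":false,"expires":1498375188,"size":2,"topic":"foo.baz"}');
assembler.handle('data', '{"id":0,"data":"wo"}');
assembler.handle('data', '{"id":1,"data":"\\u00ff\\u0000"}');
assembler.handle('data', '{"id":0,"data":"w"}');
assembler.handle('end', '{"id":1}');
assembler.handle('end', '{"id":0}');
console.log(received);
```

Keeping the chunks as Buffers until the end event avoids any assumption that the body is text, so the same code path works for binary and string messages.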
Note that server-sent event IDs and the Last-Event-ID header are not
supported because when a connection drops, the server forgets all about it.
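For the publish and subscribe request forms described earlier in this section, the following sketch builds the request paths with the standard `URLSearchParams` class, which also takes care of percent-encoding characters such as `#` in wildcard topics. The function names are hypothetical and the default prefix is the documented default of `config.pathname`; the token value is a placeholder.

```javascript
// Build a publish request path: <prefix>publish?authz_token=...&topic=...
function publishPath(token, topic, prefix = '/centro/v2/') {
    const params = new URLSearchParams({ authz_token: token, topic });
    return `${prefix}publish?${params}`;
}

// Build a subscribe request path with one topic parameter per topic.
function subscribePath(token, topics, prefix = '/centro/v2/') {
    const params = new URLSearchParams({ authz_token: token });
    for (const topic of topics) {
        params.append('topic', topic);
    }
    return `${prefix}subscribe?${params}`;
}

// 'XXX' stands in for a real JSON Web Token.
const publishExample = publishPath('XXX', 'foo.bar');
const subscribeExample = subscribePath('XXX', ['foo.*', 'bar.#']);
console.log(publishExample);
console.log(subscribeExample);
```

A publish path would be used with a POST request whose body is the message, and a subscribe path with a GET request that is then read as a server-sent event stream.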
(Object)
Configuration options. This supports all the options supported by net.Server#listen and https.createServer as well as the following:
| Name | Description |
|---|---|
| config.server (http.Server \| https.Server)? | If you want to supply your own HTTP or HTTPS server object. Otherwise, http.createServer or https.createServer will be called to create one. |
| config.pathname string (default /centro/v2/) | Pathname prefix on which to listen for requests. Publish requests are made on /centro/v2/publish and subscribe requests are made on /centro/v2/subscribe. |
| config.sse_keep_alive_interval integer? | If present, the number of seconds between sending a Server-Sent Event comment in order to keep a connection alive. If you don't specify this, no keep-alive comments will be sent. |
| config.destroy_after_end_delay integer (default 500) | Amount of time (in milliseconds) to wait between ending a response and destroying the connection, if an error occurs which means the connection can't be re-used. If you don't want the connection forcibly destroyed, set this to a negative value. |
| config.http Object? | If present then this is used in preference to config. |
| config.access Object? | Passed to access-control. |
| config.http2 boolean? | Whether to use HTTP/2 (http2.createServer / http2.createSecureServer) instead of HTTP. Note this still uses Server-Sent Events to deliver messages. If you want to use HTTP/2 streams to deliver messages, see the http2 and http2-duplex transports. Defaults to false. |
HTTP/2 transport. This allows messages to be sent over HTTP/2 streams.
Note that this transport won't work with browser clients. You'll need to use the http2-duplex transport with browsers.
(Object)
Configuration options. This supports all the options supported by http2.createServer, http2.createSecureServer and net.Server#listen as well as the following:
| Name | Description |
|---|---|
| config.server (http2.Http2Server \| http2.Http2SecureServer)? | If you want to supply your own HTTP/2 server object. Otherwise, http2.createServer or http2.createSecureServer will be called to create one. |
| config.pathname string (default /centro/v2/http2) | Pathname prefix on which to listen for requests. |
| config.http2 Object? | If present then this is used in preference to config. |
| config.access Object? | Passed to access-control. |
HTTP/2 transport for browsers. This allows messages to be sent over HTTP/2 streams to browsers.
If your client isn't a browser, you should be able to use the http2 transport.
This transport uses the http2-duplex module to emulate a full-duplex connection with browsers.
Browser-to-server streaming isn't implemented by any browser, nor are there
any plans to do so. http2-duplex emulates it by using POST requests.
(Object)
Configuration options. This supports all the options supported by http2.createServer, http2.createSecureServer, net.Server#listen and http2-duplex as well as the following:
| Name | Description |
|---|---|
| config.server centro-js/lib/server_transports/http2-duplex.CentroHttp2DuplexServer? | If you want to supply your own HTTP/2 full-duplex emulation server. This class inherits from Http2DuplexServer in http2-duplex. If you don't supply an instance (the default) then one will be created for you, along with an instance of Http2Server or Http2SecureServer. |
| config.pathname string (default /centro/v2/http2-duplex) | Pathname prefix on which to listen for requests. |
| config.http2_duplex Object? | If present then this is used in preference to config. |
| config.access Object? | Passed to access-control. |
In-memory transport. This allows messages to be sent over an in-memory stream to a server running in the same process. The in-memory stream has minimal overhead (the messages aren't copied).
An extra transport operation for this transport is added to the server object:
| Name | Type |
|---|---|
| connect | connectCallback |
Callback type for connecting to in-memory transport.
Type: Function
(Error?)
Error, if one occurred.
Create a server which publishes and subscribes to messages on behalf of clients.
Extends events.EventEmitter
(Object)
Configuration options. This supports all the options supported by MQlobberServer and AccessControl as well as the following:
| Name | Description |
|---|---|
| config.transport (string \| Object)? | Specifies which transport to load into the server. This should be either the transport's name or its configuration, with its name in the server property. |
| config.transports Array<(string \| Object)>? | Specifies multiple transports to load into the server. |
| config.authorize Function? | Function for creating an object which can authorize JSON Web Tokens presented by clients. Defaults to authorize-jwt. If you supply your own, it must comply with authorize-jwt's API. authorize_config or config is passed as the first argument. |
| config.authorize_config Object? | Configuration specific to config.authorize. |
| config.allowed_algs Array<string> | Which JWT algorithms are allowed. Clients which present tokens which aren't signed with one of these algorithms are rejected. You must supply this list. |
| config.privileged boolean (default false) | If true, clients can see messages from all token issuers. If false (the default), a client can only see messages sent by another client if they were both authorized using a token with the same issuer. You can make clients on individual transports privileged by setting this property in their transport-specific config (config.transports[X].config.privileged). |
| config.fsq (QlobberFSQ \| QlobberPG)? | QlobberFSQ or QlobberPG instance for handling messages. Defaults to a new instance of QlobberFSQ, passing fsq_config or config to the constructor. |
| config.fsq_config Object? | Configuration specific to config.fsq. |
| config.realm string (default centro) | When authorization of a client fails, the transport is given information it can (optionally) use when rejecting the connection. This includes an HTTP authentication header specifying this realm. |
| config.auth_method string (default Bearer) | When authorization of a client fails, the transport is given information it can (optionally) use when rejecting the connection. This includes an HTTP authentication header specifying this authentication method. Note that the client can opt to use Bearer or Basic authentication regardless of this setting. |
| config.max_tokens integer (default 10) | Maximum number of authorization tokens each client is allowed to present. |
| config.max_token_length integer (default 1MiB) | Maximum size of authorization token each client is allowed to present. |
| config.max_topic_length integer (default 4KiB) | Maximum length of topics the client can specify, in subscription and publish requests and in the authorization token. |
| config.max_issuer_length integer (default 128) | Maximum length of the iss claim in authorization tokens. |
| config.max_subscribe_topics integer (default 1000) | Maximum number of topics that subscribe claims in authorization tokens can contain. These claims specify to which topics clients which present them are pre-subscribed. |
| config.max_allow_publish_topics integer (default 1000) | Maximum number of topics that access_control.publish.allow claims in authorization tokens can contain. These claims are passed to AccessControl and specify to which topics clients which present them can publish messages. |
| config.max_disallow_publish_topics integer (default 1000) | Maximum number of topics that access_control.publish.disallow claims in authorization tokens can contain. These claims are passed to AccessControl and specify to which topics clients which present them cannot publish messages. |
| config.max_allow_subscribe_topics integer (default 1000) | Maximum number of topics that access_control.subscribe.allow claims in authorization tokens can contain. These claims are passed to AccessControl and specify to which topics clients which present them can subscribe. |
| config.max_disallow_subscribe_topics integer (default 1000) | Maximum number of topics that access_control.subscribe.disallow claims in authorization tokens can contain. These claims are passed to AccessControl and specify to which topics clients which present them cannot subscribe. |
| config.max_block_topics integer (default 1000) | Maximum number of topics that access_control.block claims in authorization tokens can contain. These claims are passed to AccessControl and specify which messages aren't delivered to clients which present them. |
| config.max_presence_data_length integer (default 1MiB) | Maximum length of the presence.connect.data and presence.disconnect.data claims in authorization tokens. The presence claims specify messages to send when clients which present them connect and disconnect. |
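As an illustration of how the options in the table above fit together, here is a hypothetical configuration object with three transports, one given by name and two given as objects with the name in the `server` property. The transport names `tcp` and `http`, the port numbers and the choice of `PS256` are assumptions for the sake of the example, and `transportNames` is a made-up helper, not part of centro-js.

```javascript
// Hypothetical configuration using only options documented in the table above.
// The 'tcp' and 'http' transport names, the ports and the algorithm are
// assumptions made for illustration.
const config = {
    allowed_algs: ['PS256'],        // required: JWT algorithms to accept
    max_tokens: 2,                  // override the default of 10
    max_topic_length: 1024,         // override the default of 4KiB
    transports: [
        'in-mem',                   // a transport given by name only
        {
            server: 'tcp',          // the transport's name goes in server
            config: { port: 8800, noDelay: true }
        },
        {
            server: 'http',
            // privileged here applies to clients on this transport only
            config: { port: 8801, sse_keep_alive_interval: 30, privileged: true }
        }
    ]
};

// List the transport names, whichever form each entry takes.
function transportNames(cfg) {
    const entries = cfg.transports || (cfg.transport ? [cfg.transport] : []);
    return entries.map(t => (typeof t === 'string' ? t : t.server));
}

console.log(transportNames(config));

// With the package installed, the object would presumably be passed to the
// server constructor (class name taken from this document, usage assumed):
// const { CentroServer } = require('centro-js');
// const server = new CentroServer(config);
```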
QlobberFSQ or QlobberPG instance being used to handle messages.
Type: (QlobberFSQ | QlobberPG)
Operations you can perform on transports you specified when constructing CentroServer. Transports can define their own operations in addition to close. Indexable by position or transport name.
Type: Array<{close: centro-js/lib/server.closeCallback}>
Close the server.
(centro-js/lib/server.closeCallback)
Called when the server has been closed.
Centro extension for defining backoff event behaviour.
Attach behaviour to the backoff event.
(Object)
Configuration options.
| Name | Description |
|---|---|
| config.close_conn boolean? | Close connection when a backoff event occurs. |
| config.delay_responses boolean? | When a backoff event occurs, delay responses to subscribe, unsubscribe and publish requests from the client until a drain event occurs. |
| config.skip_message boolean? | When a backoff event occurs, drop messages that would be sent to the client until a drain event occurs. |
| config.on_skip centro-js/lib/server_extensions/backoff.skipCallback? | Called when a message is dropped. |
| config.delay_message boolean? | When a backoff event occurs, postpone messages that would be sent to the client until a drain event occurs. |
| config.on_delay centro-js/lib/server_extensions/backoff.delayCallback? | Called when a message is postponed. |
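To make the postponing behaviour concrete, the sketch below shows the general idea behind an option like `delay_message`: messages arriving between a `backoff` event and the next `drain` event are queued, then flushed in order. This is an illustration of the policy only, not the extension's implementation; the function `delayWhileBackedOff`, its arguments and the plain `EventEmitter` standing in for a connection are all hypothetical.

```javascript
const { EventEmitter } = require('events');

// Illustrative only: postpone deliveries between 'backoff' and 'drain'.
function delayWhileBackedOff(conn, deliver, onDelay) {
    let backedOff = false;
    const postponed = [];
    conn.on('backoff', () => { backedOff = true; });
    conn.on('drain', () => {
        backedOff = false;
        // Flush in arrival order, stopping if back-pressure returns.
        while (postponed.length > 0 && !backedOff) {
            deliver(postponed.shift());
        }
    });
    return function send(message) {
        if (backedOff) {
            postponed.push(message);
            if (onDelay) { onDelay(message); } // analogous to on_delay
        } else {
            deliver(message);
        }
    };
}

// Stand-in connection object and simple recording callbacks.
const conn = new EventEmitter();
const delivered = [];
const delayed = [];
const send = delayWhileBackedOff(conn, m => delivered.push(m), m => delayed.push(m));

send('a');             // delivered straight away
conn.emit('backoff');
send('b');             // postponed
send('c');             // postponed
conn.emit('drain');    // 'b' and 'c' are delivered now
console.log(delivered, delayed);
```

Dropping messages instead of postponing them, as `skip_message` describes, would amount to discarding the message in the backed-off branch rather than queueing it.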
Centro extension for closing a connection when an error occurs on it.
Close a connection when an error occurs on it.
Centro extensions for filtering messages.
Postpone sending a message until all existing message streams to the message's recipients are under their high-water mark.
(Object)
Configuration options.
| Name | Description |
|---|---|
| config.on_delay centro-js/lib/server_extensions/filter.delayCallback? | Called when a message is postponed. |
Send messages at the rate that the fastest client connection can handle. Normally, messages are sent at the rate of the slowest client.
You should attach this extension last but before dummy_data_event.
(Object?)
Configuration options. These are passed to FastestWritable, and the following options are also supported:
| Name | Description |
|---|---|
| config.on_fw centro-js/lib/server_extensions/filter.fastestWritableCallback? | Called with the FastestWritable object constructed for each message stream. |
Emit an empty data event on a message stream after it's piped to a client
connection. This enables the message stream immediately to determine when a
connection is at full capacity. Otherwise, the message stream must read
initial message data and write it to the connection in order to make this
determination.
You should attach this extension last.
Callback type for creation of a FastestWritable for a message stream.
Type: Function
(FastestWritable)
FastestWritable that was created for the message stream.
(stream.Readable)
Message stream.
Centro extension for defining full event behaviour.
Attach behaviour to the full event.
(Object)
Configuration options.
| Name | Description |
|---|---|
| config.close_conn boolean? | Close connection when a full event occurs. |
| config.skip_message boolean? | When a full event occurs, drop messages that would be sent to the client until a removed event occurs. |
| config.on_skip centro-js/lib/server_extensions/full.skipCallback? | Called when a message is dropped. |
| config.delay_message boolean? | When a full event occurs, postpone messages that would be sent to the client until a removed event occurs. |
| config.on_delay centro-js/lib/server_extensions/full.delayCallback? | Called when a message is postponed. |
Centro extension for limiting the number of active subscriptions, publications and connections.
Centro extension for limiting the total amount of data published and the total number of messages published by a connected client.
Centro extension for limiting the number of concurrent connections each transport supports.
Limit the number of connections each transport supports. Doesn't apply to the in-mem transport.
(Object)
Configuration options.
| Name | Description |
|---|---|
| config.max_transport_connections integer | Maximum number of concurrent connections accepted by each transport. |
Centro extension for limiting the rate of message streams.
Centro extension for limiting how long message streams can be open.