import {
hasOwnProperty,
isDefined,
isSet,
valueOrDefault
} from "./util/utils.js";
import ConsoleLogger from "./logger/ConsoleLogger.js";
import SilentLogger from "./logger/SilentLogger.js";
import InMemoryStore from "./store/InMemoryStore.js";
/**
* Pubst - A slightly opinionated pub/sub library for JavaScript.
*
* Copyright 2017-2026 Jason Schindler
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* @file Pubst.js
* @author Jason Schindler
* @copyright Jason Schindler 2017-2026
* @description A slightly opinionated pub/sub library for JavaScript.
*/
const DEFAULT_TOPIC_CONFIG = {
name: '',
default: undefined,
eventOnly: false,
doPrime: true,
allowRepeats: false,
storeConfig: {}
};
const ALLOWED_SUB_PROPS = [
'topic',
'handler',
'default',
'doPrime',
'allowRepeats'
];
function buildConfig(base, extensions) {
const result = {};
for (const key in base) {
result[key] = base[key];
}
for (const key in extensions) {
if (hasOwnProperty(result, key)) {
result[key] = extensions[key];
}
}
return result;
}
/**
* @typedef {Object} PubstConfig
* @property {Object} [logger] - Logger to send warning messages to.
* @property {boolean} [showWarnings] - If logger isn't provided, switches between ConsoleLogger and SilentLogger.
* @property {Object} [store] - A store implementation for persisting topic values.
* @property {Array<TopicConfig>} [topics] - An array of topic configurations.
*/
/**
* @typedef {Object} TopicConfig
* @property {string} name - The name of the topic (REQUIRED).
* @property {*} [default] - The default value presented to subscribers when the topic is undefined or null.
* @property {boolean} [eventOnly=false] - Set to true if this topic will not have payload data.
* @property {boolean} [doPrime=true] - Should new subscribers automatically receive the last published value?
* @property {boolean} [allowRepeats=false] - Alert subscribers of all publish events, even if unchanged.
* @property {Object} [storeConfig={}] - Store-specific configuration passed to the store's registerTopic method.
*/
/**
* @typedef {Object} SubscriptionConfig
* @property {Function} handler - The handler to call when the topic is updated.
* @property {*} [default] - Default value for this subscription.
* @property {boolean} [doPrime=true] - Should the handler be primed with the last value?
* @property {boolean} [allowRepeats=false] - Should the handler be called when the value doesn't change?
*/
/**
* @summary A slightly opinionated pub/sub utility for Javascript
*/
class Pubst {
#logger = new ConsoleLogger();
#store = new InMemoryStore();
#stringSubs = {};
#fnSubs = [];
#topics = {};
/**
* @summary Creates a new Pubst instance.
*
* @description
* <p>
* Creates a new Pubst instance. The instance is ready to use immediately
* with default settings. Call `await configure()` if you need to customize
* the logger, store, or pre-register topics.
* </p>
*
* @example
* const pubst = new Pubst();
* await pubst.configure({ showWarnings: false });
*/
constructor() {
}
/**
* @summary Set Pubst configuration.
*
* @param {PubstConfig} userConfig - Your configuration
*
* @returns {Promise<void>}
*
* @description
* <p>
* Available options are:
* <ul>
* <li>`logger` (default: ConsoleLogger) - Logger to send warning messages to.</li>
* <li>`showWarnings` - If logger isn't provided, this option switches between the use of ConsoleLogger and SilentLogger</li>
* <li>`store` (default: InMemoryStore) - A store implementation for persisting topic values.
* Custom stores must implement the same async interface as InMemoryStore:
* `registerTopic`, `getValue`, `setValue`, `clearValue`, and `getTopicNames`.</li>
* <li>`topics` - An array of topic configurations. (See: `addTopic` for topic configuration options)</li>
* </ul>
* </p>
*/
async configure(userConfig = {}) {
if (userConfig.logger) {
this.#logger = userConfig.logger
} else if (hasOwnProperty(userConfig, 'showWarnings') && !userConfig.showWarnings) {
this.#logger = new SilentLogger();
}
if (userConfig.store) {
this.#store = userConfig.store;
}
if (Array.isArray(userConfig.topics)) {
await this.addTopics(userConfig.topics);
}
}
/**
* @summary Configure a new topic.
*
* @param {TopicConfig} newTopicConfig - Topic configuration
*
* @returns {Promise<Object>} Resolves with the result of registering
* the topic in the store.
*
* @description
* <p>
* Allows you to configure a new topic in pubst.
*
* Available options are:
* <ul>
* <li>`name` (<strong>REQUIRED</strong>) - A string representing the name of the topic.</li>
* <li>
* `default` (default: undefined) - The default value presented to subscribers when the topic is undefined or null.
* This can be overridden by subscribers.
* </li>
* <li>
* `eventOnly` (default: false) - Set this to true if this topic will not have payload data.
* </li>
* <li>
* `doPrime` (default: true) - Should new subscribers automatically receive the last published value on the topic?
* If no valid value is present, new subscribers will be primed with the default value (if configured).
* This can be overridden by subscribers.
* </li>
* <li>
* `allowRepeats` (default: false) - Alert subscribers of all publish events, even if the value is equal (by strict comparison) to the last value sent.
* This can be overridden by subscribers.
* </li>
* <li>
* `storeConfig` (default: {}) - Store-specific configuration that is passed through to the store's
* `registerTopic` method. This allows custom store implementations to receive topic-level
* configuration (e.g. persistence keys, TTL settings, etc.).
* </li>
* </ul>
* </p>
*/
async addTopic(newTopicConfig) {
const topic = buildConfig(DEFAULT_TOPIC_CONFIG, newTopicConfig);
if (!topic.name) {
throw new Error('Topics must have a name.');
}
if (this.#topics[topic.name]) {
this.#logger.warn(
'Pubst.addTopic',
`The '${topic.name}' topic has already been configured. The previous configuration will be overwritten.`
);
}
this.#topics[topic.name] = topic;
return await this.#store.registerTopic(topic.name, null, topic.storeConfig);
}
/**
* @summary Configure new topics.
*
* @param {Array<TopicConfig>} topics - Topic configurations
*
* @returns {Promise<void>}
*
* @description
* <p>
* Allows you to configure new topics. This will call `addTopic` with each item passed.
* Topics are registered sequentially.
* For available options, see `addTopic`.
*/
async addTopics(topics) {
for (const topic of topics) {
await this.addTopic(topic);
}
}
#getStringSubsFor(topic) {
return Array.isArray(this.#stringSubs[topic]) ? this.#stringSubs[topic] : [];
}
#getFnSubsFor(topic) {
return this.#fnSubs.filter(sub => {
try {
return sub.topic(topic);
} catch (e) {
this.#logger.warn('Pubst.subscribe', `Matcher function threw an error for topic '${topic}': ${e.message}`);
return false;
}
});
}
#addSub(subscriber) {
if (typeof subscriber.topic === 'string') {
if (!this.#topics[subscriber.topic]) {
this.#logger.warn('Pubst.addSub', `Adding a subscriber to non-configured topic '${subscriber.topic}'`);
}
this.#stringSubs[subscriber.topic] = this.#getStringSubsFor(subscriber.topic).concat(subscriber);
} else if (typeof subscriber.topic === 'function') {
const matchCount = Object.keys(this.#topics).filter(topic => {
try {
return subscriber.topic(topic);
} catch (e) {
this.#logger.warn('Pubst.addSub', `Matcher function threw an error while checking topic '${topic}': ${e.message}`);
return false;
}
}).length;
if (matchCount === 0) {
this.#logger.warn('Pubst.addSub', `Adding a function matcher subscriber that matches no configured topics.`);
}
this.#fnSubs.push(subscriber);
} else {
throw new Error('Unable to add subscriber. Topic is not a string or a function');
}
}
#removeSub(subscriber) {
if (typeof subscriber.topic === 'string') {
this.#stringSubs[subscriber.topic] = this.#getStringSubsFor(subscriber.topic).filter(sub => sub !== subscriber);
} else if (typeof subscriber.topic === 'function') {
this.#fnSubs = this.#fnSubs.filter(sub => sub !== subscriber);
}
}
#allSubsFor(topic) {
return this.#getStringSubsFor(topic).concat(this.#getFnSubsFor(topic));
}
#getTopicConfig(topic) {
return this.#topics[topic] || buildConfig(DEFAULT_TOPIC_CONFIG, {name: topic});
}
#scheduleCall(sub, payload, topic) {
const topicConfig = this.#getTopicConfig(topic);
const defVal = typeof sub.default === 'undefined' ? topicConfig.default : sub.default;
const eventOnly = hasOwnProperty(sub, 'eventOnly') ? sub.eventOnly : topicConfig.eventOnly;
const allowRepeats = hasOwnProperty(sub, 'allowRepeats') ? sub.allowRepeats : topicConfig.allowRepeats;
const value = eventOnly ? topic : valueOrDefault(payload, defVal);
if (eventOnly || allowRepeats || sub.lastVal !== value || sub.lastTopic !== topic) {
setTimeout(() => {
sub.handler(value, topic);
sub.lastVal = value;
sub.lastTopic = topic;
}, 0);
}
}
/**
* @summary Publish to a topic
*
* @param {string} topic - The topic to publish to
* @param {*} payload The payload to publish
*
* @returns {Promise<void>}
*/
async publish(topic, payload) {
if (!this.#topics[topic]) {
this.#logger.warn('Pubst.publish', `Received a publish for '${topic}', but that topic has not been configured.`);
}
await this.#store.setValue(topic, payload);
const storedValue = await this.#store.getValue(topic);
const subs = this.#allSubsFor(topic);
if (subs.length === 0) {
this.#logger.warn('Pubst.publish', `There are no subscribers that match '${topic}'!`);
} else {
subs.forEach(sub => {
this.#scheduleCall(sub, storedValue, topic);
});
}
}
/**
* @summary Subscribe to one or more topics
*
* @param {string|Function} topic - The topic to receive updates for.
* If a string is provided, the handler will be called for all updates
* for that topic. If a function is provided, it will be used as a
* matcher: it receives a topic name string and should return a truthy
* value if the subscriber should receive updates for that topic.
* If the matcher function throws an error, the error is logged as a
* warning and the match is skipped.
* @param {Function|SubscriptionConfig} handler - Either your handler function or
* a subscription configuration object
* @param {*} [def] - (Optional) Value to send when topic is empty
*
* @returns {Function} - A function that will remove this
* subscription from getting further updates.
*
* @description
* <p>
* The first argument may be a string or a matcher function.
* If a string is provided, the handler will be called for all
* updates for that topic. If a function is provided, it will be
* called with each topic name and should return a truthy value to
* indicate that the subscriber wishes to receive updates for that topic.
* If the matcher function throws, the error is logged as a warning
* and the subscriber will not receive the update for that topic.
* </p>
*
* <p>
* The second argument may be a function or an object. The object
* is necessary if you want to provide configuration options for
* this subscription. Available options are:
* <ul>
* <li>`default` - (Default: undefined) - Default value for this sub.</li>
* <li>`doPrime` - (Default: true) - Should the handler be primed with
* the last value?</li>
* <li>`allowRepeats` - (Default: false) - Should the handler be called
* when the value doesn't change?</li>
* <li>`handler` - (Required) - The handler to call.</li>
* </ul>
* </p>
*
* <p>
* The handler will be called on topic updates. It will be passed
* the new value of the topic as the first argument, and the name of
* the topic as the second argument.
* </p>
*
* <p>
* Note: Subscribe is synchronous and returns an unsubscribe function
* immediately. Priming of subscribers with existing values happens
* asynchronously via the store.
* </p>
*/
subscribe(topic, handler, def) {
let subscription;
if (typeof handler === 'function') {
subscription = {topic, default: def, handler};
} else if (typeof handler === 'object') {
subscription = {};
Object.keys(handler)
.filter(key => ALLOWED_SUB_PROPS.includes(key))
.forEach(key => {
subscription[key] = handler[key];
});
subscription.topic = topic;
}
this.#addSub(subscription);
if (typeof topic === 'string') {
this.#store.getValue(topic).then(storeVal => {
const topicConfig = this.#getTopicConfig(topic);
const defToUse = isDefined(def) ? def : topicConfig.default;
const val = valueOrDefault(storeVal, defToUse);
const doPrime = hasOwnProperty(subscription, 'doPrime') ? subscription.doPrime : topicConfig.doPrime;
if (doPrime && (topicConfig.eventOnly || isSet(val))) {
this.#scheduleCall(subscription, val, topic);
}
});
} else if (typeof topic === 'function') {
this.#store.getTopicNames().then(names => {
const matchingNames = names.filter(key => {
try {
return topic(key);
} catch (e) {
this.#logger.warn('Pubst.subscribe', `Matcher function threw an error for topic '${key}': ${e.message}`);
return false;
}
});
matchingNames.forEach(key => {
this.#store.getValue(key).then(storeVal => {
const topicConfig = this.#getTopicConfig(key);
const defToUse = isDefined(def) ? def : topicConfig.default;
const val = valueOrDefault(storeVal, defToUse);
const doPrime = hasOwnProperty(subscription, 'doPrime') ? subscription.doPrime : topicConfig.doPrime;
if (doPrime && (topicConfig.eventOnly || isSet(val))) {
this.#scheduleCall(subscription, val, key);
}
});
});
});
}
return () => {
this.#removeSub(subscription);
};
}
/**
* @summary Get the current value of a topic.
*
* @param {string} topic - The topic to get the value of.
* @param {*} [def] - (Optional) a value to return if the topic is
* empty.
* @returns {Promise<*>} - Resolves with the current value or the default
*/
async currentVal(topic, def) {
const defToUse = isDefined(def) ? def : this.#getTopicConfig(topic).default;
const storeVal = await this.#store.getValue(topic);
return valueOrDefault(storeVal, defToUse);
}
/**
* @summary Clears a given topic.
*
* @param {string} topic - The topic to clear
*
* @returns {Promise<void>}
*
* @description Clears the topic by publishing a `null` to it.
*/
async clear(topic) {
const topicNames = await this.#store.getTopicNames();
if (topicNames.includes(topic)) {
await this.publish(topic, null);
}
}
/**
* @summary Clears all known topics.
*
* @returns {Promise<void>}
*/
async clearAll() {
const topicNames = await this.#store.getTopicNames();
for (const topic of topicNames) {
await this.clear(topic);
}
}
}
export default Pubst;