<p align="center">
    <img src="nestjstools-logo.png" width="400" alt="NestJS Messaging Library">
</p>

# NestJS Messaging Library - Message Bus & Service Bus for Distributed Systems

A NestJS library for managing asynchronous and synchronous messages (service bus) with support for buses, handlers,
channels, and consumers. This library simplifies building scalable and decoupled applications by facilitating robust
message handling pipelines while ensuring flexibility and reliability.

---

## Features

- **Message Buses**: Define multiple buses for commands, events, and queries to streamline message routing.
- **Handlers**: Easily register and manage handlers for processing messages.
- **Channels**: Support for in-memory channels and **easy extension** to create custom channel implementations tailored
  to your needs.
- **Consumers**: Run message consumers to process queued messages asynchronously, ensuring system reliability and fault
  tolerance.
- **Middleware Support**: Add custom middleware for tasks such as message transformation, validation, or logging.
- **Debug Mode**: Enable enhanced logging and debugging capabilities for development.
- **Extensibility**: Creating new channels is straightforward, allowing developers to expand and integrate with external
  systems or protocols effortlessly.
- **Concurrent Handler Execution**: Messages dispatched to multiple handlers are processed concurrently, improving
  performance and responsiveness across your system.

## Channels

- [Redis channel adapter](https://www.npmjs.com/package/@nestjstools/messaging-redis-extension)
- [RabbitMQ channel adapter](https://www.npmjs.com/package/@nestjstools/messaging-rabbitmq-extension)
- [Amazon SQS channel adapter](https://www.npmjs.com/package/@nestjstools/messaging-amazon-sqs-extension)
- [Google PubSub channel adapter](https://www.npmjs.com/package/@nestjstools/messaging-google-pubsub-extension)
- [NATS channel adapter](https://www.npmjs.com/package/@nestjstools/messaging-nats-extension)
- [Azure Service Bus channel adapter](https://www.npmjs.com/package/@nestjstools/messaging-azure-service-bus-extension)

---

## Documentation

- Documentation: https://docs.nestjstools.com/messaging
- Website: https://nestjstools.com
- Example repository: https://github.com/nestjstools/messaging-rabbitmq-example

---

## Installation

```bash
npm install @nestjstools/messaging
```

or

```bash
yarn add @nestjstools/messaging
```

## Getting Started

### Basic Usage (In-memory)

```typescript
import { Module } from '@nestjs/common';
import { MessagingModule, InMemoryChannelConfig } from '@nestjstools/messaging';
import { SendMessageHandler } from './handlers/send-message.handler';


@Module({
  imports: [
    MessagingModule.forRoot({
      buses: [
        {
          name: 'message.bus',
          channels: ['my-channel'],
        },
      ],
      channels: [
        new InMemoryChannelConfig({
          name: 'my-channel',
          middlewares: [],
        }),
      ],
      debug: true,
    }),
  ],
})
export class AppModule {
}
```

### Define a Message & Message Handler

Create a handler that processes a specific message.

#### Define your message

```typescript
export class SendMessage {
  constructor(
    public readonly content: string,
  ) {
  }
}

```

#### Define your message handler

```typescript
import { SendMessage } from './send-message';
import { MessageResponse, MessageHandler, IMessageHandler } from '@nestjstools/messaging';
import { Injectable } from '@nestjs/common';

// A single handler can be registered for multiple routing keys: @MessageHandler('your.message', 'your.message2')
@Injectable()
@MessageHandler('your.message')
export class SendMessageHandler implements IMessageHandler<SendMessage> {
  // If you want to receive the message as a properly typed instance (not just a raw object),
  // use the `@DenormalizeMessage()` decorator on the parameter:
  // async handle(@DenormalizeMessage() message: SendMessage): Promise<MessageResponse | void> {

  async handle(message: SendMessage): Promise<object | void> {
    console.log(message.content);
    // Example handling logic
  }
}
```

### Next Step: Dispatching a Message

Messages can be dispatched from anywhere in your application—whether from services, controllers, or other components.
Here’s an example using an HTTP endpoint:

```typescript
import { Controller, Get } from '@nestjs/common';
import { MessageBus, IMessageBus, RoutingMessage } from '@nestjstools/messaging';
import { SendMessage } from './test/send-message';


@Controller()
export class AppController {
  // You can inject any bus defined in the configuration
  constructor(@MessageBus('message.bus') private readonly messageBus: IMessageBus) {
  }

  @Get()
  async dispatchMessage(): Promise<string> {
    // Dispatching a SendMessage instance with a route
    await this.messageBus.dispatch(
      new RoutingMessage(new SendMessage('Message from HTTP request'), 'your.message'),
    );

    return 'Message dispatched successfully!';
  }
}
```

### Flow:

1. **Flexible Dispatching**:
    - You can call the `dispatch` method from any layer (e.g., controller, service, or scheduled job). This example uses
      an HTTP `GET` endpoint for demonstration.

2. **`@MessageBus` Decorator**:
    - Injects the particular message bus (identified by its name, `message.bus`) into the `AppController`.

3. **Routing and Payload**:
    - Wrap the payload (`SendMessage`) in a `RoutingMessage` to specify its route (`your.message`), which ensures the
      message is handled by the appropriate handler.

4. **HTTP Trigger**:
    - This implementation illustrates an entry point triggered via an HTTP request, showcasing how simple it is to
      connect the messaging system to a web interface.
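
To make the routing step more concrete, the sketch below models how a routing key could be matched to handlers. It is a simplified, self-contained illustration and not the library's internal implementation: `SketchBus`, `register`, and the `Promise.all` fan-out are assumptions made for this example.

```typescript
// Simplified model of routing-key dispatch. All names here are hypothetical
// and exist only for illustration; they are not part of @nestjstools/messaging.
type Handler = (payload: unknown) => Promise<unknown>;

class SketchBus {
  // Maps a routing key (e.g. 'your.message') to the handlers registered for it.
  private readonly handlers = new Map<string, Handler[]>();

  register(route: string, handler: Handler): void {
    const existing = this.handlers.get(route) ?? [];
    this.handlers.set(route, [...existing, handler]);
  }

  // Runs every handler registered for the route concurrently and
  // resolves with their results in registration order.
  async dispatch(route: string, payload: unknown): Promise<unknown[]> {
    const matched = this.handlers.get(route);
    if (matched === undefined) {
      throw new Error(`No handler registered for route "${route}"`);
    }
    return Promise.all(matched.map((handler) => handler(payload)));
  }
}

const bus = new SketchBus();
bus.register('your.message', async (payload) => `first: ${String(payload)}`);
bus.register('your.message', async (payload) => `second: ${String(payload)}`);
```

In this model, dispatching to `'your.message'` invokes both registered handlers, while dispatching to a route with no handler rejects with an error.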

### ⚠️ Warning!

Handlers can **return responses**, but this currently works only with the `InMemoryChannel`.
Returned responses may not behave as expected when multiple handlers process the same message.

If a message is routed to multiple handlers, do not rely on the returned response, as the result may be unexpected.

---

## Normalizers

What is a Normalizer?
A Normalizer is a component that transforms messages between different formats. It ensures that messages are correctly
encoded when sent and properly decoded when received. This is particularly useful in messaging systems where messages
need to be serialized and deserialized efficiently.

You can use a normalizer to support formats such as:

* [protobuf](https://protobuf.dev/)
* Custom JSONs
* Base64
* Any custom format

```typescript
import { Injectable } from '@nestjs/common';
import { MessagingNormalizer, MessageNormalizer } from '@nestjstools/messaging';
import { Buffer } from 'buffer';

@Injectable()
@MessagingNormalizer()
export class Base64Normalizer implements MessageNormalizer {
  denormalize(message: string | object, type: string): Promise<object> {
    if (typeof message === 'object') {
      throw new Error('Message must be a string!');
    }
    return Promise.resolve(JSON.parse(Buffer.from(message, 'base64').toString('utf-8')));
  }

  normalize(message: object, type: string): Promise<string> {
    const jsonString = JSON.stringify(message);
    return Promise.resolve(Buffer.from(jsonString, 'utf-8').toString('base64'));
  }
}

```

### How It Works

#### Normalization (normalize)

* Converts a JSON object to a Base64 string before sending.

#### Denormalization (denormalize)

* Decodes the Base64 string back into a JSON object after receiving.
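
A quick way to sanity-check the encoding logic is a round trip outside of NestJS. The sketch below copies the Base64 logic from `Base64Normalizer` into two standalone functions so it can run without the messaging module. The names `encode` and `decode` are chosen for this example and are not library APIs.

```typescript
import { Buffer } from 'buffer';

// Same logic as Base64Normalizer.normalize: object -> JSON -> Base64.
function encode(message: object): string {
  return Buffer.from(JSON.stringify(message), 'utf-8').toString('base64');
}

// Same logic as Base64Normalizer.denormalize: Base64 -> JSON -> object.
function decode(encoded: string): object {
  return JSON.parse(Buffer.from(encoded, 'base64').toString('utf-8'));
}

const original = { content: 'Message from HTTP request' };
// Base64 string as it would travel through the channel.
const wire = encode(original);
// Plain object again; JSON.parse does not restore class prototypes.
const restored = decode(wire);
```

Because `JSON.parse` yields a plain object, the decoded value has the same fields as the original message but is not an instance of the message class.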

You can define a **normalizer** per channel through the `normalizer` option of the channel configuration.

---

## ⤵️ Middlewares

A **middleware** in the context of the `MessagingModule` is a class that processes messages as they pass through the
message pipeline. A middleware can intercept, modify, or log messages before they reach the respective
**message handler**. This is particularly useful for logging, authentication, validation, or any other pre-processing
step before the actual business logic is applied.

Each **channel** in the messaging system has its own set of middlewares, and these middlewares are executed in order
when a message is dispatched through the respective channel.

### How to Use Middleware in Messaging Channels:

To use middleware, you need to:

1. **Define the middleware class** that implements the `Middleware` interface, which contains the `process` method that
   processes the message.
2. **Attach the middleware to a specific channel** via the channel configuration.

### Example Middleware Code:

Here's an example middleware class that logs a message when the middleware is applied.

```typescript
import { Injectable } from '@nestjs/common';
import { Middleware, MessagingMiddleware, MiddlewareContext, RoutingMessage } from '@nestjstools/messaging';

@Injectable()
@MessagingMiddleware()
export class TestMiddleware implements Middleware {
  async process(message: RoutingMessage, context: MiddlewareContext): Promise<MiddlewareContext> {
    console.log('!!!! WORKS');  // Log or process the message here

    // Call `next()` on the `MiddlewareContext` to pass the message to the next middleware in the chain
    return await context.next().process(message, context);
  }
}
```

### Attaching Middleware to a Channel:

Now that we've defined the middleware, it needs to be attached to a specific channel in the `MessagingModule`
configuration. Here's how you would configure the middleware for a channel:

```typescript
import { Module } from '@nestjs/common';
import { MessagingModule, AmqpChannelConfig, InMemoryChannelConfig } from '@nestjstools/messaging';
import { TestMiddleware } from './middlewares/test.middleware';
import { SendMessageHandler } from './handlers/send-message.handler';

@Module({
  imports: [
    MessagingModule.forRoot({
      buses: [
        {
          name: 'message.bus',
          channels: ['my-channel'],
        },
      ],
      channels: [
        new InMemoryChannelConfig({
          name: 'my-channel',
          middlewares: [TestMiddleware],  // Attach TestMiddleware to this channel
        }),
        new AmqpChannelConfig({
          name: 'amqp-command',
          connectionUri: 'amqp://guest:guest@localhost:5672/',
          exchangeName: 'my_app_command.exchange',
          bindingKeys: ['my_app.command.#'],
          exchangeType: ExchangeType.TOPIC,
          queue: 'my_app.command',
          autoCreate: true,
          enableConsumer: true,
          middlewares: [TestMiddleware],  // Attach TestMiddleware to this AMQP channel
        }),
      ],
      debug: true,
    }),
  ],
})
export class AppModule {
}
```

### Explanation of How It Works:

1. **Middleware Class**:
    - A `Middleware` is a class that implements the `process` method. In this case, the `TestMiddleware` simply logs
      `'!!!! WORKS'` and allows the message to continue.

2. **Message Pipeline**:
    - When a message is dispatched, it passes through the series of middlewares configured for its channel.
    - The middlewares execute in the order they're listed for the channel. Each `process` method decides what happens
      to the message: whether it is transformed and whether it continues to the next middleware via `context.next()`.

3. **Channel-Specific Middlewares**:
    - Each channel can have its own set of middlewares defined in the channel's configuration (e.g.,
      `InMemoryChannelConfig` and `AmqpChannelConfig`).
    - This allows flexible control of how messages are processed depending on the channel, enabling different logic for
      each transport mechanism (in-memory vs. RabbitMQ).
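
The ordering described above can be modeled in a few lines. This is a simplified, self-contained sketch rather than the library's actual pipeline: `SketchMessage`, `SketchMiddleware`, `SketchContext`, and `runPipeline` are hypothetical names, and the real `MiddlewareContext` may differ in detail. It only demonstrates how calling `context.next().process(...)` passes a message down the chain in configuration order.

```typescript
// Hypothetical stand-ins for illustration; not the real library types.
interface SketchMessage {
  route: string;
  payload: unknown;
}

interface SketchMiddleware {
  process(message: SketchMessage, context: SketchContext): Promise<SketchMessage>;
}

// End of the chain: returns the message unchanged
// (a real pipeline would hand the message to its handlers here).
const endOfChain: SketchMiddleware = {
  async process(message: SketchMessage): Promise<SketchMessage> {
    return message;
  },
};

class SketchContext {
  private readonly remaining: SketchMiddleware[];

  constructor(chain: SketchMiddleware[]) {
    // Copy the list so the configured chain is not mutated.
    this.remaining = [...chain];
  }

  // Returns the next middleware in configuration order, or the end of the chain.
  next(): SketchMiddleware {
    return this.remaining.shift() ?? endOfChain;
  }
}

function runPipeline(message: SketchMessage, chain: SketchMiddleware[]): Promise<SketchMessage> {
  const context = new SketchContext(chain);
  return context.next().process(message, context);
}

// Two example middlewares that record the order in which they run.
const order: string[] = [];

const logging: SketchMiddleware = {
  async process(message, context) {
    order.push('logging');
    return context.next().process(message, context);
  },
};

const validation: SketchMiddleware = {
  async process(message, context) {
    order.push('validation');
    if (message.route === '') {
      throw new Error('Route must not be empty'); // stops the chain
    }
    return context.next().process(message, context);
  },
};
```

In this model, `runPipeline(message, [logging, validation])` runs `logging` before `validation`, and a middleware that throws or never calls `context.next()` prevents the later ones from running.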

### Benefits of Using Middlewares:

- **Separation of Concerns**: Middlewares help encapsulate cross-cutting concerns like logging, validation, and
  authentication, making the code cleaner.
- **Reusability**: A middleware can be reused across different channels to perform the same actions on various messages.
- **Custom Logic**: You can apply custom transformations, logging, or other types of business logic to messages as they
  move through the pipeline.

---

## 🔰 ExceptionListener

The **ExceptionListener** provides a centralized way to handle exceptions thrown during asynchronous message processing
from any **channel** in your **messaging system**.

By decorating a class with `@MessagingExceptionListener()` and implementing the `ExceptionListener` interface, you can
intercept and respond to any unhandled exception occurring during message handling — whether it's logging, reporting,
retries, or custom recovery logic.

Example Use Case:
You can log the error, send a notification, or trigger fallback logic whenever a message handler throws an exception.

### Example ExceptionListener Code:

```typescript
import { Injectable } from '@nestjs/common';
import { ExceptionListener, MessagingExceptionListener, ExceptionContext } from '@nestjstools/messaging';

@Injectable()
@MessagingExceptionListener()
export class CustomExceptionListener implements ExceptionListener {
  async onException(context: ExceptionContext): Promise<void> {
    // Handle the exception here, e.g. log it, send a notification, or trigger fallback logic
    console.log('An exception occurred during message handling');
  }
}
```

---

## 📝 Custom Logger

By default, the messaging system uses Nest’s built-in `NestLogger` for logging.  
However, you can plug in your own **custom logger** to gain more control over how messages and errors are recorded.

A custom logger must implement the `MessagingLogger` interface.

### Providing a custom logger

When configuring the **messaging module**, you can pass your own logger instance via the `customLogger` option.

```ts

@Module({
  imports: [
    MessagingModule.forRoot({
      customLogger: new MyCustomLogger(),
      ...
    }),
  ],
})
export class AppModule {
}
```

Alternatively, if the logger is defined as a provider, pass the class instead:

```ts

@Module({
  imports: [
    MessagingModule.forRoot({
      customLogger: MyCustomLogger,
      ...
    }),
  ],
  providers: [
    MyCustomLogger,
  ],
})
export class AppModule {
}
```

## Configuration options

### `MessagingModule.forRoot` Configuration

<br>

| **Property**                   | **Description**                                                                                                                                                                                                  | **Default Value**             |
|--------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------------|
| **`buses`**                    | Array of message buses that define routing and processing of messages.                                                                                                                                           | `[]` (empty array by default) |
| **`channels`**                 | Array of channel configurations used by the message buses.                                                                                                                                                       | `[]` (empty array by default) |
| **`debug`**                    | Enables or disables debug mode for logging additional messages.                                                                                                                                                  | `false`                       |
| **`logging`**                  | Enables or disables logging for bus activity (e.g., message dispatch).                                                                                                                                           | `true`                        |
| **`customLogger`**             | Instance of a class that implements `MessagingLogger` for custom logging integration.                                                                                                                            | `NestLogger`                  |
| **`forceDisableAllConsumers`** | Forces all messaging consumers to be disabled. Messages can only be processed through the `InMemoryChannel`. Useful in testing environments to prevent external transports or background consumers from running. | `false`                       |

---
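
As a combined illustration of the `forRoot` options, the following configuration sketch disables consumers during tests and limits debug output to non-production runs. It uses only properties documented in this section; the `NODE_ENV` checks and the bus and channel names are assumptions made for the example.

```typescript
import { Module } from '@nestjs/common';
import { MessagingModule, InMemoryChannelConfig } from '@nestjstools/messaging';

// Assumption for this sketch: NODE_ENV distinguishes test and production runs.
const isTest = process.env.NODE_ENV === 'test';
const isProduction = process.env.NODE_ENV === 'production';

@Module({
  imports: [
    MessagingModule.forRoot({
      buses: [{ name: 'message.bus', channels: ['my-channel'] }],
      channels: [
        new InMemoryChannelConfig({
          name: 'my-channel',
          middlewares: [],
          avoidErrorsForNotExistedHandlers: false,
        }),
      ],
      debug: !isProduction, // extra debug logging outside production
      logging: true, // log bus activity such as message dispatch
      forceDisableAllConsumers: isTest, // keep consumers off during tests
    }),
  ],
})
export class AppModule {}
```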

### Buses

| **Property**   | **Description**                                                      | **Default Value** |
|----------------|----------------------------------------------------------------------|-------------------|
| **`name`**     | Name of the message bus (e.g., `'command.message-bus'`).             |                   |
| **`channels`** | List of channel names to be used by this bus (e.g., `'my-channel'`). | `[]`              |

---

### Channels

#### **InMemoryChannelConfig**

| **Property**                           | **Description**                                          | **Default Value** |
|----------------------------------------|----------------------------------------------------------|-------------------|
| **`name`**                             | Name of the in-memory channel (e.g., `'my-channel'`).    |                   |
| **`middlewares`**                      | List of middlewares to apply to the channel.             | `[]`              |
| **`avoidErrorsForNotExistedHandlers`** | Avoid errors if no handler is available for the message. | `false`           |
| **`normalizer`**                       | Custom normalizer applied to messages on this channel.   |                   |

