# RxJS wrapper for gRPC and TypeScript

A package that wraps the callback- and event-based [@grpc/grpc-js](https://www.npmjs.com/package/@grpc/grpc-js) and [grpc-web](https://www.npmjs.com/package/grpc-web) with Promises and RxJS Observables. Both clients and servers can be used reactively.
Works well in conjunction with [grpc-tools](https://www.npmjs.com/package/grpc-tools), [grpc_tools_node_protoc_ts](https://www.npmjs.com/package/grpc_tools_node_protoc_ts) and [protoc-gen-grpc-web](https://www.npmjs.com/package/protoc-gen-grpc-web).

#### Before
```typescript
class ExampleService implements IExampleServer {
  incrementStream(call: grpc.ServerDuplexStream<OneNumber, OneNumber>): void {
    call.on('data', (number) => {
      call.write(new OneNumber().setA(number.getA() + 1));
    });
    call.on('end', () => call.end());
  }
}
```

#### After
```typescript
defineService<IExampleServer>(ExampleService, {
  incrementStream(request: Observable<OneNumber>): Observable<OneNumber> {
    return request.pipe(map((number) => new OneNumber().setA(number.getA() + 1)));
  },
});
```

# Table of Contents
* [Installation](#installation)
  * [yarn](#yarn)
  * [npm](#npm)
* [Usage](#usage)
  * [Node.js server implementation](#nodejs-server-implementation)
    * [Reactify an entire service](#reactify-an-entire-service-recommended)
    * [Reactify individual methods](#reactify-individual-methods)
    * [Throwing errors](#throwing-errors)
  * [Node.js client](#nodejs-client)
  * [Web client](#web-client)

## Installation

`reactive-grpc` requires `rxjs`, as well as either `@grpc/grpc-js` (for Node.js server applications) or `grpc-web` (for web frontends) to be added to your project.

### yarn
```bash
yarn add reactive-grpc rxjs @grpc/grpc-js # or grpc-web
```

### npm
```bash
npm install --save reactive-grpc rxjs @grpc/grpc-js # or grpc-web
```

## Usage

### Node.js server implementation
You can either "reactify" individual service methods or entire services. The latter is recommended, due to better type inference.

#### Reactify an entire service (recommended)
Use the `defineService` function to convert an object implementing the reactive service interface into a regular gRPC service. The reactive type is created automatically from the regular interface generated by `grpc-tools`.
```typescript
import { interval, Observable } from 'rxjs';
import { map, reduce } from 'rxjs/operators';

import { defineService } from 'reactive-grpc/node';

// Generated by grpc-tools and grpc_tools_node_protoc_ts
import { OneNumber, TwoNumbers, Empty } from '../generated/service_pb';
import { ExampleService, IExampleServer } from '../generated/service_grpc_pb';

/** Reactive server of the example service. */
export default defineService<IExampleServer>(ExampleService, {
  async addTwoNumbers(request: TwoNumbers): Promise<OneNumber> {
    return new OneNumber().setA(request.getA() + request.getB());
  },
  addStreamOfNumbers(request: Observable<OneNumber>): Promise<OneNumber> {
    return request
      .pipe(
        reduce((acc, value) => acc + value.getA(), 0),
        map((value) => new OneNumber().setA(value)),
      )
      .toPromise();
  },
  getFibonacciSequence(request: Empty): Observable<OneNumber> {
    let a = 0;
    let b = 1;
    return interval(100).pipe(
      map(() => {
        const next = a + b;
        a = b;
        b = next;
        return new OneNumber().setA(a);
      })
    );
  },
  runningAverage(request: Observable<OneNumber>): Observable<OneNumber> {
    let average = 0;
    return request.pipe(
      map((value, index) => {
        average = (value.getA() + index * average) / (index + 1);
        return new OneNumber().setA(average);
      })
    );
  },
});
```
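The `runningAverage` handler above relies on an incremental mean update, so it never has to buffer the stream. The following dependency-free sketch isolates that arithmetic; `updateAverage` is a hypothetical helper name used only for illustration and is not part of `reactive-grpc`.

```typescript
/** Incrementally updates a mean; `index` is the zero-based position of `value`. */
function updateAverage(average: number, value: number, index: number): number {
  return (value + index * average) / (index + 1);
}

// Feed the values one at a time, as the request stream would deliver them.
const inputs = [2, 4, 9];
const averages: number[] = [];
let average = 0;
inputs.forEach((value, index) => {
  average = updateAverage(average, value, index);
  averages.push(average);
});

console.log(averages); // [2, 3, 5]
```

Each emitted response is the mean of all requests received so far, which is what the `map((value, index) => ...)` operator computes per element.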
If you need the standard gRPC call object, for example to read the metadata or watch the cancellation status, add it as the second parameter:
```typescript
defineService<IExampleServer>(ExampleService, {
  runningAverage(
    request: Observable<OneNumber>,
    call: grpc.ServerDuplexStream<OneNumber, OneNumber>,
  ): Observable<OneNumber> {
    ...
  },
});
```
Methods that return `Promise<ResponseType>` may instead resolve with an object that carries trailer metadata and flags alongside the response value. These are the values you would otherwise pass to the `callback` function:
```typescript
defineService<IExampleServer>(ExampleService, {
  async addTwoNumbers(request: TwoNumbers): Promise<OneNumber> {
    return {
      value: new OneNumber().setA(request.getA() + request.getB()),
      trailer: ...,
      flags: ...,
    };
  },
});
```
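To make the relationship to the `callback` function concrete, here is a rough, dependency-free sketch of how a Promise-returning handler could be adapted to a callback-style one. It is not the library's implementation: `WithTrailer`, `UnaryCallback`, `normalize` and `adaptUnary` are hypothetical names, and recognizing the wrapper object by its `value` key is an assumption made for this sketch.

```typescript
// Hypothetical shapes for illustration; not the types exported by reactive-grpc.
type WithTrailer<Res> = { value: Res; trailer?: unknown; flags?: number };
type UnaryCallback<Res> = (
  error: Error | null,
  value?: Res,
  trailer?: unknown,
  flags?: number,
) => void;

/** Normalizes a handler result. Assumption: a wrapper is recognized by its `value` key. */
function normalize<Res>(result: Res | WithTrailer<Res>): WithTrailer<Res> {
  if (typeof result === 'object' && result !== null && 'value' in result) {
    return result as WithTrailer<Res>;
  }
  return { value: result as Res };
}

/** Turns a Promise-returning handler into a callback-style one. */
function adaptUnary<Req, Res>(
  handler: (request: Req) => Promise<Res | WithTrailer<Res>>,
): (request: Req, callback: UnaryCallback<Res>) => void {
  return (request, callback) => {
    handler(request).then(
      (result) => {
        const { value, trailer, flags } = normalize<Res>(result);
        callback(null, value, trailer, flags);
      },
      (error: unknown) => callback(error instanceof Error ? error : new Error(String(error))),
    );
  };
}

// Usage: the handler resolves with a value plus (made-up) trailer data.
const add = adaptUnary<{ a: number; b: number }, number>(async (request) => ({
  value: request.a + request.b,
  trailer: { 'x-example': 'demo' },
}));
add({ a: 5, b: 6 }, (error, value, trailer) => console.log(error, value, trailer));
```

A plain return value and a wrapper object end up at the same callback, which is why both forms can share one method signature.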

#### Reactify individual methods
Use `defineUnaryMethod`, `defineRequestStreamMethod`, `defineResponseStreamMethod` and `defineBidirectionalStreamMethod` to wrap your reactive function definitions. You can then assign the returned function to, for example, a member function of a non-reactive gRPC service:
```typescript
import { interval, Observable } from 'rxjs';
import { map, reduce } from 'rxjs/operators';

import {
  defineUnaryMethod,
  defineRequestStreamMethod,
  defineResponseStreamMethod,
  defineBidirectionalStreamMethod,
} from 'reactive-grpc/node';

// Generated by grpc-tools and grpc_tools_node_protoc_ts
import { OneNumber, TwoNumbers, Empty } from '../generated/service_pb';
import { IExampleServer } from '../generated/service_grpc_pb';

/** Server of the example service that wraps each method individually. */
export default class ExampleServer implements IExampleServer {
  addStreamOfNumbers = defineRequestStreamMethod(function (request: Observable<OneNumber>): Promise<OneNumber> {
    return request
      .pipe(
        reduce((acc, value) => acc + value.getA(), 0),
        map((value) => new OneNumber().setA(value)),
      )
      .toPromise();
  });
}
```

#### Throwing errors
gRPC errors can be emitted by throwing instances of the `RpcError` class to reject the returned `Promise`:
```typescript
import { defineService, RpcError } from 'reactive-grpc/node';

defineService<IExampleServer>(ExampleService, {
  async addTwoNumbers(request: TwoNumbers): Promise<OneNumber> {
    throw new RpcError(RpcError.StatusCode.UNIMPLEMENTED, 'This method is not yet implemented!');
  },
});
```
For streaming responses, emit the error through the returned `Observable`:
```typescript
import { Observable, throwError } from 'rxjs';

import { defineService, RpcError } from 'reactive-grpc/node';

defineService<IExampleServer>(ExampleService, {
  getFibonacciSequence(request: Empty): Observable<OneNumber> {
    return throwError(new RpcError(RpcError.StatusCode.UNAUTHENTICATED, 'Client is not authenticated.'));
  },
});
```
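As a rough illustration of what happens to a thrown error, the sketch below maps whatever a handler throws to a status code and message. It does not use `reactive-grpc`: `StatusError` and `toStatus` are hypothetical stand-ins, and falling back to `UNKNOWN` for errors without a code is an assumption. Only the numeric values are taken from the standard gRPC status code table.

```typescript
// Numeric values follow the standard gRPC status code table.
const StatusCode = { UNKNOWN: 2, UNIMPLEMENTED: 12, UNAUTHENTICATED: 16 } as const;
type StatusCode = (typeof StatusCode)[keyof typeof StatusCode];

/** Hypothetical stand-in for an error that carries a gRPC status code. */
class StatusError extends Error {
  readonly code: StatusCode;

  constructor(code: StatusCode, message: string) {
    super(message);
    this.code = code;
    // Keeps `instanceof` working even when compiling to ES5.
    Object.setPrototypeOf(this, StatusError.prototype);
  }
}

/** Maps anything a handler might throw to a status code and message. */
function toStatus(error: unknown): { code: StatusCode; details: string } {
  if (error instanceof StatusError) {
    return { code: error.code, details: error.message };
  }
  // Assumption: errors without an explicit code are reported as UNKNOWN.
  const details = error instanceof Error ? error.message : String(error);
  return { code: StatusCode.UNKNOWN, details };
}

console.log(toStatus(new StatusError(StatusCode.UNIMPLEMENTED, 'Not yet implemented')));
console.log(toStatus(new Error('boom')));
```

Throwing an error that carries an explicit code keeps the status visible to the client, whereas a generic `Error` loses that information.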

### Node.js client
To create a reactive gRPC client for use with Node.js, you must first create a regular client instance, and then apply `reactive-grpc`'s `reactifyNodeClient` function:
```typescript
import * as grpc from '@grpc/grpc-js';
import { reactifyNodeClient, RpcError } from 'reactive-grpc/node';

// Generated by grpc-tools and grpc_tools_node_protoc_ts
import { ExampleClient, ExampleService } from '../generated/service_grpc_pb';

const client = new ExampleClient(port, grpc.credentials.createInsecure());
const reactiveClient = reactifyNodeClient(ExampleService, client);
```
The new client then exposes an API based on Promises and Observables:
```typescript
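import { from } from 'rxjs';
import { map } from 'rxjs/operators';

import { OneNumber } from '../generated/service_pb';

try {
  // Client-streaming call: send four numbers, then await the single response.
  const inputStream = from([1, 2, 3, 4]).pipe(map((num) => new OneNumber().setA(num)));
  const sum = await reactiveClient.addStreamOfNumbers(inputStream);
  console.log(sum.getA());
} catch (err) {
  if (err instanceof RpcError) {
    // Handle gRPC errors here.
  }
}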
```typescript
const observable = reactiveClient.getFibonacciNumbers(new OneNumber().setA(20));
observable.subscribe(
  (value) => console.log(value.getA()),
  (err) => console.log('Oh no!'), // Handle gRPC errors here.
  () => console.log('Done!'),
);
```
Optionally, you can also supply `Metadata` and `CallOptions` as the second and third arguments of a call.
To retrieve the standard gRPC call object, use `.call` on the returned Promise or Observable:
```typescript
console.log(observable.call);
```
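The idea of a Promise that also exposes the underlying call can be sketched without any gRPC dependency. The snippet below is an illustration under assumptions, not the library's code: `Cancellable`, `UnaryMethod`, `promisifyUnary` and `fakeAdd` are hypothetical names, and a real client would use the call types from `@grpc/grpc-js`.

```typescript
// Hypothetical minimal shapes; real clients use the types from @grpc/grpc-js.
interface Cancellable {
  cancel(): void;
}
type Callback<Res> = (error: Error | null, response?: Res) => void;
type UnaryMethod<Req, Res> = (request: Req, callback: Callback<Res>) => Cancellable;

/** Wraps a callback-style unary method and attaches the call object to the Promise. */
function promisifyUnary<Req, Res>(
  method: UnaryMethod<Req, Res>,
): (request: Req) => Promise<Res> & { call: Cancellable } {
  return (request) => {
    let call!: Cancellable;
    // The executor runs synchronously, so `call` is set before it is attached.
    const promise = new Promise<Res>((resolve, reject) => {
      call = method(request, (error, response) => {
        if (error) reject(error);
        else resolve(response as Res);
      });
    });
    return Object.assign(promise, { call });
  };
}

// Usage with a fake method that answers on the next tick.
const fakeAdd: UnaryMethod<[number, number], number> = ([a, b], callback) => {
  const timer = setTimeout(() => callback(null, a + b), 0);
  return { cancel: () => clearTimeout(timer) };
};

const pending = promisifyUnary(fakeAdd)([5, 6]);
pending.then((sum) => console.log(sum)); // 11
console.log(typeof pending.call.cancel); // "function"
```

Attaching the call as a property keeps the return value awaitable while still giving access to cancellation and other call-level features.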

Note that unsubscribing from a server-side stream will automatically cancel the request:
```typescript
const observable = reactiveClient.getFibonacciNumbers(new OneNumber().setA(20));
// take(5) unsubscribes after 5 values, which cancels the request.
// The resulting gRPC CANCELLED error is ignored by the client.
observable.pipe(take(5)).subscribe(
  (value) => console.log(value.getA()),
  (err) => console.log('Oh no!'),
  () => console.log('Done!'),
);
```
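The link between unsubscribing and cancellation can be sketched as a teardown function that calls `cancel()` on the underlying call. The snippet below is a dependency-free illustration, not the library's implementation: `StreamCall`, `Observer` and `streamToSubscribe` are hypothetical names. With RxJS, the same teardown would be the function returned from the callback passed to `new Observable(...)`.

```typescript
// Hypothetical stand-in for a server-streaming call object.
interface StreamCall {
  cancel(): void;
}
type Observer<T> = { next(value: T): void; complete(): void };
type Unsubscribe = () => void;

/** Builds a subscribe function whose teardown cancels the underlying call. */
function streamToSubscribe<T>(
  start: (emit: (value: T) => void, end: () => void) => StreamCall,
): (observer: Observer<T>) => Unsubscribe {
  return (observer) => {
    let done = false;
    const call = start(
      (value) => {
        if (!done) observer.next(value);
      },
      () => {
        if (!done) {
          done = true;
          observer.complete();
        }
      },
    );
    return () => {
      // Cancel only if the stream has not already finished.
      if (!done) {
        done = true;
        call.cancel();
      }
    };
  };
}

// Usage with a manually driven fake stream.
let emitValue: (value: number) => void = () => {};
let cancelCount = 0;
const subscribe = streamToSubscribe<number>((emit) => {
  emitValue = emit;
  return { cancel: () => { cancelCount += 1; } };
});

const received: number[] = [];
const unsubscribe = subscribe({ next: (value) => received.push(value), complete: () => {} });
emitValue(1);
emitValue(2);
unsubscribe(); // cancels the call
emitValue(3); // ignored, the subscriber is gone
console.log(received, cancelCount); // [1, 2] and 1
```

Operators such as `take(5)` unsubscribe from their source once they have enough values, which is why they trigger the same cancellation path.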

### Web client
To create a reactive gRPC client for use in a browser (e.g. with webpack), you must first create a regular client instance, and then apply `reactive-grpc`'s `reactifyWebClient` function. Make sure to reactify the generated callback-based client, not the promise-based one:
```typescript
import * as grpc from 'grpc-web';
import { reactifyWebClient } from 'reactive-grpc/web';

// Generated by protoc-gen-grpc-web
import { ExampleClient } from '../generated/service_grpc_pb';

const client = new ExampleClient(port);
const reactiveClient = reactifyWebClient(client);
```
The new client then exposes an API based on Promises and Observables:
```typescript
try {
  const request = new TwoNumbers().setA(5).setB(6);
  const sum = await reactiveClient.addTwoNumbers(request);
  console.log(sum.getA());
} catch (err) {
  if (err instanceof RpcError) {
    // Handle gRPC errors here.
  }
}
```
```typescript
const observable = reactiveClient.getFibonacciNumbers(new OneNumber().setA(20));
observable.subscribe(
  (value) => console.log(value.getA()),
  (err) => console.log('Oh no!'), // Handle gRPC errors here.
  () => console.log('Done!'),
);
```
Optionally, you can also supply `Metadata` as the second argument of a call.
To retrieve the standard gRPC call object, use `.call` on the returned Promise or Observable:
```typescript
console.log(observable.call);
```

Note that unsubscribing from a server-side stream will automatically cancel the request:
```typescript
const observable = reactiveClient.getFibonacciNumbers(new OneNumber().setA(20));
// take(5) unsubscribes after 5 values, which cancels the request.
// The resulting gRPC CANCELLED error is ignored by the client.
observable.pipe(take(5)).subscribe(
  (value) => console.log(value.getA()),
  (err) => console.log('Oh no!'),
  () => console.log('Done!'),
);
```
