///
///
import {Observable, Observer} from '@reactivex/rxjs';
import xs from '../../src/index';
import * as assert from 'assert';
describe('xs.fromObservable', () => {
it('should convert an observable to a stream', (done) => {
const observable = new Observable((observer: Observer) => {
// Emit a single value after 1 second
const timer = setTimeout(() => {
observer.next('yes');
observer.complete();
}, 1000);
// On unsubscription, cancel the timer
return () => clearTimeout(timer);
});
const stream = xs.fromObservable(observable);
let nextSent = false;
stream.addListener({
next: (x: string) => {
assert.strictEqual(x, 'yes');
nextSent = true;
},
error: (err: any) => done(err),
complete: () => {
assert.strictEqual(nextSent, true);
done();
},
});
});
});