import {Observable, Subject} from 'rxjs';

/**
 * Creates an operator that invokes `callback` every time the resulting
 * observable is subscribed to, just before subscribing to the source.
 *
 * @param callback Invoked once per subscription, before the source is subscribed.
 * @returns An operator function that mirrors the source observable unchanged.
 */
export const onSubscribe =
  <T>(callback: () => void) =>
  (source: Observable<T>): Observable<T> =>
    new Observable<T>(subscriber => {
      callback();
      return source.subscribe(subscriber);
    });

/**
 * Creates an operator that can pause and resume delivery of source values.
 *
 * Every emission of `suspendSelector` increments a suspension counter and every
 * emission of `resumeSelector` decrements it. While the counter is greater than
 * zero, source values are buffered; when it returns to zero the buffered values
 * are delivered in their original order, after which values pass straight
 * through again.
 *
 * Errors from the source or from either selector are forwarded to the
 * subscriber. Source completion is forwarded immediately; values still buffered
 * at that moment are discarded.
 *
 * @param suspendSelector Each emission requests suspension of the stream.
 * @param resumeSelector Each emission cancels one outstanding suspension.
 * @returns An operator function applying the suspend/resume behaviour.
 */
export const controlStream =
  <T>(suspendSelector: Observable<unknown>, resumeSelector: Observable<unknown>) =>
  (source: Observable<T>): Observable<T> =>
    new Observable<T>(subscriber => {
      let semaphoreCount = 0;
      let buffer: T[] = [];

      // Deliver buffered values one at a time, re-checking the counter on each
      // step: a consumer may suspend the stream again from inside `next`, in
      // which case the remaining values must stay buffered rather than be lost.
      const flush = (): void => {
        while (semaphoreCount === 0 && buffer.length > 0 && !subscriber.closed) {
          // The length check above guarantees `shift()` yields a buffered value.
          subscriber.next(buffer.shift() as T);
        }
      };

      const forwardError = (err: unknown): void => subscriber.error(err);

      const subscriptions = [
        suspendSelector.subscribe({
          next: () => {
            semaphoreCount++;
          },
          error: forwardError,
        }),
        resumeSelector.subscribe({
          next: () => {
            // Never drop below zero: an unmatched resume must not cancel a
            // suspension that has not been requested yet.
            if (semaphoreCount > 0) {
              semaphoreCount--;
            }
            flush();
          },
          error: forwardError,
        }),
        source.subscribe({
          next: value => {
            if (semaphoreCount > 0) {
              buffer.push(value);
            } else {
              subscriber.next(value);
            }
          },
          // Pass errors and completion along to the downstream subscriber.
          error: forwardError,
          complete: () => subscriber.complete(),
        }),
      ];

      // Teardown: release all three inner subscriptions and the buffer.
      return () => {
        subscriptions.forEach(subscription => subscription.unsubscribe());
        buffer = [];
      };
    });