import { StoreReadModel } from '../Model/StoreReadModel'; import { StoreFactory } from '../../Redux/Store/StoreFactory'; import { Identity } from 'ts-eventsourcing/ValueObject/Identity'; import { ReadModelAction, ReadModelMetadata } from '../ReadModelAction'; import { tap, toArray } from 'rxjs/operators'; import { NotFoundError } from '../Error/NotFoundError'; import { Store } from 'redux'; import { INITIAL_PLAYHEAD, Playhead } from '../../ValueObject/Playhead'; import { ActionRepositoryInterface } from '../ActionRepositoryInterface'; import { ActionStream } from '../ActionStream'; import { SimpleActionStream } from '../SimpleActionStream'; import { Observable } from 'rxjs'; import { Map } from 'immutable'; export class InMemoryActionRepository = ReadModelMetadata, Action extends ReadModelAction = ReadModelAction> implements ActionRepositoryInterface { private actions: Map = Map(); constructor(private readonly storeFactory: StoreFactory) { } public async create(id: Id): Promise> { return new StoreReadModel(id, this.storeFactory.create(), INITIAL_PLAYHEAD); } public async save(model: StoreReadModel): Promise { const actions: Action[] = await model.getUncommittedActions().pipe(toArray()).toPromise(); const previousActions: Action[] = this.actions.get(model.getId(), []); this.actions = this.actions.set(model.getId(), previousActions.concat(actions)); } public async has(id: Id): Promise { return this.actions.has(id); } public async get(id: Id): Promise> { const actions: Action[] | undefined = this.actions.get(id); if (!actions) { throw NotFoundError.storeNotFound(id); } const store: Store = this.storeFactory.create(); actions.forEach((action: Action) => { store.dispatch(action); }); const lastPlayhead: Playhead = actions.length === 0 ? 0 : actions[actions.length - 1].metadata.playhead as Playhead; return new StoreReadModel(id, store, lastPlayhead); } public async find(id: Id): Promise> { if (!this.has(id)) { return null; } return this.get(id); } public async remove(id: Id): Promise { this.actions = this.actions.remove(id); } public async append(id: Id, eventStream: ActionStream): Promise { const actions: Action[] = this.actions.get(id, []); await eventStream.pipe(tap((action) => actions.push(action))).toPromise(); } public load(id: Id): ActionStream { const actions: Action[] | undefined = this.actions.get(id); if (!actions) { throw NotFoundError.storeNotFound(id); } return SimpleActionStream.of(actions); } public loadFromPlayhead(id: Id, playhead: number): ActionStream { const actions: Action[] | undefined = this.actions.get(id); if (!actions) { throw NotFoundError.storeNotFound(id); } return SimpleActionStream.of(actions.slice(playhead)); } public findAll(): Observable> { return new Observable>((observer) => { const all = async () => { for (const id of this.actions.keys()) { if (observer.closed) { return; } observer.next(await this.get(id)); } }; all() .then(() => observer.complete()) .catch((error) => observer.error(error)); }); } }