import { StoreReadModel } from '../Model/StoreReadModel';
import { Identity } from 'ts-eventsourcing/ValueObject/Identity';
import { ReadModelAction, ReadModelMetadata } from '../ReadModelAction';
import { tap } from 'rxjs/operators';
import { ActionRepositoryInterface } from '../ActionRepositoryInterface';
import { ActionStream } from '../ActionStream';
import { StoreRepository } from './StoreRepository';
import { Observable } from 'rxjs';

/**
 * Action repository that keeps a snapshot repository in step with an
 * underlying action repository.
 *
 * - Model lookups (`find`, `get`, `has`, `findAll`) are answered by the
 *   snapshot repository.
 * - `create`, `load` and `loadFromPlayhead` are delegated to the action
 *   repository.
 * - `append` / `save` dispatch the actions on the snapshot model's store and
 *   then save the model to both repositories; `remove` removes from both.
 *
 * NOTE(review): the generic parameter and argument lists in this file were
 * restored after having been stripped from the source text. The shape
 * `<State, Id, Metadata, Action>` is assumed for the project types
 * (`StoreReadModel`, `StoreRepository`, `ActionRepositoryInterface`) — confirm
 * against their declarations.
 */
export class ActionWithSnapshotRepository<
  State,
  Id extends Identity = Identity,
  Metadata extends ReadModelMetadata<Id> = ReadModelMetadata<Id>,
  Action extends ReadModelAction<Id, Metadata> = ReadModelAction<Id, Metadata>>
  implements ActionRepositoryInterface<State, Id, Metadata, Action> {

  /**
   * @param actionRepository Repository the action streams are loaded from and
   *   the model is saved to after an append.
   * @param snapshotRepository Repository holding the store-backed models that
   *   actions are dispatched on.
   */
  constructor(
    private readonly actionRepository: ActionRepositoryInterface<State, Id, Metadata, Action>,
    private readonly snapshotRepository: StoreRepository<State, Id, Metadata, Action>,
  ) {
  }

  /**
   * Dispatches every action of `eventStream` on the snapshot model for `id`,
   * then saves that model to the snapshot repository and afterwards to the
   * action repository.
   *
   * The model is taken from the snapshot repository; when `find` yields a
   * falsy result a new one is created there.
   *
   * @param id Identity of the model to append to.
   * @param eventStream Actions to dispatch, in stream order.
   */
  public async append(id: Id, eventStream: ActionStream<Action>): Promise<void> {
    const model = await this.snapshotRepository.find(id) || await this.snapshotRepository.create(id);
    const store = model.getStore();

    // `toPromise()` subscribes to the stream and settles once it completes, so
    // all dispatches have run before anything is saved.
    await eventStream
      .pipe(tap((action: Action) => {
        store.dispatch(action);
      }))
      .toPromise();

    // Snapshot first, action repository second — same order as before.
    await this.snapshotRepository.save(model);
    await this.actionRepository.save(model);
  }

  /** Creates a model for `id` through the action repository. */
  public create(id: Id): Promise<StoreReadModel<State, Id, Metadata, Action>> {
    return this.actionRepository.create(id);
  }

  /** Looks up the model for `id` in the snapshot repository. */
  public find(id: Id): Promise<null | StoreReadModel<State, Id, Metadata, Action>> {
    return this.snapshotRepository.find(id);
  }

  /** Returns the result of the snapshot repository's `get` for `id`. */
  public get(id: Id): Promise<StoreReadModel<State, Id, Metadata, Action>> {
    return this.snapshotRepository.get(id);
  }

  /** Returns the result of the snapshot repository's `has` for `id`. */
  public has(id: Id): Promise<boolean> {
    return this.snapshotRepository.has(id);
  }

  /** Loads the action stream for `id` from the action repository. */
  public load(id: Id): ActionStream<Action> {
    return this.actionRepository.load(id);
  }

  /**
   * Loads the action stream for `id` from the action repository, passing
   * `playhead` through unchanged.
   */
  public loadFromPlayhead(id: Id, playhead: number): ActionStream<Action> {
    return this.actionRepository.loadFromPlayhead(id, playhead);
  }

  /** Removes `id` from the action repository and then from the snapshot repository. */
  public async remove(id: Id): Promise<void> {
    await this.actionRepository.remove(id);
    await this.snapshotRepository.remove(id);
  }

  /**
   * Appends the model's uncommitted actions under the model's own id.
   *
   * @param model Model whose `getUncommittedActions()` result is appended.
   */
  public async save(model: StoreReadModel<State, Id, Metadata, Action>): Promise<void> {
    const events = await model.getUncommittedActions();
    return this.append(model.getId(), events);
  }

  /** Returns the snapshot repository's `findAll()` observable. */
  public findAll(): Observable<StoreReadModel<State, Id, Metadata, Action>> {
    return this.snapshotRepository.findAll();
  }
}