import { Chunk, Duration, Effect, Layer, ParseResult, Schema, Stream, pipe, Ref } from 'effect'; import { beforeAll, beforeEach, describe, expect, it } from 'bun:test'; import { EventStreamId, EventStreamPosition, beginning } from '../streamTypes'; import { type EventStore, ConcurrencyConflictError } from '../eventstore'; // Helper functions for converting stream events const toArraySafely = Chunk.toReadonlyArray; const Id = { randomPart: () => Math.random().toString(36).substring(7), }; export const newEventStreamId = () => pipe(`stream_${Id.randomPart()}`, Schema.decode(EventStreamId)); const FooEvent = Schema.Struct({ bar: Schema.String }); type FooEvent = typeof FooEvent.Type; const readStreamFromBeginning = (streamId: Effect.Effect) => (eventstore: EventStore) => pipe( streamId, Effect.flatMap(beginning), Effect.flatMap(eventstore.read), Effect.flatMap(Stream.runCollect) ); const appendEventsToStream = (events: readonly FooEvent[], position: EventStreamPosition) => (eventstore: EventStore) => { const stream = Stream.make(...events); return pipe(stream, Stream.run(eventstore.append(position))); }; const subscribeAndTakeEvents = (count: number, receivedEventsRef: Ref.Ref>) => (stream: Stream.Stream) => pipe( stream, Stream.take(count), Stream.tap((event) => Ref.update(receivedEventsRef, Chunk.append(event))), Stream.runDrain ); const takeAndCollectStreamEvents = (count: number) => (stream: Stream.Stream) => pipe(stream, Stream.take(count), Stream.runCollect); const readFromPosition = (streamId: Effect.Effect, eventNumber: number) => (eventstore: EventStore) => pipe( streamId, Effect.map((streamId) => ({ streamId, eventNumber })), Effect.flatMap(eventstore.read), Effect.flatMap(takeAndCollectStreamEvents(2)) ); const createWrongEndPosition = (streamId: EventStreamId) => pipe( streamId, beginning, Effect.map((streamBeginning: EventStreamPosition) => ({ streamId: streamBeginning.streamId, eventNumber: 10, })) ); const subscribeFromBeginning = (streamId: Effect.Effect) => (eventstore: EventStore) => pipe(streamId, Effect.flatMap(beginning), Effect.flatMap(eventstore.subscribe)); const takeZeroAndDrain = (receivedEventsRef: Ref.Ref>) => (stream: Stream.Stream) => pipe( stream, Stream.take(0), Stream.tap((event) => Ref.update(receivedEventsRef, Chunk.append(event))), Stream.runDrain ); const readAndTakeZeroEvents = ( streamId: Effect.Effect, receivedEventsRef: Ref.Ref> ) => (eventstore: EventStore) => pipe( streamId, Effect.flatMap(beginning), Effect.flatMap(eventstore.read), Effect.flatMap(takeZeroAndDrain(receivedEventsRef)) ); const appendEventsAtPosition = (events: readonly FooEvent[], store: EventStore) => (pos: EventStreamPosition) => pipe(store, appendEventsToStream(events, pos)); const writeEventsAtBeginning = (streamId: EventStreamId, events: readonly FooEvent[]) => (store: EventStore) => pipe(streamId, beginning, Effect.flatMap(appendEventsAtPosition(events, store))); const subscribeAtPosition = (position: EventStreamPosition, receivedEventsRef: Ref.Ref>) => (store: EventStore) => { const subscription = store.subscribe(position); return pipe(subscription, Effect.flatMap(subscribeAndTakeEvents(3, receivedEventsRef))); }; const expectConflictError = (error: unknown) => { expect(error).toBeInstanceOf(ConcurrencyConflictError); }; const appendAndExpectConflict = (events: readonly FooEvent[], position: EventStreamPosition) => (eventstore: EventStore) => pipe( eventstore, appendEventsToStream(events, position), Effect.flip, Effect.map(expectConflictError) ); const subscribeFromBeginningAndTake = ( streamId: Effect.Effect, count: number, receivedEventsRef: Ref.Ref> ) => (eventstore: EventStore) => pipe( eventstore, subscribeFromBeginning(streamId), Effect.flatMap(subscribeAndTakeEvents(count, receivedEventsRef)) ); export class FooEventStore extends Effect.Tag('FooEventStore')< FooEventStore, EventStore >() {} /** * Options for configuring the EventStore test suite */ export interface EventStoreTestOptions { /** * Whether the implementation supports horizontal scaling with multiple instances. * Set to false for in-memory implementations that don't have a shared backend. * @default true */ readonly supportsHorizontalScaling?: boolean; } /** * Reusable test suite for EventStore implementations * * @param name - Display name for the implementation (e.g., "In-memory", "PostgreSQL") * @param makeEventStore - Function that returns a Layer providing the EventStore implementation * @param options - Optional configuration for the test suite */ export function runEventStoreTestSuite( name: string, makeEventStore: () => Layer.Layer, options: EventStoreTestOptions = {} ) { const { supportsHorizontalScaling = true } = options; describe(`${name} EventStore`, () => { let eventstore: Layer.Layer; const runPromiseWithEventStore = ( effect: Effect.Effect ): Promise => // After providing the eventstore Layer, we get Effect // We need to handle the unknown requirement - assume test caller provides complete Layer // Type assertion needed because we can't statically know the Layer's requirements pipe( effect, Effect.provide(eventstore), (e) => e as Effect.Effect, Effect.runPromise ); beforeAll(() => { eventstore = makeEventStore(); }); describe('appending events to the beginning of an empty stream', () => { let streamId: Effect.Effect; let streamBeginning: EventStreamPosition; let result: EventStreamPosition; beforeEach(async () => { streamId = newEventStreamId(); streamBeginning = await pipe(streamId, Effect.flatMap(beginning), Effect.runPromise); result = await runPromiseWithEventStore( pipe( FooEventStore, Effect.flatMap(appendEventsToStream([{ bar: 'baz' }, { bar: 'qux' }], streamBeginning)) ) ); }); describe('when collecting the events from the start of the stream', () => { let eventsRead: Chunk.Chunk; beforeEach(async () => { eventsRead = await runPromiseWithEventStore( pipe(FooEventStore, Effect.flatMap(readStreamFromBeginning(streamId))) ); }); it('should read all of the events', () => { expect(toArraySafely(eventsRead)).toEqual([{ bar: 'baz' }, { bar: 'qux' }]); }); }); describe('and adding more events to the start of the stream', () => { it('should fail because the stream is not empty', async () => { await runPromiseWithEventStore( pipe( FooEventStore, Effect.flatMap(appendAndExpectConflict([{ bar: 'foo' }], streamBeginning)) ) ); }); }); describe('and adding more events to the new end of the stream', () => { beforeEach(async () => { await runPromiseWithEventStore( pipe(FooEventStore, Effect.flatMap(appendEventsToStream([{ bar: 'foo' }], result))) ); }); describe('when collecting the events from the start of the stream', () => { it('should return all the events that have been appended to the stream', async () => { expect( toArraySafely( await runPromiseWithEventStore( pipe(FooEventStore, Effect.flatMap(readStreamFromBeginning(streamId))) ) ) ).toEqual([{ bar: 'baz' }, { bar: 'qux' }, { bar: 'foo' }]); }); }); describe('when collecting the events from partway through the stream', () => { it('should return the requested event and those that follow to the end of the stream', async () => { expect( toArraySafely( await runPromiseWithEventStore( pipe(FooEventStore, Effect.flatMap(readFromPosition(streamId, 1))) ) ) ).toEqual([{ bar: 'qux' }, { bar: 'foo' }]); }); }); describe('trying to add more events at the previous end of the stream', () => { it('should fail because the stream end has moved', () => runPromiseWithEventStore( pipe( FooEventStore, Effect.flatMap(appendAndExpectConflict([{ bar: 'oh-oh' }], result)) ) )); }); describe('appending events to the beginning of another empty stream', () => { let secondStreamId: Effect.Effect; beforeEach(async () => { secondStreamId = newEventStreamId(); const secondStreamBeginning = pipe( secondStreamId, Effect.flatMap(beginning), Effect.runSync ); result = await runPromiseWithEventStore( pipe( FooEventStore, Effect.flatMap( appendEventsToStream([{ bar: 'baz' }, { bar: 'qux' }], secondStreamBeginning) ) ) ); }); }); }); }); describe('appending events to the wrong end of an empty stream', () => { let emptyStreamWrongEnd: EventStreamPosition; beforeEach(() => { const emptyStreamId = newEventStreamId(); emptyStreamWrongEnd = pipe( emptyStreamId, Effect.flatMap(createWrongEndPosition), Effect.runSync ); }); it('should fail because the stream is empty', () => runPromiseWithEventStore( pipe( FooEventStore, Effect.flatMap(appendAndExpectConflict([{ bar: 'foo' }], emptyStreamWrongEnd)) ) )); }); describe('collecting from a non-existent stream', () => { let nonExistentStreamId: Effect.Effect; let result: Chunk.Chunk; beforeEach(async () => { nonExistentStreamId = newEventStreamId(); result = await runPromiseWithEventStore( pipe(FooEventStore, Effect.flatMap(readStreamFromBeginning(nonExistentStreamId))) ); }); it('should return no events', () => { expect(result).toHaveLength(0); }); }); describe('reading events immediately after writing to same stream', () => { let streamId: Effect.Effect; let streamBeginning: EventStreamPosition; beforeEach(async () => { streamId = newEventStreamId(); streamBeginning = await pipe(streamId, Effect.flatMap(beginning), Effect.runPromise); }); it('should be able to read events immediately after writing them', async () => { // Write events to the stream await runPromiseWithEventStore( pipe( FooEventStore, Effect.flatMap( appendEventsToStream( [{ bar: 'immediate-test-1' }, { bar: 'immediate-test-2' }], streamBeginning ) ) ) ); // Immediately try to read from the same stream (simulates session join after creation) const eventsRead = await runPromiseWithEventStore( pipe(FooEventStore, Effect.flatMap(readStreamFromBeginning(streamId))) ); // Should be able to read the events we just wrote expect(toArraySafely(eventsRead)).toEqual([ { bar: 'immediate-test-1' }, { bar: 'immediate-test-2' }, ]); }); it('should be able to read historical events immediately after writing them', async () => { // Write events to the stream await runPromiseWithEventStore( pipe( FooEventStore, Effect.flatMap( appendEventsToStream( [{ bar: 'historical-test-1' }, { bar: 'historical-test-2' }], streamBeginning ) ) ) ); // Immediately try to read using read (which returns only historical events) const eventsRead = await runPromiseWithEventStore( pipe(FooEventStore, Effect.flatMap(readStreamFromBeginning(streamId))) ); // Should be able to read the events we just wrote expect(toArraySafely(eventsRead)).toEqual([ { bar: 'historical-test-1' }, { bar: 'historical-test-2' }, ]); }); }); describe('subscription functionality', () => { let streamId: Effect.Effect; let streamBeginning: EventStreamPosition; beforeEach(async () => { streamId = newEventStreamId(); streamBeginning = await pipe(streamId, Effect.flatMap(beginning), Effect.runPromise); }); it('should receive events written to a subscribed stream', async () => { const receivedEventsRef = Ref.unsafeMake(Chunk.empty()); // First write some initial events to the stream await runPromiseWithEventStore( pipe( FooEventStore, Effect.flatMap(appendEventsToStream([{ bar: 'initial-event' }], streamBeginning)) ) ); // Start subscription from the beginning (which includes the initial event) const subscriptionEffect = runPromiseWithEventStore( pipe( FooEventStore, Effect.flatMap(subscribeFromBeginningAndTake(streamId, 3, receivedEventsRef)) ) ); // Start the subscription const subscription = subscriptionEffect.catch(() => { // Handle subscription errors gracefully in tests }); // Give subscription time to establish and receive initial event await pipe(100, Duration.millis, Effect.sleep, Effect.runPromise); // Write more events to trigger live streaming const nextPosition = await pipe( streamId, Effect.map((streamId) => ({ streamId, eventNumber: 1 })), Effect.runPromise ); await runPromiseWithEventStore( pipe( FooEventStore, Effect.flatMap( appendEventsToStream([{ bar: 'live-event-1' }, { bar: 'live-event-2' }], nextPosition) ) ) ); // Wait for subscription to process events await Promise.race([ subscription, pipe(1000, Duration.millis, Effect.sleep, Effect.runPromise), ]); // Verify events were received (historical + live) const receivedEvents = await pipe( receivedEventsRef, Ref.get, Effect.map(Chunk.toReadonlyArray), Effect.runPromise ); expect(receivedEvents).toEqual([ { bar: 'initial-event' }, { bar: 'live-event-1' }, { bar: 'live-event-2' }, ]); }); it('should handle multiple subscribers to the same stream', async () => { const subscriber1EventsRef = Ref.unsafeMake(Chunk.empty()); const subscriber2EventsRef = Ref.unsafeMake(Chunk.empty()); // Create first subscription const subscription1 = runPromiseWithEventStore( pipe( FooEventStore, Effect.flatMap(subscribeFromBeginningAndTake(streamId, 1, subscriber1EventsRef)) ) ).catch(() => {}); // Create second subscription const subscription2 = runPromiseWithEventStore( pipe( FooEventStore, Effect.flatMap(subscribeFromBeginningAndTake(streamId, 1, subscriber2EventsRef)) ) ).catch(() => {}); // Give subscriptions time to establish await pipe(50, Duration.millis, Effect.sleep, Effect.runPromise); // Write an event after subscriptions are established await runPromiseWithEventStore( pipe( FooEventStore, Effect.flatMap( appendEventsToStream([{ bar: 'multi-subscriber-event' }], streamBeginning) ) ) ); // Wait for both subscriptions to complete await Promise.all([ Promise.race([ subscription1, pipe(1000, Duration.millis, Effect.sleep, Effect.runPromise), ]), Promise.race([ subscription2, pipe(1000, Duration.millis, Effect.sleep, Effect.runPromise), ]), ]); // Both subscribers should receive the same event const subscriber1Events = await pipe( subscriber1EventsRef, Ref.get, Effect.map(Chunk.toReadonlyArray), Effect.runPromise ); const subscriber2Events = await pipe( subscriber2EventsRef, Ref.get, Effect.map(Chunk.toReadonlyArray), Effect.runPromise ); expect(subscriber1Events).toEqual([{ bar: 'multi-subscriber-event' }]); expect(subscriber2Events).toEqual([{ bar: 'multi-subscriber-event' }]); }); it('should handle subscription to non-existent stream', async () => { const receivedEventsRef = Ref.unsafeMake(Chunk.empty()); // Subscribe to stream that doesn't exist const subscription = runPromiseWithEventStore( pipe(FooEventStore, Effect.flatMap(readAndTakeZeroEvents(streamId, receivedEventsRef))) ).catch(() => {}); await Promise.race([ subscription, pipe(500, Duration.millis, Effect.sleep, Effect.runPromise), ]); // Should receive no events const receivedEvents = await pipe(receivedEventsRef, Ref.get, Effect.runPromise); expect(receivedEvents).toHaveLength(0); }); }); /** * Test horizontal scaling with multiple EventStore instances * This simulates multiple application instances sharing the same data store */ if (supportsHorizontalScaling) { describe('horizontal scaling with multiple instances', () => { it('should support cross-instance event propagation', async () => { const receivedEventsRef = Ref.unsafeMake(Chunk.empty()); const streamId = await pipe(newEventStreamId(), Effect.runPromise); // Create TWO separate EventStore instances (simulating different application instances) const writerInstance = makeEventStore(); const subscriberInstance = makeEventStore(); // Helper to run effects with different instances const runWithWriter = (effect: Effect.Effect) => pipe( effect, Effect.provide(writerInstance), (e) => e as Effect.Effect, Effect.runPromise ); const runWithSubscriber = (effect: Effect.Effect) => pipe( effect, Effect.provide(subscriberInstance), (e) => e as Effect.Effect, Effect.runPromise ); // FIRST: Write initial events using the writer instance await runWithWriter( pipe( FooEventStore, Effect.flatMap( writeEventsAtBeginning(streamId, [{ bar: 'event-1' }, { bar: 'event-2' }]) ) ) ); // SECOND: Subscribe from the beginning with subscriber instance // Should receive both historical events AND any new events const subscriberFiber = runWithSubscriber( pipe( FooEventStore, Effect.flatMap(subscribeAtPosition({ streamId, eventNumber: 0 }, receivedEventsRef)), Effect.scoped ) ).catch(() => {}); // Give subscription time to receive historical events await pipe(100, Duration.millis, Effect.sleep, Effect.runPromise); // THIRD: Write a new event with writer instance // This should be received by the active subscription on the subscriber instance await runWithWriter( pipe( FooEventStore, Effect.flatMap( appendEventsToStream([{ bar: 'event-3-live' }], { streamId, eventNumber: 2 }) ) ) ); // Wait for subscription to complete const receivedEvents = await pipe( receivedEventsRef, Ref.get, Effect.map(Chunk.toReadonlyArray), Effect.runPromise ); await Promise.race([ subscriberFiber, pipe( 5000, Duration.millis, Effect.sleep, Effect.andThen( Effect.fail( new Error( `Subscription timed out. Received ${receivedEvents.length} events: ${JSON.stringify(receivedEvents)}` ) ) ), Effect.runPromise ), ]); // Verify we received all events (2 historical + 1 live) const finalReceivedEvents = await pipe( receivedEventsRef, Ref.get, Effect.map(Chunk.toReadonlyArray), Effect.runPromise ); expect(finalReceivedEvents).toHaveLength(3); expect(finalReceivedEvents[0]).toEqual({ bar: 'event-1' }); expect(finalReceivedEvents[1]).toEqual({ bar: 'event-2' }); expect(finalReceivedEvents[2]).toEqual({ bar: 'event-3-live' }); }); }); } }); }