Event store

To isolate the core library from a particular way of storing events, Eventuous uses the IEventStore abstraction. Whilst it’s used by AggregateStore, you can also use it in a more generic way, when you need to persist or read events without having an aggregate.

The IEventStore interface inherits from the IEventReader and IEventWriter interfaces. Each of those focuses on one task: reading events from streams or appending events to streams, respectively. This separation helps when you only need one side, for example, reading events from a specific store without appending to it. In that case, depend on the IEventReader interface only.
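
The benefit of splitting the reader from the writer can be sketched with a few stand-in types. Everything in the block below (`IReader`, `IWriter`, `IStore`, `MemoryStore`, `StreamCounter`) is hypothetical and deliberately simplified; the real Eventuous interfaces are asynchronous and have different signatures.

```csharp
using System.Collections.Generic;

// Simplified stand-ins for illustration only, not the Eventuous interfaces.
public interface IReader { IReadOnlyList<object> Read(string stream); }
public interface IWriter { void Append(string stream, object evt); }
public interface IStore : IReader, IWriter { }

public sealed class MemoryStore : IStore {
    readonly Dictionary<string, List<object>> _streams = new();

    // Returns an empty list for a stream that was never written.
    public IReadOnlyList<object> Read(string stream)
        => _streams.TryGetValue(stream, out var events) ? events : new List<object>();

    public void Append(string stream, object evt) {
        if (!_streams.TryGetValue(stream, out var events)) _streams[stream] = events = new List<object>();
        events.Add(evt);
    }
}

// A read-only consumer asks for the narrower interface,
// so it cannot append events by accident.
public sealed class StreamCounter {
    readonly IReader _reader;
    public StreamCounter(IReader reader) => _reader = reader;
    public int Count(string stream) => _reader.Read(stream).Count;
}
```

Because `MemoryStore` implements the combined interface, the same instance can be passed to `StreamCounter` as a reader and to other components as a writer.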

Eventuous has several implementations of the event store abstraction; you can find them in the infrastructure section. The default implementation is KurrentDBEventStore, which uses KurrentDB as the event store. It’s a great product, and we’re happy to provide first-class support for it. It’s also a good way to learn about Event Sourcing and CQRS.

In addition, Eventuous has an in-memory event store, which is mostly used for testing purposes. It’s not recommended to use it in production, as it doesn’t provide any persistence guarantees.

The event store works with a few primitives, which wrap infrastructure-specific structures. Those primitives are:

| Record type | What it’s for |
| --- | --- |
| StreamReadPosition | Represents the stream revision from which the event store reads the stream forwards or backwards. |
| ExpectedStreamVersion | The stream version (revision) that we expect to find in the database when the event store tries to append new events. Used for optimistic concurrency. |
| StreamEvent | A structure that holds the event type as a string, as well as the serialised event payload and metadata. |

All of those are immutable records.

The event store provides the following operations:

| Function | What it’s for |
| --- | --- |
| AppendEvents | Append one or more events to a given stream. |
| AppendEvents (multi-stream overload) | Append events to multiple streams in a single operation. |
| ReadEvents | Read events from a stream forwards, from a given start position. |
| StreamExists | Check if a stream exists. |
| TruncateStream | Remove events from a stream up to a given position. |
| DeleteStream | Delete a stream entirely. |

Use AppendEvents to persist events. The ExpectedStreamVersion parameter provides optimistic concurrency control.

var streamName = new StreamName("Order-123");
var events = new[] {
    new NewStreamEvent(Guid.NewGuid(), new OrderCreated("123", 99.99m), new Metadata()),
    new NewStreamEvent(Guid.NewGuid(), new OrderConfirmed("123"), new Metadata())
};
// Append to a new stream
var result = await eventStore.AppendEvents(
    streamName,
    ExpectedStreamVersion.NoStream, // stream must not exist yet
    events,
    cancellationToken
);
// result.NextExpectedVersion can be used for subsequent appends
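
To make the role of the expected version concrete, here is a minimal, self-contained sketch of an optimistic concurrency check. It does not use Eventuous: `ToyStore`, its `NoStream` constant, and `WrongVersionException` are hypothetical names invented for this illustration, and a real store performs the check inside the database rather than in memory.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical toy store, not part of Eventuous. It only illustrates the
// version check that an expected-version parameter implies.
public sealed class WrongVersionException : Exception {
    public WrongVersionException(string stream, long expected, long actual)
        : base($"Stream {stream}: expected version {expected}, actual {actual}") { }
}

public sealed class ToyStore {
    // Assumed convention for this sketch: -1 means "the stream must not exist".
    public const long NoStream = -1;

    readonly Dictionary<string, List<object>> _streams = new();

    // Returns the version of the last event in the stream after the append,
    // which the caller passes as the expected version of its next append.
    public long Append(string stream, long expectedVersion, IReadOnlyCollection<object> events) {
        var actual = _streams.TryGetValue(stream, out var existing) ? existing.Count - 1 : NoStream;
        if (actual != expectedVersion) throw new WrongVersionException(stream, expectedVersion, actual);

        if (existing == null) {
            existing = new List<object>();
            _streams[stream] = existing;
        }
        existing.AddRange(events);
        return existing.Count - 1;
    }
}
```

If two writers load the same stream at the same version and both try to append, the first append moves the stream forward, so the second fails the check and has to reload and retry.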

The Store extension method accepts plain domain event objects and handles wrapping them into NewStreamEvent instances:

var streamName = new StreamName("Order-123");
object[] changes = [new OrderCreated("123", 99.99m), new OrderConfirmed("123")];
var result = await eventStore.Store(
    streamName,
    ExpectedStreamVersion.NoStream,
    changes,
    cancellationToken: cancellationToken
);

You can append events to multiple streams in a single operation using the multi-stream overload of AppendEvents:

NewStreamAppend[] appends = [
    new(orderStream, ExpectedStreamVersion.NoStream, orderEvents),
    new(inventoryStream, new ExpectedStreamVersion(currentVersion), inventoryEvents)
];
AppendEventsResult[] results = await eventStore.AppendEvents(appends, cancellationToken);

Each element specifies a target stream, its expected version, and the events to append. The return array contains one AppendEventsResult per stream in the same order as the input.

Atomicity guarantees vary by store:

| Store | Atomicity |
| --- | --- |
| KurrentDB (25.1+) | Atomic: all streams are updated or the entire operation fails |
| PostgreSQL | Atomic: uses a single database transaction |
| SQL Server | Atomic: uses a single database transaction |
| SQLite | Atomic: uses a single database transaction |
| Default (other stores) | Not atomic: streams are written sequentially, and the operation fails on the first error |
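
The consequence of the non-atomic default can be sketched as follows. This is not Eventuous code: `StreamAppend`, `SequentialAppend`, and the `appendOne` delegate are hypothetical stand-ins showing what "written sequentially, fails on first error" implies for streams that were already written.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration, not Eventuous code: a multi-stream append built
// from independent single-stream appends, with no surrounding transaction.
public sealed record StreamAppend(string Stream, long ExpectedVersion, object[] Events);

public static class SequentialAppend {
    // appendOne stands in for a single-stream append that throws on failure,
    // for example on a version conflict.
    public static long[] AppendAll(IReadOnlyList<StreamAppend> appends, Func<StreamAppend, long> appendOne) {
        var results = new long[appends.Count];
        for (var i = 0; i < appends.Count; i++) {
            // If this call throws, the streams at indexes 0..i-1 have already
            // been written and are not rolled back.
            results[i] = appendOne(appends[i]);
        }
        return results;
    }
}
```

With such a store, a caller that needs all-or-nothing behaviour has to compensate for partial writes itself or choose a store from the atomic rows of the table.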

Use ReadEvents to read a batch of events from a stream, starting at a given position:

var streamName = new StreamName("Order-123");
// Read up to 100 events from the start
var events = await eventStore.ReadEvents(
    streamName,
    StreamReadPosition.Start,
    count: 100,
    failIfNotFound: true,
    cancellationToken
);
foreach (var evt in events) {
    // evt.Payload contains the deserialized event object
    // evt.Revision is the position within the stream
    // evt.Created is the timestamp
    Console.WriteLine($"Event {evt.Revision}: {evt.Payload}");
}

The ReadStream extension method handles pagination automatically and returns all events:

var allEvents = await eventStore.ReadStream(
    streamName,
    StreamReadPosition.Start,
    failIfNotFound: true,
    cancellationToken
);
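
Reading a whole stream on top of a batched read comes down to a paging loop. The sketch below shows one way to write such a loop; it is not the Eventuous implementation, and `Paging`, `ReadAll`, and the `readPage` delegate are hypothetical names, with `readPage(start, count)` standing in for a batched read such as ReadEvents.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper, not the Eventuous implementation: it shows the kind of
// paging loop that a "read the whole stream" method has to run internally.
public static class Paging {
    public static async Task<List<T>> ReadAll<T>(
        Func<long, int, Task<IReadOnlyList<T>>> readPage,
        int pageSize = 100
    ) {
        var all = new List<T>();
        long position = 0;
        while (true) {
            var page = await readPage(position, pageSize);
            all.AddRange(page);
            // A page shorter than requested means the end of the stream was reached.
            if (page.Count < pageSize) return all;
            position += page.Count;
        }
    }
}
```

The page size trades memory per request against the number of round trips; a stream whose length is an exact multiple of the page size costs one extra, empty read.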

The remaining operations check whether a stream exists, truncate it, or delete it:

// Check if a stream exists
bool exists = await eventStore.StreamExists(streamName, cancellationToken);
// Truncate a stream up to a given position
await eventStore.TruncateStream(
    streamName,
    new StreamTruncatePosition(5),
    new ExpectedStreamVersion(currentVersion),
    cancellationToken
);
// Delete a stream
await eventStore.DeleteStream(
    streamName,
    new ExpectedStreamVersion(currentVersion),
    cancellationToken
);

Extension methods on IEventReader and IEventWriter provide aggregate-level operations:

// Load an aggregate from its event stream
var booking = await eventReader.LoadAggregate<Booking, BookingState, BookingId>(
    bookingId,
    cancellationToken: cancellationToken
);
// Execute domain logic
booking.Confirm();
// Persist new events
await eventWriter.StoreAggregate<Booking, BookingState, BookingId>(
    booking,
    bookingId,
    cancellationToken: cancellationToken
);
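
Conceptually, loading an event-sourced aggregate means reading its stream and folding the events into state. The sketch below illustrates that idea only; `RoomBooked`, `BookingConfirmed`, this `BookingState`, and `Loader` are hypothetical types written for the example and are not the Eventuous Booking sample or its LoadAggregate implementation.

```csharp
using System.Collections.Generic;

// Hypothetical event and state types for illustration only.
public sealed record RoomBooked(string RoomId);
public sealed record BookingConfirmed();

public sealed record BookingState(string RoomId = "", bool Confirmed = false) {
    // Returns the state that results from applying a single event.
    public BookingState When(object evt) => evt switch {
        RoomBooked e       => this with { RoomId = e.RoomId },
        BookingConfirmed _ => this with { Confirmed = true },
        _                  => this // unknown events leave the state unchanged
    };
}

public static class Loader {
    // Rebuilds the state by applying every stored event in stream order.
    public static BookingState Load(IEnumerable<object> events) {
        var state = new BookingState();
        foreach (var evt in events) state = state.When(evt);
        return state;
    }
}
```

Storing the aggregate is the reverse step: only the events produced since loading are appended, with the loaded version passed as the expected version.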

Eventuous has several implementations of the event store, listed in the infrastructure section.

If you use one of the provided implementations, you won’t need to know about the event store abstraction. You only need it if you want to implement an event store for your preferred database.