Are you aware that storing and updating current state means loosing important data?
Event sourcing is a way to solve this problem. It is the technique of storing state transitions rather than updating the current state itself.
Event sourcing has some more benefits:
- Complete audit-proof log for free
- Complete history of every state change ever
- No more mapping objects to tables
- Distribution support
- CQRS (Command Query Responsibility Segregation) support
- Natural fit for domain-driven design and functional programming
- Be prepared for unanticipated use cases in the future (for free)
State transitions are an important part of our problem space and should be modelled within our domain -- Greg Young
When I first encountered the concept of event sourcing and CQRS and looked at some sample applications, I had the impression that it must be extremely difficult to implement. But later I found out that event sourcing is easier than I first thought, especially when it is expressed with functional programming.
Here are 12 things about event sourcing that should help you to get started today.
1. An event is something that has happened in the past
An event is a fact that happened in the past.
Events indicate that something within the domain has changed. They contain all the information that is needed to transform the state of the domain from one version to the next.
Events make the concept of side effects explicit. In an event sourced system, a state change is no longer an implicit result of performed operation. Instead the state change is explicitly defined and expressed in the domain language.
Here are some simplified example events for an accounting system:
sealed trait Event
final case class OnlineAccountCreated(accountId: UUID) extends Event
final case class DepositMade(accountId: UUID, depositAmount: BigDecimal) extends Event
final case class MoneyWithdrawn(accountId: UUID, withdrawalAmount: BigDecimal) extends Event
Events vs. Commands
Commands on the other hand have the intent of asking the system to perform an operation. Commands are represented in the imperative mood (e.g. CreateOnlineAccount
) and can be rejected by the application.
2. An event should be presented as a verb in past tense
State transitions are an important part of the domain. Therefore they are explicitly recorded as events and should be named according to the ubiquitous language.
Because events are facts that have happened in the past, they should be presented as verbs in past tense.
3. State is a first level derivative of the event stream
Current state (e.g. the balance of a bank account) is the derivative of all the events that have happened up until now.
This means that an object is persisted as a stream of events. There is no need anymore to map its structural model with all its properties to tables in a relational database.
"There is not an impedance mismatch between events and the domain model." -- Greg Young
State is also often referred to as a projection of the event stream.
4. Every aggregate has its own event stream
An aggregate is a cluster of associated objects treated as a single unit. This could be e.g. an order and its line-items.
In the first example from above the aggregate is the bank account.
Every aggregate has its own event stream. Therefore every event must be stored together with an identifier for its aggregate. This ID is often called AggregateId
, or StreamId
.
5. The structure of an event and how to persist it
As just mentioned, an event must have an aggregate ID because we must be able to query events by their aggregate ID.
They also need either a timestamp or a sequence or version number. This is important because we have to be able to sort the event stream chronologically to be able to correctly replay the events.
The aggregate ID and the sort key do not necessarily have to be part of the domain event's data structure although they sure can be. I personally find it a little redundant to store the aggregate ID within every event payload. In the domain code the ID is always known within the context and can be passed explicitly if needed. But I guess it is the default for most implementations to be part of the event data.
We also need so store the event data itself. This is usually called Data
or Payload
and is done by serializing the domain event.
The minimum information to store per event is:
Column | Type |
---|---|
StreamId | Guid |
Data | Blob |
Version | Int |
There is also some additional but optional information that can be stored if convenient or required by the business:
- The event type (e.g.
ShippingInformationAdded
) - Event version (unique within context of a given stream)
- Correlation ID
- Timestamp
- Other meta data (e.g. user, permission level, IP addresses)
When persisting events, the current version of the event stream (sometimes called StreamRevision
) must be equal to the expected version. The expected version is the version of the stream on which the creation of the given events was based on. If the version is not equal to the expected version there will be a concurrency conflict. This check must be done within a single transaction.
Note that the StreamId
column should be indexed.
Additionally it can be useful to to track all the aggregates currently in the system, e.g. in a separate table.
A selection of suitable event storages are e.g. Event Store, Redis, or plain old relational databases. (I guess there are many more options that I haven't evaluated, yet.)
You can find more detailed information on event storage in the CQRS Documents by Greg Young under Building an Event Storage.
6. Events are immutable
An event is a fact that happened in the past. So unless you've invented a time machine, it cannot be changed.
7. There is no delete
Just as events cannot be changed, they cannot be deleted either. A deletion is just another event, a compensating action sometimes called "reversal transaction".
Immutable events and streams have some benefits:
- Append-only models distribute more easily than updating models. There are far fewer locks to deal with and horizontal partitioning with the aggregate ID as the partition key is easy.
- No information is lost. This is extremely valuable if the business can derive a competitive advantage from the data.
8. The apply function
want to learn event sourcing?
f(state, event) => state
— gregyoung (@gregyoung) 17. März 2013
The apply
function is the essence of event sourcing. It takes a state and an event and returns a new state:
Here is a code example:
def apply(account: Account, event: Event): Account = {
(account, event) match {
case (Uninitialized, OnlineAccountCreated(accountId)) =>
OnlineAccount(accountId, 0)
case (OnlineAccount(accountId, balance), DepositMade(_, depositAmount)) =>
OnlineAccount(accountId, balance + depositAmount)
case (OnlineAccount(accountId, balance), MoneyWithdrawn(_, withdrawalAmount)) =>
OnlineAccount(accountId, balance - withdrawalAmount)
case _ => account
}
}
Note that the apply
function is pure. There must be no side effects when applying events to recreate the aggregate.
9. The replay function
To recreate state from an event stream we need to replay all the events. This is simply done by folding the events given an initial state. The replay
function takes an initial state and the event stream and returns the current state of the aggregate and its version:
It is convenient to also return the version to be able to detect concurrency conflicts. Here is the code:
def replay(initial: Account, events: List[Event]): (Account, Int) = {
events.foldLeft((initial, -1)) {
case ((state, version), event) => (apply(state, event), version + 1)
}
}
10. The decide function
The decide
function takes the current state and a command as inputs. It then decides whether the operation that the command requested can be performed at the current state by applying business rules. If so it will return one or more events:
Let's look at the code:
def decide(cmd: Command, state: Account): Either[Error, List[Event]] = {
(state, cmd) match {
case (Uninitialized, CreateOnlineAccount(accountId)) =>
Right(List(OnlineAccountCreated(accountId)))
case (OnlineAccount(accountId, balance), MakeDeposit(_, amount)) =>
if (amount <= 0) {
Left("deposit amount must be positive")
} else {
Right(List(DepositMade(accountId, amount)))
}
case (OnlineAccount(accountId, balance), Withdraw(_, amount)) =>
if (amount <= 0) {
Left("withdrawal amount must be positive")
} else if (balance - amount < 0) {
Left("overdraft not allowed")
} else {
Right(List(MoneyWithdrawn(accountId, amount)))
}
case _ =>
Left(s"invalid operation $cmd on current state $state")
}
}
Note that validation of the command is also done in the decide
function.
Commands will be rejected if:
- The deposit or withdrawal amount is less or equal than 0
- The account does not exist (will be handled by the default case)
- A withdrawal would result in a negative account balance
11. Event sourcing: the complete pattern
Now we need to combine the replay
operation, the decide
function, and the event persistence to create a fully functional event sourcing application.
Here is how it works:
Let's create a Scala case class that represents the event store:
final case class EventStore(
appendToStream: (String, Int, List[Event]) => Either[Error, Unit],
readFromStream: String => Either[Error, List[Event]])
Now we can implement the integration code:
def handleCommand(store: EventStore)
(accountId: UUID, cmd: Command) : Either[Error, Unit] = {
for {
events <- store.readFromStream(accountId.toString)
(state, version) = Domain.replay(Uninitialized, events)
newEvents <- Domain.decide(cmd, state)
_ <- store.appendToStream(accountId.toString, version, newEvents)
} yield ()
}
Testing the application
To test the application we will create a console application:
val store = EventStore(
appendToStream = InMemoryEventStore.appendToStream,
readFromStream = InMemoryEventStore.readFromStream
)
// dependency injection by partial application
val handle = CommandHandling.handleCommand(store) _
def query(accountId: UUID) = {
store
.readFromStream(accountId.toString)
.map(events => Domain.replay(Uninitialized, events))
}
val accountId = UUID.randomUUID()
val commands = List(
CreateOnlineAccount(accountId),
MakeDeposit(accountId, 1000),
Withdraw(accountId, 500),
Withdraw(accountId, 501)
)
commands.foreach(cmd => {
val result = handle(accountId, cmd)
println(cmd)
println(s"${result.fold(err => s"[ERROR] $err", _ => "[SUCCESS]")}")
println(s"current state: ${query(accountId).fold(err => "invalid", account => s"$account")}")
})
First we create an in-memory event store. The implementation is not shown for the sake of brevity. You can find it here. The in-memory implementation is only for testing and demonstration purposes, don't use this in production!
Then we inject the store into the handle function by partial application.
We create a query function that reads the event stream from the in-memory event store and replays the events.
Then we handle a few commands and print the results to the console:
CreateOnlineAccount(7c05de5a-a2bd-4c2a-8ca1-20438db94b9a)
[SUCCESS]
current state: (OnlineAccount(7c05de5a-a2bd-4c2a-8ca1-20438db94b9a,0),0)
MakeDeposit(7c05de5a-a2bd-4c2a-8ca1-20438db94b9a,1000)
[SUCCESS]
current state: (OnlineAccount(7c05de5a-a2bd-4c2a-8ca1-20438db94b9a,1000),1)
Withdraw(7c05de5a-a2bd-4c2a-8ca1-20438db94b9a,500)
[SUCCESS]
current state: (OnlineAccount(7c05de5a-a2bd-4c2a-8ca1-20438db94b9a,500),2)
Withdraw(7c05de5a-a2bd-4c2a-8ca1-20438db94b9a,501)
[ERROR] overdraft not allowed
current state: (OnlineAccount(7c05de5a-a2bd-4c2a-8ca1-20438db94b9a,500),2)
12. When to apply event sourcing?
Finally let's discuss when to apply event souring.
Greg Young says:
"In fact every domain is a naturally transaction based domain when Domain Driven Design is being applied"
So this means that event sourcing can be applied to any application that is driven by the domain.
However, event sourcing comes with some costs regarding the implementation effort when every behavior is modelled explicitly. It also will be more expensive regarding the disk costs because it leads to larger amounts of data being stored.
It's a trade-off. We have to decide wether theses costs are worth the ROI for the business. There's more information on the trade-offs in Greg Young's CQRS document.
Generally speaking, I suggest to consider event sourcing when:
- You want to leverage any of the benefits of event sourcing
- The domain is behavior driven
- The system is task based
- The system is not CRUD based
Conclusion
We have discussed the most important ideas of event sourcing.
While doing this we created a fully functional event sourcing application by applying these ideas in practice.
Even with the extra overhead of explicitly modelling all state transitions, the domain could be implemented in an uncluttered less then 90 lines single source file.
There are still a few open ends that we haven't covered:
- Durability of commands and events
- Performance issues that arise when there is a very large number of events that have to be processed
- How to improve performance by implementing snapshots and CQRS
- How to improve performance by keeping aggregates in memory e.g. with Akka
- How to deal with asynchrony
- Evolving and upgrading events
- Error handling
- Aggregates can only be queries by their ID, however, SQL-like querying can be implemented with CQRS and appropriate read models.
- How to do inter bounded context communication (context maps and process managers)
- How to implement sagas
- …
While many of these aspects are non-trivial it is always a good idea to start out simple which is what I did in this post.
Don't introduce complex concepts until you've proven their need!
So if you are new to event sourcing, I hope this gives you a good starting point. Tell me how you are doing.
If you are an expert on event sourcing, and you have any comments or suggestions, please let me know.
The complete source code can be found on GitHub.