The event bus is a powerful design pattern in the world of event sourcing.
It allows you to decouple the event producers from the event consumers.
But it also has its downsides if you use it as the backbone of your event sourcing system.
In this post, I will discuss the good and the bad of the event bus in event sourcing.
The Good
Let’s begin with the positives. The event bus decouples event producers from event consumers.
This is very useful for event sourcing: an event is published once, and any number of subscribers can react to it.
These subscribers range from processors that trigger actions, such as sending emails, to projections that build read models.
Here, we’ll show you a simplified version of how the pattern is implemented in the PHP event sourcing library.
We will also use classes and features such as the Event Bus from the library.
However, this principle applies to any other Event Bus implementation as well.
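To make the principle concrete without depending on any library, here is a minimal sketch of an in-memory event bus. The names SimpleEventBus and ProfileCreated are hypothetical and exist only for this illustration; the sketch also assumes that listeners are matched by the exact event class, which a real implementation may handle differently.

```php
<?php

// Hypothetical event class, used only for this illustration.
final class ProfileCreated
{
    public string $name;

    public function __construct(string $name)
    {
        $this->name = $name;
    }
}

// Minimal in-memory event bus: maps an event class name to its listeners.
final class SimpleEventBus
{
    /** @var array<string, list<callable>> */
    private array $listeners = [];

    public function subscribe(string $eventClass, callable $listener): void
    {
        $this->listeners[$eventClass][] = $listener;
    }

    public function dispatch(object ...$events): void
    {
        foreach ($events as $event) {
            // Call every listener registered for this exact event class,
            // in the order in which the listeners were registered.
            foreach ($this->listeners[get_class($event)] ?? [] as $listener) {
                $listener($event);
            }
        }
    }
}

$log = [];
$bus = new SimpleEventBus();

// Two independent consumers of the same event; the producer knows neither.
$bus->subscribe(ProfileCreated::class, static function (ProfileCreated $event) use (&$log): void {
    $log[] = 'mail: Welcome ' . $event->name;
});
$bus->subscribe(ProfileCreated::class, static function (ProfileCreated $event) use (&$log): void {
    $log[] = 'projection: insert ' . $event->name;
});

$bus->dispatch(new ProfileCreated('Max Mustermann'));
// $log now holds one entry per subscriber, in registration order.
```

The producer only calls dispatch() and never references a subscriber directly, which is the decoupling described above.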
Setup
Let’s first take a look at the entry point.
We create a new aggregate, which internally records the events.
Afterward, we save the aggregate in a repository.
use Patchlevel\EventSourcing\Aggregate\Uuid;
use Patchlevel\EventSourcing\Repository\Repository;
$id = Uuid::generate();
$profile = Profile::create($id, 'Max Mustermann');
$repository->save($profile);
In the repository itself, which is provided by the PHP event sourcing library, the following happens in a simplified manner:
The events are retrieved from the aggregate, packaged into messages, and assigned headers.
Subsequently, the messages are stored and published to the Event Bus.
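use Patchlevel\EventSourcing\Aggregate\AggregateRoot;
use Patchlevel\EventSourcing\Message\Message;
final class Repository
{
    public function save(AggregateRoot $aggregateRoot): void
    {
        $events = $aggregateRoot->releaseEvents();
        // Simplified: wrap each event in a message (headers omitted).
        $messages = array_map(static fn (object $event): Message => Message::create($event), $events);
        $this->store->save(...$messages);
        $this->eventBus->dispatch(...$messages);
    }
}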
But what happens to the events after they land in the Event Bus?
Ultimately, they are forwarded to all subscribers that are interested in the event.
In the PHP event sourcing library, this is handled using the #[Subscribe] attribute.
A subscriber might look something like this:
use Patchlevel\EventSourcing\Attribute\Subscribe;
use Patchlevel\EventSourcing\Message\Message;
final class WelcomeSubscriber
{
    #[Subscribe(ProfileCreated::class)]
    public function onProfileCreated(Message $message): void
    {
        $event = $message->event();
        echo sprintf('Welcome %s!', $event->name);
    }
}
That's it! We've written the code necessary to publish an event and respond to it.
At this point, we could send emails, trigger various actions, or build projections.
However, there's still something missing when it comes to projections.
Projections Schema
Projections build read models from the events in the event store.
A read model can live in a relational database, a document store, or a search index.
Typically, a projection requires a predefined schema before it can be used.
Therefore, we need to create a schema for the projection.
This can be done manually, or we can implement a mechanism to handle it automatically.
Of course, we want to automate this process, so we’ll now write the logic for it.
We’ll define an interface called Projection with two methods: one for creating the schema and another for removing it.
interface Projection
{
    public function setup(): void;
    public function teardown(): void;
}
After defining the interface, we can proceed to implement it.
The implementation might look something like this:
use Doctrine\DBAL\Connection;
use Patchlevel\EventSourcing\Attribute\Subscribe;
use Patchlevel\EventSourcing\Message\Message;
final class ProfileProjection implements Projection
{
    public function __construct(
        private Connection $connection
    ) {
    }
    #[Subscribe(ProfileCreated::class)]
    public function onProfileCreated(Message $message): void
    {
        $this->connection->executeStatement('INSERT INTO profile (id, name) VALUES (?, ?)', [
            $message->event()->id,
            $message->event()->name,
        ]);
    }
    public function setup(): void
    {
        $this->connection->executeStatement('CREATE TABLE profile (id UUID, name VARCHAR(255))');
    }
    public function teardown(): void
    {
        $this->connection->executeStatement('DROP TABLE profile');
    }
}
The final step is to create CLI commands that handle both the setup and teardown of the projections.
This should be relatively easy to implement with the framework of your choice,
so we will skip this step in this article to keep it concise.
We have now achieved the ability to create and populate projections when an event is published to the Event Bus.
Additionally, we can create and remove the schemas for the projections.
Rebuild Projections
But what if we have a new projection or need to rebuild an existing one?
The solution is quite simple:
we need to retrieve all events from the event store and have them processed by the subscribers again.
Once again, we can leverage the Event Bus to assist with this.
use Patchlevel\EventSourcing\EventBus\DefaultConsumer;
use Patchlevel\EventSourcing\Store\Store;
$consumer = DefaultConsumer::create([$profileProjection]);
$stream = $store->load();
foreach ($stream as $message) {
    $consumer->consume($message);
}
This code loads all events from the event store and has the subscribers process them again.
You can also wrap this code in a CLI command for easy execution.
Recap
In summary, we now have the capability to publish events and respond to them synchronously.
We can create and delete schemas for projections, as well as rebuild projections.
Essentially, we have everything we need for an event sourcing system - right?
The Bad
It may seem that the Event Bus is a silver bullet that solves everything.
However, the devil is in the details.
I will now take a closer look at some of these problems with you.
Rebuild Projections
Let’s revisit the last point.
While rebuilding projections is straightforward, it can be time-consuming with a large number of events.
And since we’re currently working synchronously with the Event Bus, we would need to shut down the application to rebuild the projections.
Why do we need to shut down the application?
Because the projections process events as soon as the application creates them.
This means that if we rebuild a projection in a separate process, the same projection is still active in the running application.
New events would be written to it while the rebuild is still replaying old ones, so the events would reach the projection out of order.
Okay, but what if we temporarily remove the projection from the application and only re-enable it once the new projection is complete?
The issue then is that whenever we think the projection is up to date,
there’s always the possibility that new events were created in parallel,
which weren’t present when we queried all events. So, we’re constantly playing catch-up.
This is similar to Zeno's paradox of Achilles and the tortoise:
the tortoise represents our event store, and Achilles represents our projection, which keeps chasing it.
What can we do now?
At the very least, we can reduce downtime by having the projections catch up as much as possible.
Once that point is reached, we can shut down the application and process the final small batch of events.
This should be quick since only a few events need to be handled.
Afterward, we can start the new application, where the projections will again work synchronously with the events.
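The two-phase approach can be sketched as follows. This is a minimal illustration under stated assumptions, not the library's mechanism: InMemoryEventStore and CatchingUpProjection are hypothetical stand-ins, and the sketch assumes every event has a monotonically increasing position that the projection can remember.

```php
<?php

// Hypothetical in-memory event store; positions are 1-based.
final class InMemoryEventStore
{
    /** @var list<string> */
    private array $events = [];

    public function append(string $event): void
    {
        $this->events[] = $event;
    }

    /** @return array<int, string> events after $position, keyed by position */
    public function loadAfter(int $position): array
    {
        $result = [];
        foreach ($this->events as $index => $event) {
            if ($index + 1 > $position) {
                $result[$index + 1] = $event;
            }
        }

        return $result;
    }
}

// Hypothetical projection that remembers the last position it has processed.
final class CatchingUpProjection
{
    public int $position = 0;

    /** @var list<string> */
    public array $rows = [];

    // Processes everything beyond the last seen position
    // and returns how many events were handled.
    public function catchUp(InMemoryEventStore $store): int
    {
        $handled = 0;
        foreach ($store->loadAfter($this->position) as $position => $event) {
            $this->rows[] = $event;
            $this->position = $position;
            $handled++;
        }

        return $handled;
    }
}

$store = new InMemoryEventStore();
foreach (['e1', 'e2', 'e3'] as $event) {
    $store->append($event);
}

$projection = new CatchingUpProjection();

// Phase 1: the application is still running, so new events may arrive meanwhile.
$firstPass = $projection->catchUp($store); // handles e1 to e3
$store->append('e4');                      // written in parallel by the live application

// Phase 2: the application is stopped, so nothing can be appended any more.
$finalPass = $projection->catchUp($store); // handles only e4
```

The first pass does the bulk of the work while the application keeps running; the final pass during the downtime only has to handle the few events that arrived in the meantime.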
The alternative would be to let the projections work asynchronously.
But that’s a whole different topic, which we’ll address later.
Error Handling
Now, let’s address a somewhat less pleasant topic. What happens when an error occurs somewhere?
Let’s take another look at the repository code:
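use Patchlevel\EventSourcing\Aggregate\AggregateRoot;
use Patchlevel\EventSourcing\Message\Message;
final class Repository
{
    public function save(AggregateRoot $aggregateRoot): void
    {
        $events = $aggregateRoot->releaseEvents();
        // Simplified: wrap each event in a message (headers omitted).
        $messages = array_map(static fn (object $event): Message => Message::create($event), $events);
        $this->store->save(...$messages);
        $this->eventBus->dispatch(...$messages);
    }
}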
What happens if storing the events in the event store is successful, but executing the subscribers in the event bus fails?
In that case, we may have saved the data, but possibly did not send the email or update the projection.
And what if we change the order of saving and dispatching?
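use Patchlevel\EventSourcing\Aggregate\AggregateRoot;
use Patchlevel\EventSourcing\Message\Message;
final class Repository
{
    public function save(AggregateRoot $aggregateRoot): void
    {
        $events = $aggregateRoot->releaseEvents();
        // Simplified: wrap each event in a message (headers omitted).
        $messages = array_map(static fn (object $event): Message => Message::create($event), $events);
        $this->eventBus->dispatch(...$messages);
        $this->store->save(...$messages);
    }
}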
That doesn’t really help us. Now it could happen that the email is sent, but we don't save the events.
We’ve simply shifted the problem. Okay, but can’t we perhaps build a transaction around it?
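use Patchlevel\EventSourcing\Aggregate\AggregateRoot;
use Patchlevel\EventSourcing\Message\Message;
final class Repository
{
    public function save(AggregateRoot $aggregateRoot): void
    {
        $events = $aggregateRoot->releaseEvents();
        // Simplified: wrap each event in a message (headers omitted).
        $messages = array_map(static fn (object $event): Message => Message::create($event), $events);
        $this->store->transaction(function () use ($messages): void {
            $this->store->save(...$messages);
            $this->eventBus->dispatch(...$messages);
        });
    }
}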
We still have the same problem with email delivery. However, it can help us with projections.
If the projection uses the same database and connection, then the changes can also be rolled back in case of an error.
While we have found a solution for one specific type of subscriber, we haven't solved the problem for all the others.
We should not be satisfied with that.
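The limits of the transaction can be illustrated with a small, library-independent sketch. FakeDatabase is a hypothetical in-memory stand-in that rolls back by restoring a snapshot, and the $sentEmails array stands in for an external mail server; neither is part of the library.

```php
<?php

// Hypothetical in-memory "database" with snapshot-based transactions.
final class FakeDatabase
{
    /** @var array<string, list<string>> */
    public array $tables = ['events' => [], 'profile' => []];

    public function transaction(callable $work): void
    {
        $snapshot = $this->tables; // remember the state before the transaction
        try {
            $work();
        } catch (Throwable $error) {
            $this->tables = $snapshot; // roll back everything written inside
            throw $error;
        }
    }
}

$database = new FakeDatabase();
$sentEmails = []; // stands in for an external mail server, outside the transaction

try {
    $database->transaction(static function () use ($database, &$sentEmails): void {
        $database->tables['events'][] = 'ProfileCreated';  // event store write
        $database->tables['profile'][] = 'Max Mustermann'; // projection write
        $sentEmails[] = 'Welcome Max Mustermann!';         // side effect that cannot be undone
        throw new RuntimeException('A later subscriber failed');
    });
} catch (RuntimeException $error) {
    // The database writes are gone, but the email has already left the system.
}
```

After the failure both tables are empty again, while the email remains sent: the transaction protects only what lives in the same database.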
Outbox Pattern
When researching this issue, you will often see the Outbox Pattern recommended.
Instead of having subscribers process events directly, the events are first stored in an outbox table within the database.
This can be accomplished by replacing the current implementation of the EventBus.
Here’s a simplified example of what it might look like:
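use Doctrine\DBAL\Connection;
use Patchlevel\EventSourcing\Message\Message;
final class OutboxEventBus implements EventBus
{
    public function __construct(
        private Connection $connection
    ) {
    }
    public function dispatch(Message ...$messages): void
    {
        foreach ($messages as $message) {
            // Store the message instead of calling the subscribers directly.
            $this->connection->executeStatement(
                'INSERT INTO outbox (message) VALUES (?)',
                [serialize($message)]
            );
        }
    }
}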
By replacing the actual EventBus implementation with the OutboxEventBus,
we can now truly perform both the storing of events in the event store and the dispatching of the events within a single transaction.
And if an error occurs now, we can roll back both operations.
All that's left is processing the events from the outbox table.
For this, we need a consumer that reads the events from the outbox table and then invokes the individual subscribers.
Here is a very simplified implementation:
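use Patchlevel\EventSourcing\Message\Message;
final class OutboxConsumer
{
    public function consume(): void
    {
        // Fetch all rows up front, oldest first, so deleting while looping is safe.
        $rows = $this->connection->fetchAllAssociative('SELECT id, message FROM outbox ORDER BY id');
        foreach ($rows as $row) {
            $message = unserialize($row['message']);
            // $this->subscribers is assumed to be a list of callables.
            foreach ($this->subscribers as $subscriber) {
                $subscriber($message);
            }
            // Remove the row only after every subscriber has run.
            $this->connection->executeStatement('DELETE FROM outbox WHERE id = ?', [$row['id']]);
        }
    }
}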
We just need to run the whole process as a worker in a separate process,
and we’ll have a robust solution to the problem - right?
We have solved the problem when there is exactly one subscriber.
But if we take a closer look, does this also apply when there are multiple subscribers?
What happens if one subscriber throws an error among many?
use Patchlevel\EventSourcing\Message\Message;
final class OutboxConsumer
{
    public function consume(): void
    {
        $rows = $this->connection->fetchAllAssociative('SELECT id, message FROM outbox ORDER BY id');
        foreach ($rows as $row) {
            $message = unserialize($row['message']);
            ($this->subscriberA)($message); // succeeds, e.g. sends an email
            ($this->subscriberB)($message); // throws an error
            ($this->subscriberC)($message); // is never reached
            $this->connection->executeStatement('DELETE FROM outbox WHERE id = ?', [$row['id']]);
        }
    }
}
In this example, Subscriber B has thrown an error.
Subscriber A has already been called and might have sent an email, for instance.
Subscriber C will no longer be called because the error in Subscriber B caused the program to terminate.
We still face the problem of not being able to ensure that either all subscribers were executed successfully or none at all.
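The following sketch replays that scenario in memory. It goes one step beyond the example above by assuming that the worker catches the error and retries the row on its next run; this retry behaviour is an assumption for illustration, as are all names in the code.

```php
<?php

$outbox = [1 => 'ProfileCreated'];       // hypothetical outbox rows: id => message
$calls = ['A' => 0, 'B' => 0, 'C' => 0]; // how often each subscriber was invoked
$failOnce = true;                        // Subscriber B fails on its first call only

$subscribers = [
    'A' => static function (string $message) use (&$calls): void {
        $calls['A']++; // e.g. sends an email
    },
    'B' => static function (string $message) use (&$calls, &$failOnce): void {
        $calls['B']++;
        if ($failOnce) {
            $failOnce = false;
            throw new RuntimeException('Subscriber B failed');
        }
    },
    'C' => static function (string $message) use (&$calls): void {
        $calls['C']++;
    },
];

// One worker run: a row is deleted only if all subscribers succeeded.
$consume = static function () use (&$outbox, $subscribers): void {
    foreach ($outbox as $id => $message) {
        try {
            foreach ($subscribers as $subscriber) {
                $subscriber($message);
            }
            unset($outbox[$id]);
        } catch (RuntimeException $error) {
            // The row stays in the outbox and is retried on the next run.
        }
    }
};

$consume(); // A runs, B throws, C is skipped
$afterFirstRun = $calls;

$consume(); // retry: A runs a second time, then B and C succeed
```

After the retry, Subscriber A has been invoked twice for the same message, so the email would be sent twice. The outbox table only tracks the message as a whole, not which subscribers have already handled it.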
Unfortunately, there is no straightforward solution with the Outbox Pattern for this problem.
To be honest, this pattern isn’t even intended for this issue.
The pattern is designed for sending events to an external broker, such as RabbitMQ or Kafka.
If we use this pattern only for sending events to a broker, we won’t have a problem with error handling in our scope,
and it serves as a good solution.
Conclusion
As we can see, it’s not as straightforward as it seems.
People often think that simply integrating an event bus into event sourcing will allow for easy synchronous processing of events to build projections or send emails.
However, they soon run into problems and try to solve them with the Outbox Pattern,
which only shifts the issues further down the line. And I've only scratched the surface.
Don’t get me wrong, I think the Event Bus Pattern is great
and has its place in an event sourcing system.
However, it may not always be the best solution for subscriptions, at least not in the way it has been implemented here.
We could go down the path of using a broker like Kafka or RabbitMQ to process events asynchronously.
This would add even more complexity to our system by introducing another technology.
But perhaps it's not necessary? Maybe there's another solution that doesn't require immediately implementing technologies like Kafka.
In the next blog post, I will introduce another solution called the subscription engine,
which might be simpler, yet even more powerful. Stay tuned!