The good and the bad of event bus in event sourcing
The event bus is a powerful design pattern in the world of event sourcing. It allows you to decouple the event producers from the event consumers. But it also has its downsides.
We are happy to announce the release of version 3.4.0 of the PHP event sourcing library. This release contains several new features and improvements. In this blog post, we give you an overview of the changes.
We have added a new in-memory event store. This store is useful for testing and prototyping. It is not recommended for production use because it does not persist events. Aside from that, it behaves like any other event store and is feature-complete.
use Patchlevel\EventSourcing\Store\InMemoryStore;
use Patchlevel\EventSourcing\Message\Message;

$store = new InMemoryStore([
    Message::create(new NameChanged('foo')),
]);
As you can see, you can pass an array of messages to the constructor to initialize the store with some events.
Furthermore, we have added a new experimental stream event store.
use Doctrine\DBAL\DriverManager;
use Doctrine\DBAL\Tools\DsnParser;
use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer;
use Patchlevel\EventSourcing\Store\StreamDoctrineDbalStore;

$connection = DriverManager::getConnection(
    (new DsnParser())->parse('pdo-pgsql://user:secret@localhost/app'),
);

$store = new StreamDoctrineDbalStore(
    $connection,
    DefaultEventSerializer::createFromPaths(['src/Event']),
);
The main difference from the previous event store is that the columns aggregate and aggregate_id are merged into a single column, stream.
If we have an aggregate with the name order and the ID 123, the stream name would be order-123.
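The naming rule can be sketched in a few lines of plain PHP. The function streamName below is a hypothetical helper written for illustration, not part of the library; it only shows how an aggregate name and an ID combine into a stream name.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not a library function): joins the aggregate name
 * and the aggregate ID with a hyphen, following the "order-123" pattern.
 */
function streamName(string $aggregate, string $aggregateId): string
{
    return sprintf('%s-%s', $aggregate, $aggregateId);
}

echo streamName('order', '123'), PHP_EOL; // prints "order-123"
```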
To make this work, we created a new header, StreamHeader, as a replacement for AggregateHeader.
use Patchlevel\EventSourcing\Store\StreamHeader;

// Messages are written with the store's save() method.
$store->save(
    Message::create(new NameChanged('foo'))
        ->withHeader(new StreamHeader('order-123')),
);
What is the benefit compared to the other store? The main advantage is that messages from streams that are not related to aggregates can be stored in the same table. For example, you can store events consumed from a message queue like Kafka, or events created outside the event sourcing context, such as with Doctrine ORM. This allows us to use the powerful subscription engine to consume events from different sources.
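To picture what a single stream column makes possible, here is a minimal sketch in plain PHP. The rows, the stream names such as kafka-payments, and the eventsOfStream function are all made up for illustration; the real store reads from the database, and its table layout is not shown in this post.

```php
<?php

declare(strict_types=1);

// Hypothetical rows as they might sit side by side in one table:
// aggregate streams and a non-aggregate stream share the same "stream" column.
$rows = [
    ['stream' => 'order-123', 'event' => 'order.created'],
    ['stream' => 'kafka-payments', 'event' => 'payment.received'],
    ['stream' => 'order-123', 'event' => 'order.shipped'],
    ['stream' => 'order-456', 'event' => 'order.created'],
];

/**
 * Returns the rows of one stream, keeping their original order.
 *
 * @param list<array{stream: string, event: string}> $rows
 *
 * @return list<array{stream: string, event: string}>
 */
function eventsOfStream(array $rows, string $stream): array
{
    return array_values(array_filter(
        $rows,
        static fn (array $row): bool => $row['stream'] === $stream,
    ));
}

$orderEvents = eventsOfStream($rows, 'order-123');
$paymentEvents = eventsOfStream($rows, 'kafka-payments');
```

Because every row carries only a stream name, a consumer does not need to know whether the events came from an aggregate or from somewhere else.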
Another new experimental feature is the support for child aggregates.
In some cases, it makes sense to split an aggregate into several smaller aggregates. This can happen if the aggregate becomes too large or if it is used in different contexts. Child aggregates can be used for this purpose and function in the same way as the root aggregate.
In the following example, we have an order aggregate that includes a shipping child aggregate.
use Patchlevel\EventSourcing\Aggregate\BasicChildAggregate;
use Patchlevel\EventSourcing\Attribute\Apply;

final class Shipping extends BasicChildAggregate
{
    private bool $arrived = false;

    public function __construct(
        private string $trackingId,
    ) {
    }

    public function arrive(): void
    {
        $this->recordThat(new Arrived());
    }

    #[Apply]
    public function applyArrived(Arrived $event): void
    {
        $this->arrived = true;
    }

    public function isArrived(): bool
    {
        return $this->arrived;
    }
}
And the order aggregate root looks like this:
use Patchlevel\EventSourcing\Aggregate\BasicAggregateRoot;
use Patchlevel\EventSourcing\Aggregate\Uuid;
use Patchlevel\EventSourcing\Attribute\Aggregate;
use Patchlevel\EventSourcing\Attribute\Apply;
use Patchlevel\EventSourcing\Attribute\ChildAggregate;

#[Aggregate('order')]
final class Order extends BasicAggregateRoot
{
    #[ChildAggregate]
    private Shipping $shipping;

    public static function create(Uuid $id, string $trackingId): static
    {
        $self = new static();
        $self->recordThat(new OrderCreated($id, $trackingId));

        return $self;
    }

    // Apply methods need the #[Apply] attribute, as in the Shipping class.
    #[Apply]
    public function applyOrderCreated(OrderCreated $event): void
    {
        $this->shipping = new Shipping($event->trackingId);
    }

    public function arrive(): void
    {
        // The root delegates to the child, which records its own event.
        $this->shipping->arrive();
    }
}
As you can see, the shipping aggregate is a child aggregate of the order aggregate.
To make this work, we need to add the #[ChildAggregate] attribute to the property.
The child aggregate handles its own events and can be used in the same way as the root aggregate.
The result is code that is more structured and easier to understand.
Let us know what you think about this feature.
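To make the delegation between Order and Shipping more tangible, here is a self-contained sketch in plain PHP. It does not use the library and is not a description of how the library implements child aggregates. The classes OrderSketch, ShippingSketch, and ArrivedSketch, as well as the recorder callback, are assumptions made up for this illustration.

```php
<?php

declare(strict_types=1);

// Simplified stand-ins for illustration only; these are NOT the library's classes.
final class ArrivedSketch
{
}

final class ShippingSketch
{
    private bool $arrived = false;

    /** @param Closure(object): void $record hypothetical hook handed in by the root */
    public function __construct(private Closure $record)
    {
    }

    public function arrive(): void
    {
        // The child decides which event happens, but records it through the root.
        ($this->record)(new ArrivedSketch());
    }

    public function apply(object $event): void
    {
        if ($event instanceof ArrivedSketch) {
            $this->arrived = true;
        }
    }

    public function isArrived(): bool
    {
        return $this->arrived;
    }
}

final class OrderSketch
{
    /** @var list<object> events recorded by the root and its child */
    private array $recorded = [];

    private ShippingSketch $shipping;

    public function __construct()
    {
        $this->shipping = new ShippingSketch(function (object $event): void {
            $this->recorded[] = $event;     // one shared event list for the whole aggregate
            $this->shipping->apply($event); // the child updates its own state
        });
    }

    public function arrive(): void
    {
        $this->shipping->arrive();
    }

    public function isArrived(): bool
    {
        return $this->shipping->isArrived();
    }

    /** @return list<object> */
    public function recordedEvents(): array
    {
        return $this->recorded;
    }
}

$order = new OrderSketch();
$order->arrive();
```

In this sketch the root owns the list of recorded events, while the child owns the state that its events change. That split is the idea the sketch is meant to convey; the library's actual mechanics may differ.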
We have added new experimental features in this release. These features are not yet stable and may change in future releases. Please give us feedback on these features so we can improve them. We will notify you when these features are stable from an API perspective.
We also released version 3.3.0 of the Symfony event sourcing bundle that integrates the new features of the event sourcing library. You can start right away with the new version and use the new features in your Symfony application.