How to combine ORM entities with event sourcing aggregates

This blog post is inspired by a question on GitHub, which was initially about using the Normalizer from our php event-sourcing library, but later resulted in an issue related to combining doctrine ORM-based entities and event sourcing-based aggregates. Since I have been asked this multiple times, I decided to write a little article about it. There are cases where a traditional project receives a new feature where an event sourcing approach would perfectly fit, and you decide to do exactly that. But what about references to the ORM entities? How do we handle them? In the question I mentioned, the user had exactly this situation and took the wrong approach by referencing the ORM-based entity with the new aggregate. This led to a situation where the Normalizer needed more power to transform this into a serializable representation and back.

So let's discuss different approaches to solve this problem, but first, we need to set a baseline.

Setup

We have a car-sharing application where we have an Account as an ORM-based entity. We also have an event sourcing-based CarRide aggregate. In this application, a user can register and create an Account. With the Account, the user can book someCarRides.

This is how our Account looks like:

use Doctrine\ORM\Mapping\Column;
use Doctrine\ORM\Mapping\Entity;
use Doctrine\ORM\Mapping\Id;

#[Entity]
class Account
{
    #[Id]
    #[Column('uuid')]
    private AccountId $id;

    #[Column('string')]
    private string $name;
}

And this is how our basic aggregate looks like:

use Patchlevel\EventSourcing\Aggregate\BasicAggregateRoot;
use Patchlevel\EventSourcing\Attribute\Aggregate;
use Patchlevel\EventSourcing\Attribute\Id;

#[Aggregate('car_ride')]
final class CarRide extends BasicAggregateRoot
{
    #[Id]
    private CarRideId $id;
    private CarId $carId; // a reference to the car which is booked for the ride
}

1. Use the entity in the aggregate

In the first approach, we try to make it as simple as possible. We just put the Account into the CarRide and its events.

#[Aggregate('car_ride')]
final class CarRide extends BasicAggregateRoot
{
    #[Id]
    private CarRideId $id;
    private CarId $carId;
    private Account $account;

    public static function book(CarRideId $rideId, CarId $carId, Account $account): self
    {
        $self = new self();
        $self->recordThat(new RideBooked($rideId, $carId, $account));

        return $self;
    }
}
#[Event('car_ride.booked')]
final class RideBooked
{
    public function __construct(
        public readonly Account $account
    ) {
    }
}

But now, if we try to save the aggregate, it will crash. This is due to the event not being serializable with the complete Account in it. To fix this, we can create a Normalizer that serializes the object when saving and deserializes it when we load the aggregate with the events. We can then add the Normalizer via an attribute to the event.

use Attribute;
use Patchlevel\Hydrator\Normalizer\Normalizer;

#[Attribute(Attribute::TARGET_PROPERTY)]
final class AccountNormalizer implements Normalizer
{
    public function __construct(
        private readonly AccountRepository $accountRepository
    ) {
    }

    public function normalize(mixed $value): string|null
    {
        if ($value === null) {
            return null;
        }

        return $account->getId()->toString();
    }

    public function denormalize(mixed $value): Account|null
    {
        if ($value === null) {
            return null;
        }

        return $this->accountRepository->get(AccountId::fromString($value));
    }
}
#[Event('car_ride.booked')]
final class RideBooked
{
    public function __construct(
        #[AccountNormalizer]
        public readonly Account $account
    ) {
    }
}

Now we can save and load the aggregate without any problems, right? Well, to be fair, this is not easily possible with our event-sourcing library, since we don't support Dependency Injection for Normalizers. We think it is good that this is not easily possible; naturally, the next question is: why, though? We believe that this would open up a can of worms for the misuse of normalizers. The Normalizer should only transform values into a savable format and vice versa. Therefore, extra services shouldn't be necessary.

Besides our limitation here, there are other points against it. First, it's too heavy for the normalization process. This process should be as fast as possible since it will always be triggered for each event when loading the aggregate from the database. Calling extra services that can perform calculations, or in this case, communicate with external services like a database or API, kills performance.

Second, events are facts - this means everything in them has already happened and cannot be changed afterward. Since the entity can change at any time, which will reflect in our events, this will break our contract with them. That's why this approach is, from our point of view, not suitable. So let's dive into the next one.

2. Use the entity id

Instead of trying to put the whole Account into the aggregate, we will just put the AccountId in there. We only want an association between the Account and the booked CarRide. Let's update our code!

#[Aggregate('car_ride')]
final class CarRide extends BasicAggregateRoot
{
    #[Id]
    private CarRideId $id;
    private CarId $carId;
    private AccountId $accountId;

    public static function book(CarRideId $rideId, CarId $carId, Account $account): self
    {
        $self = new self();
        $self->recordThat(new RideBooked($rideId, $carId, $accountId->getId()));

        return $self;
    }
}

Now we will not encounter the problem we described earlier since we are not saving the whole Account, but instead only the unique identifier. The custom Normalizer now only needs to cast this object to a string and back again into the object, no database is involved whatsoever.

But what if we need the name from the Account for some business logic? Well, since we have the AccountId, we can get all the data we need to execute our logic on that, and it will always be up to date. We only need to retrieve the Account via the AccountRepository. This would look like this:

final class SomeBusinessLogicHandler
{
    public function __construct(
        private RideRepository $rideRepository,
        private AccountRepository $accountRepository,
    ) {
    }

    public function __invoke(RideId $rideId): void
    {
        $ride = $this->rideRepository->get($rideId);
        $account = $this->accountRepository->get($ride->accountId());

        $name = $account->getName(); // do something with it.
    }
}

So, as soon as the update occurs in the ORM space, the Account will always have the new data in our handlers.

What about the projections? Well, if you need the data, we would recommend putting the data together as soon as you read it out. You could join the tables together if you are using the same database/connection for the projection and the ORM.

3. Copy the needed data

In the second approach, we saved only the identifier. Now, in the third approach, we are taking a different route. We will copy all relevant data from the Account into the CarRide that we need to perform this ride. To simplify the example, all we need is the name of the Account.

#[Aggregate('car_ride')]
final class CarRide extends BasicAggregateRoot
{
    #[Id]
    private CarRideId $id;
    private CarId $carId;
    private string $name;

    public static function book(CarRideId $rideId, CarId $carId, Account $account): self
    {
        $self = new self();
        $self->recordThat(new RideBooked($rideId, $carId, $account->getName()));

        return $self;
    }
}

And that's it - no Normalizer in this case, no casting, no headache, right? Compared to the second approach, we don't have the most up-to-date data here. This is because the name is now in the event and will not be updated if the Account name changes. For that, we would need an extra event, like NameChanged, for example.

This might seem like a downside compared to the second approach, but it can also be a significant upside. There are cases where we strictly want this behavior. For example, if an application reflects contracts, we don't want to update names or any other information on a signed contract afterward. Additionally, our use case here in the article would be better with the third approach since updating the driver's name afterward does not make much sense, from my point of view. But as always, it depends on the business and what is truly needed.

3.5 Update the copied data

But what if we used this approach but want to update the data? How can we "fix" this? There are possibilities to update the data, but they also come with their own quirks and risks. We could create a new event for the CarRide aggregate that we could dispatch as soon as the name changes for the Account. Let's try it out:

#[Event('car_ride.account_name_changed')]
final class AccountNameChanged
{
    public function __construct(
        public readonly string $name
    ) {
    }
}

This is how the event could look. Next, we will discuss the aggregate to dispatch the event and consume it.

#[Aggregate('car_ride')]
final class CarRide extends BasicAggregateRoot
{
    #[Id]
    private CarRideId $id;
    private string $name;

    public function changeAccountName(string $name): void
    {
        $self->recordThat(new AccountNameChanged($name));
    }

    #[Apply(AccountNameChanged::class)]
    public function applyAccountNameChanged(AccountNameChanged $event): void
    {
        $self->name = $event->name;
    }
}

Now we also need a method on the Account to change the name.

use Doctrine\ORM\Mapping\Column;
use Doctrine\ORM\Mapping\Entity;

#[Entity]
class Account
{
    #[Column('string')]
    private string $name;

    public function changeName(string $newName): void
    {
        $this->name = $newName;
    }
}

We now have the event for the name change, and the Account is able to change the name. What is missing is that the event is dispatched as soon as the name is changed. But where do we do this? It would be best if we only had one place to handle this logic, where we can access all CarRide aggregates associated with the Account. Let's provide a service that handles this in the Account::changeName method.

use Doctrine\ORM\Mapping\Column;
use Doctrine\ORM\Mapping\Entity;

#[Entity]
class Account
{
    #[Column('string')]
    private string $name;

    public function changeName(string $newName, UpdateRidesWithNewAccountName $updateService): void
    {
        $oldName = $this->name;
        $this->name = $newName;
        // warning: this is simplified! If an exception will be thrown here the data consistency will break
        $updateService($oldName, $newName);
    }
}

And the service could look like this:

final class UpdateRidesWithNewAccountName
{
    public function __construct(
        private CarRideProjection $carRideProjection,
        private CarRideRepository $carRideRepository,
    ) {
    }

    public function __invoke(string $oldName, string $newName)
    {
        // find rides by name based on a projection
        $rides = $this->carRideProjection->getRidesByAccountName($oldName);

        foreach ($rides as $rideData) {
            // get the Ride Aggregate
            $ride = $this->carRideRepository->get($rideData->getId());

            $ride->updateAccountName($newName); // dispatches the AccountsNameChanged event
            $this->rideRepository->store($ride);
        }
    }
}

So what is happening here in the service? First, since we cannot perform complex queries directly on the aggregate repository, we need a projection that can provide us with the relevant data to filter and later retrieve the aggregate. In this case, we are using the name to query for the rides for simplification reasons. You should use some unique data for querying. After obtaining our aggregate, we can perform our actions on it, such as changing the name. Then our event will be dispatched to also change the name in our aggregates.

With that, the Account::changeName method will call the UpdateRidesWithNewAccountName service, which will be responsible for querying the correct rides and calling Ride::changeAccountName, which then dispatches our AccountNameChanged event.

This works, but it can get really cumbersome and slow if we have a lot of data changes in the ORM entities that we need to reflect in our aggregates. We also need to make sure to save the changes in both places to keep them in sync. Now, coming to the problem: If the service throws an exception, our system will have inconsistent data. Why is that? Since we already partially updated the rides - okay, for that, we could wrap it in a transaction. But what if the ORM crashes when updating the dataset? Then we will have inconsistencies again, but we can wrap everything in a transaction, yes. However, this is only possible if both the ORM and the event store are using the same connection. If this precondition is not met, we need to be really careful about that.

Conclusion

The second and third approaches discussed in the article are both valid options. It depends on the concrete use case which of them you should use. If you only want to link an entity to the aggregate and always want the newest data, you should probably go for the second solution. If you want the historical aspect and don't want to auto-update the data, you should try the third one.

In the mentioned question on GitHub, the first solution was originally taken. To be exact, the aggregate was only a historical representation of the entity. We would not recommend this hybrid approach, as this can lead to a lot of duplicated code, data inconsistency, and weird behavior. Instead, if you need historical data and do not want to track this with a big overhead with the ORM-based approach, then migrate this ORM-based entity into an event sourcing-based aggregate. This then comes with all the pros and cons of event sourcing - yes, event sourcing also has its downsides - just like everything else in life.

As an example, handling personal data is more complex. Since all the data are in an immutable stream and cannot be easily deleted, our name in this example would also be such a case. If Account were an event sourcing-based aggregate, then you would need a plan on how to remove/anonymize this data. But don't worry, we got you covered on that topic! We will also discuss this soon here.

Other Recent Posts

RSS

The Performance Factor in Event Sourcing: What You Need to Know

This article addresses the common concern regarding the performance of event sourcing, particularly the speed at which long-living aggregates with many events are loaded. It explores solutions such as snapshotting and stream splitting to optimize aggregate loading. Furthermore, projections allow for the creation of highly flexible and optimized read models, each can be tailored to specific needs.

Daniel Badura
Daniel Badura
Software Entwickler

What is new in patchlevel/event-sourcing in version 3.7

We’re excited to announce the release of our php library patchlevel/event-sourcing version 3.7.0. This release features better testing capabilities with InMemorySubscriptionStore::clear, improved subscription performance and a new #[Stream] attribute for micro aggregates!

Daniel Badura
Daniel Badura
Software Entwickler

What is new in php event sourcing 3.6

We are happy to announce the release of the php event sourcing library in version 3.6.0. This release contains several exciting new features like Pipe and Reducer. In this blog post, we will provide you with an overview of the changes.

David Badura
David Badura
Software Entwickler