Event Sourcing with Symfony

Today we want to show you how to use the patchlevel event sourcing library with Symfony. We will guide you through the installation and show you how to use the bundle in a small example. If you follow the introduction, you will have a working hotel REST-API that you can use and extend as you like.

For this, we assume some system requirements in our tutorial. You can of course have a different setup, but then you have to make sure that Symfony and a database are running. The requirements are nothing special, here is the list:

We use Symfony CLI to create a new Symfony project, have php and composer installed and to run the local web server. Docker is used to start the database for the event store and projections.

With the basics clarified, let’s dive in and get started.

Setup New Symfony Project

Before we can start with event sourcing, we need to set up a Symfony project. If you already have a new Symfony project, you can skip this chapter.

We follow the Symfony Setup Guide to install Symfony. If something is unclear about the installation, please refer to the Symfony documentation.

Okay, let's start by creating a new Symfony project. With this both cli commands, we create a new project and navigate into the project folder.

symfony new event-sourcing-symfony
cd event-sourcing-symfony

Once this is done, we quickly test if everything is working. For this, we start the local web server with the following command:

symfony server:start

Then open your browser and navigate to http://localhost:8000/. If everything is working, you'll see a welcome page. So far so good, now we can start with the installation of the event sourcing bundle.

Install Event-Sourcing Bundle

The patchlevel event sourcing bundle is a symfony bundle that integrates the event sourcing library into symfony. All services are put together correctly for you, so you can start directly with the development. We can install the event sourcing bundle with composer by running the following command:

composer require patchlevel/event-sourcing-bundle

If you are using our symfony flex recipe, you don't need to do anything else. The bundle is automatically configured and ready to use. If you are not using the flex recipe, then you have to do some manual configuration. For this, please follow the manual bundle configuration documentation.

Database setup

After the installation of the event sourcing bundle, we need to set up our database. As mentioned at the beginning, we use docker to start the database. Our flex recipe provides a docker compose file that we can use to start the database. So, we need only to execute the following command:

docker compose up -d

After the database is running, we can create the database schema. Best practice is to use the doctrine migration system. The first command creates the migration file and the second command executes the migration.

symfony console event-sourcing:migration:diff
symfony console event-sourcing:migration:migrate

So far so good, we have set up the database and can start with the planning of our system.

Planning

Before we start with the development, we need to define our business case. We want to create a small hotel system where guests can check in and out. We also want to see which guests are currently in the hotel and be notified when a new guest checks in. And finally, the whole thing should be a REST API.

Developing

Okay, let's start with the development. We need to define some events, aggregates and projections. And we also need a controller to interact with the system.

Define some events

First we define the events that happen in our system. We build simple readonly classes in past tense, mark them with the #[Event] attribute and give them a unique name.

A hotel can be created with a hotelName and an id:

namespace App\Event;

use Patchlevel\EventSourcing\Aggregate\Uuid;
use Patchlevel\EventSourcing\Attribute\Event;

#[Event('hotel.created')]
final class HotelCreated
{
    public function __construct(
        public readonly Uuid $hotelId,
        public readonly string $hotelName,
    ) {
    }
}

A guest can check in by guestName:

namespace App\Event;

use Patchlevel\EventSourcing\Aggregate\Uuid;
use Patchlevel\EventSourcing\Attribute\Event;

#[Event('hotel.guest_is_checked_in')]
final class GuestIsCheckedIn
{
    public function __construct(
        public readonly Uuid $hotelId,
        public readonly string $guestName,
    ) {
    }
}

And also check out again, which is also done by guestName:

namespace App\Event;

use Patchlevel\EventSourcing\Aggregate\Uuid;
use Patchlevel\EventSourcing\Attribute\Event;

#[Event('hotel.guest_is_checked_out')]
final class GuestIsCheckedOut
{
    public function __construct(
        public readonly Uuid $hotelId,
        public readonly string $guestName,
    ) {
    }
}

That's it for the events. This is the foundation of our system.

Define aggregates

Next we need to define an aggregate. Aggregates allow us to ensure that the business rules are followed. In our case, we don't want a guest to check in twice or check out without checking in first etc. In order for event sourcing aggregates to be able to take on this task, they have an internal current state that can be built up again and again from the events. They are also the "root" through which all interactions must pass and are responsible for generating new events.

So let's create a hotel aggregate now. We need to extend the BasicAggregateRoot class and mark the class with the #[Aggregate] attribute with a unique name. Each aggregate needs an aggregate id, so we add a property with the #[Id] attribute.

namespace App\Aggregate;

use Patchlevel\EventSourcing\Aggregate\BasicAggregateRoot;
use Patchlevel\EventSourcing\Aggregate\Uuid;
use Patchlevel\EventSourcing\Attribute\Aggregate;
use Patchlevel\EventSourcing\Attribute\Id;

#[Aggregate(name: 'hotel')]
final class Hotel extends BasicAggregateRoot
{
    #[Id]
    private Uuid $id;
}

This is our basic structure for an aggregate. In the next steps we will add more and more parts to the class. For the sake of clarity, we will not always show all previously added code blocks again. But if you merge the code before with the next one, then you have a working aggregate.

No we need a static method to create a new hotel, because we can't use the constructor directly. In this method we create a new instance of the aggregate and record the event HotelCreated.

namespace App\Aggregate;

use App\Event\HotelCreated;
use Patchlevel\EventSourcing\Aggregate\BasicAggregateRoot;
use Patchlevel\EventSourcing\Aggregate\Uuid;
use Patchlevel\EventSourcing\Attribute\Aggregate;
use Patchlevel\EventSourcing\Attribute\Id;

#[Aggregate(name: 'hotel')]
final class Hotel extends BasicAggregateRoot
{
    #[Id]
    private Uuid $id;
    private string $name;

    /** @var list<string> */
    private array $guests;

    public static function create(Uuid $id, string $hotelName): self
    {
        $self = new self();
        $self->recordThat(new HotelCreated($id, $hotelName));

        return $self;
    }

    #[Apply]
    protected function applyHotelCreated(HotelCreated $event): void
    {
        $this->id = $event->hotelId;
        $this->name = $event->hotelName;
        $this->guests = [];
    }
}

So that the state in the aggregate can also be updated, we have added a so-called apply method and marked it with #[apply] attribute. The state of the aggregate can only be changed in these Apply methods. We have done that here. We have set the ID, name and an empty guest list.

Now we need to add the methods to check in and out guests. In these methods we need check our defined business rules. If the rules are not followed, we throw an exception, otherwise we record the event.

namespace App\Aggregate;

use App\Event\GuestIsCheckedIn;
use App\Event\GuestIsCheckedOut;
use Patchlevel\EventSourcing\Aggregate\BasicAggregateRoot;
use Patchlevel\EventSourcing\Aggregate\Uuid;
use Patchlevel\EventSourcing\Attribute\Aggregate;
use Patchlevel\EventSourcing\Attribute\Apply;
use Patchlevel\EventSourcing\Attribute\Id;
use RuntimeException;

use function array_filter;
use function array_values;
use function in_array;
use function sprintf;

#[Aggregate(name: 'hotel')]
final class Hotel extends BasicAggregateRoot
{
    #[Id]
    private Uuid $id;
    private string $name;

    /** @var list<string> */
    private array $guests;

    // ############################
    // # create aggregate methods #
    // ############################

    public function checkIn(string $guestName): void
    {
        if (in_array($guestName, $this->guests, true)) {
            throw new RuntimeException(
                sprintf('Guest "%s" is already checked in.', $guestName),
            );
        }

        $this->recordThat(new GuestIsCheckedIn($this->id, $guestName));
    }

    public function checkOut(string $guestName): void
    {
        if (!in_array($guestName, $this->guests, true)) {
            throw new RuntimeException(
                sprintf('Guest "%s" is not checked in.', $guestName),
            );
        }

        $this->recordThat(new GuestIsCheckedOut($this->id, $guestName));
    }

    #[Apply]
    protected function applyGuestIsCheckedIn(GuestIsCheckedIn $event): void
    {
        $this->guests[] = $event->guestName;
    }

    #[Apply]
    protected function applyGuestIsCheckedOut(GuestIsCheckedOut $event): void
    {
        $this->guests = array_values(
            array_filter(
                $this->guests,
                static fn ($name) => $name !== $event->guestName,
            ),
        );
    }
}

Here as well, we include the appropriate apply methods to directly update the aggregate's state based on the events. In these methods, there is no further check for correctness; this must happen before the event is recorded. Everything from here on has happened and cannot be changed.

That's it for the aggregate let's create a controller to interact with the system.

Create and Update with a Controller

There we are, we can now interact with the aggregate. To do this, we create a Symfony controller. As already mentioned, a UI would go beyond the scope of the tutorial, so we limit ourselves to offering a stripped down and simple Rest API.

We need only inject our hotel repository. If you are using autowiring, the right repository will be injected automatically, if the parameter name start with the aggregate name and ends with Repository.

Everything else is standard Symfony development.

namespace App\Controller;

use App\Aggregate\Hotel;
use Patchlevel\EventSourcing\Aggregate\Uuid;
use Patchlevel\EventSourcing\Repository\Repository;
use Symfony\Component\HttpFoundation\JsonResponse;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpKernel\Attribute\AsController;
use Symfony\Component\Routing\Annotation\Route;

#[AsController]
final class HotelController
{
    /**
     * @param Repository<Hotel> $hotelRepository
     */
    public function __construct(
        private readonly Repository $hotelRepository,
    ) {
    }

    #[Route('/create', methods:['POST'])]
    public function createAction(Request $request): JsonResponse
    {
        $hotelName = $request->getPayload()->get('name'); // need validation!
        $id = Uuid::generate();

        $hotel = Hotel::create($id, $hotelName);
        $this->hotelRepository->save($hotel);

        return new JsonResponse(['id' => $id->toString()]);
    }

    #[Route('/{hotelId}/check-in', methods:['POST'])]
    public function checkInAction(Uuid $hotelId, Request $request): JsonResponse
    {
        $guestName = $request->getPayload()->get('name'); // need validation!

        $hotel = $this->hotelRepository->load($hotelId);
        $hotel->checkIn($guestName);
        $this->hotelRepository->save($hotel);

        return new JsonResponse();
    }

    #[Route('/{hotelId}/check-out', methods:['POST'])]
    public function checkOutAction(Uuid $hotelId, Request $request): JsonResponse
    {
        $guestName = $request->getPayload()->get('name'); // need validation!

        $hotel = $this->hotelRepository->load($hotelId);
        $hotel->checkOut($guestName);
        $this->hotelRepository->save($hotel);

        return new JsonResponse();
    }
}

This is a very simple controller that allows us to create a hotel and check in and out guests. Now we can test our system. Here are some curl commands. For some commands you have to replace the placeholders in curly brackets.

# create hotel, response is the id of the hotel
curl -X POST http://localhost:8000/create -d '{"name":"Hotel California"}' -H "Content-Type: application/json"
export hotelId= # uuid from the response

# check in a guest
curl -X POST http://localhost:8000/$hotelId/check-in -d '{"name":"John Doe"}' -H "Content-Type: application/json"

# check out a guest
curl -X POST http://localhost:8000/$hotelId/check-out -d '{"name":"John Doe"}' -H "Content-Type: application/json"

Great, we can now create a hotel and let guests check in and out. But we don't know how which guests are currently in the hotel or any history of past guests. For that we need a projection.

Projections and read models

To create a read model in event sourcing, we need a projection. This projection listens to the events and updates the read model accordingly. The read model can take various forms, such as a database, a cache, a file, or similar. We use in our tutorial the same database as the event store.

We create a class and mark it with the #[Projector] attribute. In this class we need a method for each event we want to listen to. The method must be marked with the #[Subscribe] attribute and have the event class as a parameter. We can then define different parameters in these methods, such as the event or the ID, by using the type hint. Everything else takes over the subscription engine.

But in order to be able to write something into the database, we need a table. The subscription engine can manage this for us. All we need to do is define #[Setup] and #[Teardown] methods and create or remove the table there.

namespace App\Subscriber;

use App\Event\GuestIsCheckedIn;
use App\Event\GuestIsCheckedOut;
use DateTimeImmutable;
use Doctrine\DBAL\Connection;
use Patchlevel\EventSourcing\Aggregate\Uuid;
use Patchlevel\EventSourcing\Attribute\Projector;
use Patchlevel\EventSourcing\Attribute\Setup;
use Patchlevel\EventSourcing\Attribute\Subscribe;
use Patchlevel\EventSourcing\Attribute\Teardown;
use Patchlevel\EventSourcing\Subscription\Subscriber\SubscriberUtil;

/**
 * @psalm-type GuestData = array{
 *     guest_name: string,
 *     hotel_id: string,
 *     check_in_date: string,
 *     check_out_date: string|null
 * }
 */
#[Projector('guests')]
final class GuestProjection
{
    use SubscriberUtil;

    public function __construct(
        private Connection $db
    ) {
    }

    /** @return list<GuestData> */
    public function findGuestsByHotelId(Uuid $hotelId): array
    {
        return $this->db->createQueryBuilder()
            ->select('*')
            ->from($this->table())
            ->where('hotel_id = :hotel_id')
            ->setParameter('hotel_id', $hotelId->toString())
            ->fetchAllAssociative();
    }

    #[Subscribe(GuestIsCheckedIn::class)]
    public function onGuestIsCheckedIn(
        GuestIsCheckedIn $event,
        DateTimeImmutable $recordedOn
    ): void {
        $this->db->insert(
            $this->table(),
            [
                'hotel_id' => $event->hotelId->toString(),
                'guest_name' => $event->guestName,
                'check_in_date' => $recordedOn->format('Y-m-d H:i:s'),
                'check_out_date' => null,
            ]
        );
    }

    #[Subscribe(GuestIsCheckedOut::class)]
    public function onGuestIsCheckedOut(
        GuestIsCheckedOut $event,
        DateTimeImmutable $recordedOn
    ): void {
        $this->db->update(
            $this->table(),
            [
                'check_out_date' => $recordedOn->format('Y-m-d H:i:s'),
            ],
            [
                'hotel_id' => $event->hotelId->toString(),
                'guest_name' => $event->guestName,
                'check_out_date' => null,
            ]
        );
    }

    #[Setup]
    public function create(): void
    {
        $this->db->executeStatement(
            "CREATE TABLE {$this->table()} (
                hotel_id VARCHAR(36) NOT NULL,
                guest_name VARCHAR(255) NOT NULL,
                check_in_date TIMESTAMP NOT NULL,
                check_out_date TIMESTAMP NULL
            );"
        );
    }

    #[Teardown]
    public function drop(): void
    {
        $this->db->executeStatement("DROP TABLE IF EXISTS {$this->table()};");
    }

    private function table(): string
    {
        return 'projection_' . $this->subscriberId();
    }
}

This is our projection. We store the guests along with the hotel ID, guest name, check-in date, and check-out date. Additionally, we have defined a method to retrieve guests based on their hotel ID.

Now we just need to add a symfony action so that we can read the data. To do this, we inject our projection and get the data.

namespace App\Controller;

use App\Aggregate\Hotel;
use App\Subscriber\GuestProjection;
use Patchlevel\EventSourcing\Repository\Repository;
use Symfony\Component\HttpFoundation\JsonResponse;
use Symfony\Component\HttpKernel\Attribute\AsController;
use Symfony\Component\Routing\Annotation\Route;

#[AsController]
final class HotelController
{
    /**
     * @param Repository<Hotel> $hotelRepository
     */
    public function __construct(
        private readonly Repository $hotelRepository,
        private readonly GuestProjection $guestProjection,
    ) {
    }

    #[Route('/{hotelId}/guests', methods:['GET'])]
    public function hotelGuestsAction(Uuid $hotelId): JsonResponse
    {
        return new JsonResponse(
            $this->guestProjection->findGuestsByHotelId($hotelId),
        );
    }

    // ######################################
    // # previous create and update actions #
    // ######################################
}

That's it! We can now display all guests for a hotel, including those currently checked in and those who have checked out. Here’s an example curl command for this:

# list hotels
curl -X GET http://localhost:8000/$hotelId/guests

Send an email with a processor

The only requirement missing is to send an email when a new guest has checked in. For this we will now write a processor. It basically works in exactly the same way as a projection and also uses the subscription engine. However, it doesn't account past events and only works from now. This is for usecases like emails useful, so that user don't get spammed with same emails over and over again. Therefore we need to run it in a different mode then the projections, which we can by the #[Processor] attribute instead.

But first we need to install the symfony/mailer, which we want to use to send mail. We do this quickly with composer.

composer require symfony/mailer

If you have allowed this component to modify the docker compose.yaml, then you can start a dev mail server. To do this, call the following again:

docker compose up -d

But let's write our processor first. We inject the mailer here and send out an email directly. We don't have to do anything else.

namespace App\Subscriber\Processor;

use App\Event\GuestIsCheckedIn;
use Patchlevel\EventSourcing\Attribute\Processor;
use Patchlevel\EventSourcing\Attribute\Subscribe;
use Symfony\Component\Mailer\MailerInterface;
use Symfony\Component\Mime\Email;

use function sprintf;

#[Processor('admin_emails')]
final class SendCheckInEmailProcessor
{
    public function __construct(
        private readonly MailerInterface $mailer,
    ) {
    }

    #[Subscribe(GuestIsCheckedIn::class)]
    public function onGuestIsCheckedIn(GuestIsCheckedIn $event): void
    {
        $email = (new Email())
            ->from('[email protected]')
            ->to('[email protected]')
            ->subject('Guest is checked in')
            ->text(sprintf('A new guest named "%s" is checked in', $event->guestName));

        $this->mailer->send($email);
    }
}

If you now open the web-interface of the mail server with following command:

symfony open:local:webmail

Then you will see that you have not yet received any emails from the check-ins from the past. That is correct and intended. We want only to send emails for new check-ins. In order for us to see an email now, we need to trigger a check in. Once you've done that, you should now see an email there.

Finish Line

That's it. We have created a small hotel system with symfony and event sourcing. We have seen how to create aggregates, projections, processors and how to interact with them. We have also seen how to use the symfony mailer to send emails.

Feel free to develop this application further or use it as a starting point for your own project. You can also find more information on our detailed documentation pages for event sourcing and event sourcing bundle. Or browse our blog to discover other interesting topics about event sourcing.

Other Recent Posts

RSS

The Performance Factor in Event Sourcing: What You Need to Know

This article addresses the common concern regarding the performance of event sourcing, particularly the speed at which long-living aggregates with many events are loaded. It explores solutions such as snapshotting and stream splitting to optimize aggregate loading. Furthermore, projections allow for the creation of highly flexible and optimized read models, each can be tailored to specific needs.

Daniel Badura
Daniel Badura
Software Entwickler

What is new in patchlevel/event-sourcing in version 3.7

We’re excited to announce the release of our php library patchlevel/event-sourcing version 3.7.0. This release features better testing capabilities with InMemorySubscriptionStore::clear, improved subscription performance and a new #[Stream] attribute for micro aggregates!

Daniel Badura
Daniel Badura
Software Entwickler

What is new in php event sourcing 3.6

We are happy to announce the release of the php event sourcing library in version 3.6.0. This release contains several exciting new features like Pipe and Reducer. In this blog post, we will provide you with an overview of the changes.

David Badura
David Badura
Software Entwickler