What is new in PHP event sourcing 3.6

We are happy to announce the release of our PHP library patchlevel/event-sourcing in version 3.6.0. This release contains several new features, most notably the Pipe and the Reducer. In this blog post, we give you an overview of the changes.

Pipe

First and foremost, we’d like to introduce you to a new feature: the Pipe.
The Pipe is a new concept designed for transforming messages. We already had translators for messages, but you had to iterate over the messages manually and apply the translators yourself. We have now closed that gap with a class that takes care of this for you.

Here’s how it works:

use Patchlevel\EventSourcing\Message\Pipe;
use Patchlevel\EventSourcing\Message\Translator\ExcludeEventTranslator;
use Patchlevel\EventSourcing\Message\Translator\RecalculatePlayheadTranslator;

$messages = new Pipe(
    $messages,
    new ExcludeEventTranslator([ProfileCreated::class]),
    new RecalculatePlayheadTranslator(),
);

foreach ($messages as $message) {
    // do something with the message
}

You can pass any iterator containing messages, such as the result from the Event Store, attach a few translators, and then iterate over it. The translators are applied to each message as you iterate.
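To make the lazy behaviour more tangible, here is a minimal conceptual sketch in plain PHP. It is not the library's Pipe implementation. It assumes that a translator is a callable returning a list of messages (so it can drop, keep, or replace a message) and uses plain arrays instead of Message objects. The names pipeSketch, $excludeProfileCreated and $recalculatePlayhead are hypothetical stand-ins.

```php
<?php

declare(strict_types=1);

/**
 * Conceptual sketch only: lazily runs every message through a chain of
 * translator callables. Nothing is processed until the generator is iterated.
 *
 * @param iterable<array{event: string, playhead: int}> $messages
 * @param callable(array): array                        ...$translators
 *
 * @return Generator<int, array>
 */
function pipeSketch(iterable $messages, callable ...$translators): Generator
{
    foreach ($messages as $message) {
        $current = [$message];

        // Feed the output of each translator into the next one.
        foreach ($translators as $translator) {
            $next = [];

            foreach ($current as $item) {
                foreach ($translator($item) as $translated) {
                    $next[] = $translated;
                }
            }

            $current = $next;
        }

        foreach ($current as $item) {
            yield $item;
        }
    }
}

// Hypothetical stand-in for an "exclude event" translator: drops one event type.
$excludeProfileCreated = static fn (array $m): array => $m['event'] === 'ProfileCreated' ? [] : [$m];

// Hypothetical, simplified stand-in for a "recalculate playhead" translator:
// renumbers the remaining messages starting at 1 (a single stream is assumed).
$recalculatePlayhead = static function (array $m): array {
    static $playhead = 0;
    $m['playhead'] = ++$playhead;

    return [$m];
};

$stored = [
    ['event' => 'ProfileCreated', 'playhead' => 1],
    ['event' => 'NameChanged', 'playhead' => 2],
    ['event' => 'NameChanged', 'playhead' => 3],
];

// Iteration happens here; the two remaining messages get playheads 1 and 2.
$result = iterator_to_array(pipeSketch($stored, $excludeProfileCreated, $recalculatePlayhead), false);
```

Because the sketch is a generator, a large message stream is never held in memory as a whole, which is the same property that makes the Pipe convenient for store results.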

Here’s an example of how you can use the Pipe to migrate your Event Store to the new experimental Stream Store using the Subscription Engine:

use Patchlevel\EventSourcing\Attribute\Setup;
use Patchlevel\EventSourcing\Attribute\Subscribe;
use Patchlevel\EventSourcing\Attribute\Subscriber;
use Patchlevel\EventSourcing\Attribute\Teardown;
use Patchlevel\EventSourcing\Message\Message;
use Patchlevel\EventSourcing\Message\Pipe;
use Patchlevel\EventSourcing\Message\Translator\AggregateToStreamHeaderTranslator;
use Patchlevel\EventSourcing\Message\Translator\Translator;
use Patchlevel\EventSourcing\Schema\ChainDoctrineSchemaConfigurator;
use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector;
use Patchlevel\EventSourcing\Schema\SchemaDirector;
use Patchlevel\EventSourcing\Store\StreamDoctrineDbalStore;
use Patchlevel\EventSourcing\Subscription\RunMode;
use Patchlevel\EventSourcing\Subscription\Subscriber\BatchableSubscriber;

use function count;

#[Subscriber('migrate', RunMode::Once)]
final class MigrateAggregateToStreamStoreSubscriber implements BatchableSubscriber
{
    private readonly SchemaDirector $schemaDirector;

    /** @var list<Message> */
    private array $messages = [];

    /** @var list<Translator> */
    private readonly array $middlewares;

    public function __construct(
        private readonly StreamDoctrineDbalStore $targetStore,
    ) {
        $this->schemaDirector = new DoctrineSchemaDirector(
            $targetStore->connection(),
            new ChainDoctrineSchemaConfigurator([$targetStore]),
        );

        $this->middlewares = [new AggregateToStreamHeaderTranslator()];
    }

    #[Subscribe('*')]
    public function handle(Message $message): void
    {
        $this->messages[] = $message;
    }

    public function beginBatch(): void
    {
        $this->messages = [];
    }

    public function commitBatch(): void
    {
        $pipeline = new Pipe($this->messages, ...$this->middlewares);
        $this->messages = [];

        $this->targetStore->save(...$pipeline);
    }

    public function rollbackBatch(): void
    {
        $this->messages = [];
    }

    public function forceCommit(): bool
    {
        return count($this->messages) >= 10_000;
    }

    #[Setup]
    public function setup(): void
    {
        $this->schemaDirector->create();
    }

    #[Teardown]
    public function teardown(): void
    {
        $this->schemaDirector->drop();
    }
}

The Stream Store is still experimental and subject to change.
Use it cautiously and stay updated on any modifications.

Reducer

Next, we’d like to introduce another new feature: the Reducer.
The Reducer is a new concept designed to reduce messages or events into a single state.
It’s particularly useful for creating temporary projections or making decisions based on a specific state.

Here’s an example:

use Patchlevel\EventSourcing\Message\Message;
use Patchlevel\EventSourcing\Message\Reducer;

$state = (new Reducer())
    ->initState(['name' => 'unknown'])
    ->match([
        ProfileCreated::class => static function (Message $message): array {
            return ['name' => $message->event()->name];
        },
        NameChanged::class => static function (Message $message): array {
            return ['name' => $message->event()->name];
        },
    ])
    ->reduce($messages);

echo $state['name']; // 'John Doe'

In this example, we aim to determine the name of a profile.
We initialize the state with the value unknown.
Then, we match the ProfileCreated and NameChanged events to update the name accordingly.
Once the Reducer has processed all the messages, we have the final state.

This feature works seamlessly in combination with the Pipe.
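Conceptually, reducing is a fold over the message stream. The following plain PHP sketch illustrates that idea only and is not the library's Reducer. It assumes that the array returned by a matching handler replaces the current state and that events without a handler leave the state untouched. The function reduceSketch and the two event classes are hypothetical and defined here just to keep the example self-contained.

```php
<?php

declare(strict_types=1);

// Hypothetical event classes, defined only for this sketch.
final class ProfileCreated
{
    public function __construct(public readonly string $name)
    {
    }
}

final class NameChanged
{
    public function __construct(public readonly string $name)
    {
    }
}

/**
 * Conceptual sketch only: folds events into a single state array.
 *
 * @param array<string, mixed>    $initialState
 * @param array<string, callable> $handlers keyed by event class name
 * @param iterable<object>        $events
 *
 * @return array<string, mixed>
 */
function reduceSketch(array $initialState, array $handlers, iterable $events): array
{
    $state = $initialState;

    foreach ($events as $event) {
        $handler = $handlers[$event::class] ?? null;

        if ($handler === null) {
            continue; // assumption: unmatched events do not change the state
        }

        // Assumption: the handler's return value becomes the new state.
        $state = $handler($event);
    }

    return $state;
}

$state = reduceSketch(
    ['name' => 'unknown'],
    [
        ProfileCreated::class => static fn (ProfileCreated $event): array => ['name' => $event->name],
        NameChanged::class => static fn (NameChanged $event): array => ['name' => $event->name],
    ],
    [new ProfileCreated('John'), new NameChanged('John Doe')],
);

// $state['name'] is now 'John Doe'; with no events it would stay 'unknown'.
```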

Filter Events in Subscription Engine

Furthermore, we have made several optimizations to the Subscription Engine. The engine now terminates early if there are no more subscriptions to process. This can happen, for example, when an error occurs.

In addition, we’ve introduced the ability to write custom Message Loaders.
We’ve also added two Message Loaders:

  • The StoreMessageLoader, which serves as the default loader and maintains the existing behavior of the Subscription Engine.
  • The new EventFilteredStoreMessageLoader, which analyzes the subscriptions to be processed, identifies their subscribed events, and filters accordingly. This can significantly improve performance under certain conditions, because events that none of these subscriptions listen to no longer need to be loaded. The effect is largest when subscriptions are distributed across multiple workers.
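The filtering idea can be sketched in plain PHP as follows. This is a conceptual illustration, not the code of the EventFilteredStoreMessageLoader. It assumes that every subscriber exposes a list of subscribed event names and that a '*' subscription (as used in the migration subscriber above) forces all events to be loaded. The function name eventsToLoad and the event names are hypothetical.

```php
<?php

declare(strict_types=1);

/**
 * Conceptual sketch only: collects the event names that the given
 * subscribers are interested in.
 *
 * @param list<list<string>> $subscribedEventsPerSubscriber
 *
 * @return list<string>|null null means "no filter, load every event"
 */
function eventsToLoad(array $subscribedEventsPerSubscriber): ?array
{
    $names = [];

    foreach ($subscribedEventsPerSubscriber as $subscribedEvents) {
        foreach ($subscribedEvents as $eventName) {
            if ($eventName === '*') {
                // Assumption: a wildcard subscriber needs every event,
                // so filtering is not possible.
                return null;
            }

            $names[$eventName] = true; // array keys deduplicate the names
        }
    }

    return array_keys($names);
}

// Two subscribers that only care about profile events.
$filter = eventsToLoad([
    ['profile.created'],
    ['profile.created', 'profile.name_changed'],
]);

// As soon as one subscriber listens to everything, no filter can be applied.
$noFilter = eventsToLoad([
    ['profile.created'],
    ['*'],
]);
```

With a worker that runs only a few narrowly scoped subscriptions, the resulting filter is small, which is where the performance gain described above would come from.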

Here’s an example of how to use the EventFilteredStoreMessageLoader:

use Patchlevel\EventSourcing\Metadata\Event\EventMetadataFactory;
use Patchlevel\EventSourcing\Subscription\Engine\EventFilteredStoreMessageLoader;
use Patchlevel\EventSourcing\Store\Store;

/**
 * @var Store $store
 * @var EventMetadataFactory $eventMetadataFactory
 * @var SubscriberRepository $subscriberRepository
 */
$messageLoader = new EventFilteredStoreMessageLoader(
    $store,
    $eventMetadataFactory,
    $subscriberRepository,
);

Instead of passing the Event Store directly to the Subscription Engine, you can now provide a Message Loader:

use Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine;
use Patchlevel\EventSourcing\Subscription\Engine\MessageLoader;
use Patchlevel\EventSourcing\Subscription\RetryStrategy\NoRetryStrategy;
use Patchlevel\EventSourcing\Subscription\Store\DoctrineSubscriptionStore;
use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessorRepository;

/**
 * @var MessageLoader $messageLoader
 * @var DoctrineSubscriptionStore $subscriptionStore
 * @var MetadataSubscriberAccessorRepository $subscriberAccessorRepository
 * @var NoRetryStrategy $retryStrategy
 */
$subscriptionEngine = new DefaultSubscriptionEngine(
    $messageLoader, // inject the message loader
    $subscriptionStore,
    $subscriberAccessorRepository,
    $retryStrategy,
);

For more details, check out our documentation.

Stream Store

And finally, we are currently working on a new Event Store implementation called StreamDoctrineDbalStore.
This store is experimental and may still undergo breaking changes. In this version, we’ve made some minor updates.

The StreamHeader message header has been split into StreamNameHeader, PlayheadHeader, and RecordedOnHeader.
Additionally, a new header, EventId, has been introduced, along with the corresponding field in the database. This change allowed us to fix the archiving functionality.

There’s not much left before we can move this store out of its experimental phase. We’ll keep you updated on our progress!
