We are happy to announce version 3.6.0 of our PHP library patchlevel/event-sourcing.
This release contains several exciting new features like Pipe and Reducer.
In this blog post, we will provide you with an overview of the changes.
Pipe
First and foremost, we’d like to introduce you to a new feature: the Pipe.
The Pipe is a new concept for transforming messages while you iterate over them.
The library already ships translators for messages, but until now you had to iterate over the messages and apply the translators yourself.
Now, we’ve bridged that gap by introducing a class that takes care of this for you.
Here’s how it works:
use Patchlevel\EventSourcing\Message\Pipe;
use Patchlevel\EventSourcing\Message\Translator\ExcludeEventTranslator;
use Patchlevel\EventSourcing\Message\Translator\RecalculatePlayheadTranslator;

$messages = new Pipe(
    $messages,
    new ExcludeEventTranslator([ProfileCreated::class]),
    new RecalculatePlayheadTranslator(),
);

foreach ($messages as $message) {
    // handle the translated message here
}
You can pass any iterable of messages, such as the result from the Event Store, attach a few translators, and then iterate over it.
The translators are applied to each message as you iterate.
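To make the "applied as you iterate" behavior concrete, here is a minimal, self-contained sketch of the idea using a plain PHP generator. It is not the library's implementation: the pipe() function, the array-shaped messages, and the two closures are hypothetical stand-ins, and the sketch assumes that each translation step takes one message and returns a list of zero or more messages.

```php
<?php

declare(strict_types=1);

/**
 * Conceptual stand-in for a pipe (not the library class): lazily runs every
 * message through each translation step, in order.
 *
 * @param iterable<mixed>                $messages
 * @param callable(mixed): list<mixed>   ...$translators
 *
 * @return Generator<int, mixed>
 */
function pipe(iterable $messages, callable ...$translators): Generator
{
    foreach ($messages as $message) {
        $current = [$message];

        foreach ($translators as $translator) {
            $next = [];

            foreach ($current as $item) {
                // A step may drop the message ([]), keep it, or replace it.
                $next = [...$next, ...$translator($item)];
            }

            $current = $next;
        }

        foreach ($current as $item) {
            yield $item;
        }
    }
}

// Hypothetical step: drops every message of the given event type.
$exclude = static fn (string $type): Closure =>
    static fn (array $m): array => $m['event'] === $type ? [] : [$m];

// Hypothetical step: renumbers playheads after messages were dropped.
$renumber = static function (): Closure {
    $playhead = 0;

    return static function (array $m) use (&$playhead): array {
        $m['playhead'] = ++$playhead;

        return [$m];
    };
};

$messages = [
    ['event' => 'ProfileCreated', 'playhead' => 1],
    ['event' => 'NameChanged', 'playhead' => 2],
    ['event' => 'NameChanged', 'playhead' => 3],
];

// Nothing is translated until the generator is consumed.
$result = iterator_to_array(
    pipe($messages, $exclude('ProfileCreated'), $renumber()),
    false,
);
```

After this runs, $result holds the two NameChanged messages with playheads 1 and 2. Because the work happens inside the generator, a large stream never has to be held in memory in full.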
Here’s an example of how you can use the Pipe to migrate your Event Store to the new experimental Stream Store using the Subscription Engine:
use Patchlevel\EventSourcing\Attribute\Setup;
use Patchlevel\EventSourcing\Attribute\Subscribe;
use Patchlevel\EventSourcing\Attribute\Subscriber;
use Patchlevel\EventSourcing\Attribute\Teardown;
use Patchlevel\EventSourcing\Message\Message;
use Patchlevel\EventSourcing\Message\Pipe;
use Patchlevel\EventSourcing\Message\Translator\AggregateToStreamHeaderTranslator;
use Patchlevel\EventSourcing\Message\Translator\Translator;
use Patchlevel\EventSourcing\Schema\ChainDoctrineSchemaConfigurator;
use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector;
use Patchlevel\EventSourcing\Schema\SchemaDirector;
use Patchlevel\EventSourcing\Store\StreamDoctrineDbalStore;
use Patchlevel\EventSourcing\Subscription\RunMode;
use Patchlevel\EventSourcing\Subscription\Subscriber\BatchableSubscriber;
use function count;

#[Subscriber('migrate', RunMode::Once)]
final class MigrateAggregateToStreamStoreSubscriber implements BatchableSubscriber
{
    private readonly SchemaDirector $schemaDirector;

    /** @var list<Message> */
    private array $messages = [];

    /** @var list<Translator> */
    private readonly array $middlewares;

    public function __construct(
        private readonly StreamDoctrineDbalStore $targetStore,
    ) {
        $this->schemaDirector = new DoctrineSchemaDirector(
            $targetStore->connection(),
            new ChainDoctrineSchemaConfigurator([$targetStore]),
        );

        $this->middlewares = [new AggregateToStreamHeaderTranslator()];
    }

    #[Subscribe('*')]
    public function handle(Message $message): void
    {
        // Buffer every message until the batch is committed.
        $this->messages[] = $message;
    }

    public function beginBatch(): void
    {
        $this->messages = [];
    }

    public function commitBatch(): void
    {
        // Translate the buffered messages and write them to the target store.
        $pipeline = new Pipe($this->messages, ...$this->middlewares);

        $this->messages = [];

        $this->targetStore->save(...$pipeline);
    }

    public function rollbackBatch(): void
    {
        $this->messages = [];
    }

    public function forceCommit(): bool
    {
        // Commit once 10,000 messages are buffered.
        return count($this->messages) >= 10_000;
    }

    #[Setup]
    public function setup(): void
    {
        $this->schemaDirector->create();
    }

    #[Teardown]
    public function teardown(): void
    {
        $this->schemaDirector->drop();
    }
}
The Stream Store is still experimental and subject to change.
Use it cautiously and stay updated on any modifications.
Reducer
Next, we’d like to introduce another new feature: the Reducer.
The Reducer is a new concept designed to reduce messages or events into a single state.
It’s particularly useful for creating temporary projections or making decisions based on a specific state.
Here’s an example:
$state = (new Reducer())
    ->initState(['name' => 'unknown'])
    ->match([
        ProfileCreated::class => static function (Message $message): array {
            return ['name' => $message->event()->name];
        },
        NameChanged::class => static function (Message $message): array {
            return ['name' => $message->event()->name];
        },
    ])
    ->reduce($messages);

echo $state['name'];
In this example, we aim to determine the name of a profile.
We initialize the state with the value unknown.
Then, we match the ProfileCreated and NameChanged events to update the name accordingly.
Once the Reducer has processed all the messages, we have the final state.
This feature works seamlessly in combination with the Pipe.
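For readers who want to see the underlying idea in isolation, the following is a minimal sketch of such a reduction in plain PHP, without the library. The reduceEvents() function and both event classes are hypothetical and exist only for this illustration. The sketch also assumes that a handler receives the current state and returns the complete new state, which may differ from how the library's Reducer passes and merges state.

```php
<?php

declare(strict_types=1);

// Hypothetical events used only for this illustration.
final class ProfileCreated
{
    public function __construct(public readonly string $name)
    {
    }
}

final class NameChanged
{
    public function __construct(public readonly string $name)
    {
    }
}

/**
 * Folds events into a single state array. Handlers are looked up by event
 * class; events without a handler leave the state unchanged.
 *
 * @param array<string, mixed>    $initialState
 * @param array<string, callable> $handlers     event class => handler
 * @param iterable<object>        $events
 *
 * @return array<string, mixed>
 */
function reduceEvents(array $initialState, array $handlers, iterable $events): array
{
    $state = $initialState;

    foreach ($events as $event) {
        $handler = $handlers[$event::class] ?? null;

        if ($handler === null) {
            continue;
        }

        // Assumption of this sketch: the handler returns the full new state.
        $state = $handler($event, $state);
    }

    return $state;
}

$state = reduceEvents(
    ['name' => 'unknown'],
    [
        ProfileCreated::class => static fn (ProfileCreated $e, array $s): array
            => array_merge($s, ['name' => $e->name]),
        NameChanged::class => static fn (NameChanged $e, array $s): array
            => array_merge($s, ['name' => $e->name]),
    ],
    [new ProfileCreated('Anna'), new NameChanged('Anne')],
);

echo $state['name'], PHP_EOL; // prints "Anne"
```

With an empty list of events the function returns the initial state, so the result would be ['name' => 'unknown'].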
Filter Events in Subscription Engine
Furthermore, we have made several optimizations to the Subscription Engine.
The engine now terminates early if there are no more subscriptions to process.
This can happen, for example, when an error occurs.
In addition, we’ve introduced the ability to write custom Message Loaders.
We’ve also added two Message Loaders:
- The StoreMessageLoader, which serves as the default loader and maintains the existing behavior of the Subscription Engine.
- The new EventFilteredStoreMessageLoader, which analyzes the subscriptions to be processed, identifies their subscribed events, and filters accordingly. This can significantly improve performance under certain conditions, as not all events need to be loaded anymore, especially when subscriptions are distributed across multiple workers.
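The filtering idea can be sketched in a few lines of plain PHP. This is only an illustration of the concept, not the loader's actual code: eventsToLoad(), loadMessages(), the event names, and the array-shaped messages are all hypothetical. The handling of the '*' wildcard (no filtering as soon as one subscription listens to everything) is an assumption of this sketch.

```php
<?php

declare(strict_types=1);

/**
 * Collects the event names that at least one subscription listens to.
 * Returns null if any subscription uses the '*' wildcard, because then
 * no event can be skipped (assumption of this sketch).
 *
 * @param array<string, list<string>> $subscribedEvents subscription id => event names
 *
 * @return list<string>|null
 */
function eventsToLoad(array $subscribedEvents): ?array
{
    $names = [];

    foreach ($subscribedEvents as $events) {
        if (in_array('*', $events, true)) {
            return null;
        }

        $names = [...$names, ...$events];
    }

    return array_values(array_unique($names));
}

/**
 * Stand-in for loading from a store: keeps only messages whose event name
 * is in the filter. A real store would apply the filter in its query.
 *
 * @param list<array{event: string}> $stored
 * @param list<string>|null          $filter
 *
 * @return list<array{event: string}>
 */
function loadMessages(array $stored, ?array $filter): array
{
    if ($filter === null) {
        return $stored;
    }

    return array_values(array_filter(
        $stored,
        static fn (array $m): bool => in_array($m['event'], $filter, true),
    ));
}

$stored = [
    ['event' => 'profile.created'],
    ['event' => 'profile.name_changed'],
    ['event' => 'invoice.paid'],
];

$filter = eventsToLoad([
    'profile_projection' => ['profile.created', 'profile.name_changed'],
    'welcome_mail' => ['profile.created'],
]);

// Only the two profile events remain; 'invoice.paid' is skipped.
$loaded = loadMessages($stored, $filter);
```

The fewer event types a worker's subscriptions cover, the more messages can be skipped, which is why splitting subscriptions across workers benefits most from this approach.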
Here’s an example of how to use the EventFilteredStoreMessageLoader:
use Patchlevel\EventSourcing\Metadata\Event\EventMetadataFactory;
use Patchlevel\EventSourcing\Subscription\Engine\EventFilteredStoreMessageLoader;
use Patchlevel\EventSourcing\Store\Store;

$messageLoader = new EventFilteredStoreMessageLoader(
    $store,
    $eventMetadataFactory,
    $subscriberRepository,
);
Instead of passing the Event Store directly to the Subscription Engine, you can now provide a Message Loader:
use Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine;
use Patchlevel\EventSourcing\Subscription\Engine\MessageLoader;
use Patchlevel\EventSourcing\Subscription\RetryStrategy\NoRetryStrategy;
use Patchlevel\EventSourcing\Subscription\Store\DoctrineSubscriptionStore;
use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessorRepository;

$subscriptionEngine = new DefaultSubscriptionEngine(
    $messageLoader,
    $subscriptionStore,
    $subscriberAccessorRepository,
    $retryStrategy,
);
For more details, check out our documentation.
Stream Store
And finally, we are currently working on a new Event Store implementation called StreamDoctrineDbalStore.
This store is experimental and may still undergo breaking changes. In this version, we’ve made some minor updates.
The StreamHeader message header has been split into StreamNameHeader, PlayheadHeader, and RecordedOnHeader.
Additionally, a new header, EventId, has been introduced, along with the corresponding field in the database.
This change allowed us to fix the archiving functionality.
There’s not much left before we can move this store out of its experimental phase.
We’ll keep you updated on our progress!