Проектирование своего проекта по DDD последнее время становится всё более популярным. Сейчас не будем углубляться в данную методологию с её принципами, плюсами и минусами. Я хочу рассказать с какой проблемой столкнулась наша команда при использовании данной методологии на PHP, а именно внедрении Data Mapper’а Doctrine ORM.

Чтобы более понятно донести нашу проблему я буду использовать известный всем Агрегат Заказа (Order) и позиции заказа (OrderLine), которые являются коллекциями Dortrine ORM. Так же сильно упростим данный агрегат, чтобы фокусироваться на самой проблеме. И так начинаем!

Для начала мы создадим наш агрегат заказа:

<?php
declare(strict_types=1);

namespace App\Order;

use App\Shared\Domain\AggregateRoot;
use Doctrine\Common\Collections\Collection;
use Doctrine\ORM\Mapping as ORM;

/**
 * @ORM\Entity
 * @ORM\Table(name="orders")
 */
class Order extends AggregateRoot
{
    private const STATUS_NEW = 'new';
    private const STATUS_CLOSED = 'closed';

    /**
     * @ORM\Id
     * @ORM\Column(type="guid")
     */
    private string $id;
    /**
     * @ORM\Column(type="string")
     */
    private string $status;
    /**
     * @var OrderLine[]|Collection
     * @ORM\OneToMany(targetEntity="OrderLine", mappedBy="order", orphanRemoval=true, cascade={"persist"})
     */
    private Collection $items;

    public function __construct(string $id, Collection $lineItems)
    {
        if ($lineItems->isEmpty()) {
            throw new \DomainException('Позиции заказа отсутствуют.');
        }
        $this->id = $id;
        $this->items = $lineItems;
        $this->status = self::STATUS_NEW;
    }
  	
    public function close(): void
    {
        $this->status = self::STATUS_CLOSED;
    }

    public function addLine(Line $line): void
    {
        $this->items->add(new OrderLine($this, $line));
    }

    public function getId(): string
    {
        return $this->id;
    }

    public function getStatus(): string
    {
        return $this->status;
    }

    /**
     * @return OrderLine[]
     */
    public function getItems(): array
    {
        return $this->items->toArray();
    }
}

Затем создадим позиции заказа:

<?php
declare(strict_types=1);

namespace App\Order;

use Doctrine\ORM\Mapping as ORM;
use Ramsey\Uuid\Uuid;

/**
 * @ORM\Entity
 * @ORM\Table(name="order_lines")
 */
class OrderLine
{
    /**
     * @ORM\Id
     * @ORM\Column(type="guid")
     */
    private string $id;
    /**
     * @ORM\ManyToOne(targetEntity="Order", inversedBy="orderLines")
     * @ORM\JoinColumn(name="order_id")
     */
    private Order $order;
    /**
     * @ORM\Embedded(class="Line")
     */
    private Line $line;

    public function __construct(Order $order, Line $line)
    {
        $this->id = Uuid::uuid4()->toString();
        $this->order = $order;
        $this->line = $line;
    }

    public function getId(): string
    {
        return $this->id;
    }

    public function getOrder(): Order
    {
        return $this->order;
    }

    public function getLine(): Line
    {
        return $this->line;
    }
}

Для удобства работы с позицией заказа вынесли из него Line в Value Object:

<?php
declare(strict_types=1);

namespace App\Order;

use Doctrine\ORM\Mapping as ORM;

/**
 * @ORM\Embeddable
 */
final class Line
{
    /**
     * @var string
     *
     * @ORM\Column(type="string")
     */
    private $name;
    /**
     * @var int
     *
     * @ORM\Column(type="integer")
     */
    private $quantity;
    /**
     * @ORM\Column(type="float")
     */
    private $price;

    public function __construct(string $name, int $quantity, float $price)
    {
        $this->name = $name;
        $this->quantity = $quantity;
        $this->price = $price;
    }

    public function getQuantity(): int
    {
        return $this->quantity;
    }

    public function getName(): string
    {
        return $this->name;
    }

    public function getPrice(): float
    {
        return $this->price;
    }
}

Чтобы иметь возможность хранить во всех агрегатах возникшие события, а так же публиковать их — мы создали супер класс AggregateRoot:

<?php

declare(strict_types=1);

namespace Atiline\Shared\Domain;

use Doctrine\ORM\Mapping as ORM;

/**
 * @ORM\MappedSuperclass
 */
abstract class AggregateRoot
{
    /**
     * @var DomainEvent[]
     */
    private array $recordedEvents = [];

    abstract public function getId();

    protected function recordEvent(DomainEvent $event): void
    {
        $this->recordedEvents[] = $event;
    }

    public function releaseEvents(): array
    {
        $events = $this->recordedEvents;
        $this->recordedEvents = [];
        return $events;
    }
}

Напомню, что в качестве Data Mapper'а мы выбрали Doctrine ORM, а для маппинга сущности будем использовать Doctrine Annotations, поэтому сразу настроили для всех Entity и Value Object необходимые аннотации.

Проблема

Чтобы предотвратить возникновение конфликтов между параллельными бизнес-транзакциями путем обнаружения конфликта и отката транзакции мы хотим использовать оптимистическую блокировку. Doctrine предоставляет решение данной проблемы путём добавляения свойства $version и аннотации @ORM\Version

Для того чтобы не добавлять данное свойство в каждый агрегат мы можем вынести его в наш супер класс ArggregateRoot и пометить его аннотацией @ORM\MappedSuperclass чтобы Doctrine смогла распознать аннотации в наследуемом абстрактном классе.

<?php

declare(strict_types=1);

namespace App\Shared\Domain;

use Doctrine\ORM\Mapping as ORM;

/**
 * @ORM\MappedSuperclass
 */
abstract class AggregateRoot
{
    /**
     * @ORM\Version
     * @ORM\Column(type="integer")
     */
    protected int $version;
    
    //....
}

Теперь давайте проверим работу оптимистической блокировки. Для этого генерируем миграции и применим их. Затем создадим тестовый заказ, сохраним изменения в базе данных, а после получим из базы данных созданный заказ и попытаемся закрыть заказ.

<?php

final class OrderTestController extends AbstractController
{
    public function createAndClose(EntityManagerInterface $em): Response
    {
        // Создаём заказ
        $order = new Order('09597988-aa74-4bb3-9fb9-0e154dd7cdec');
        $em->persist($order);
        $em->flush();

        // Закзываем заказ
        $order = $em->getRepository(Order::class)->find('09597988-aa74-4bb3-9fb9-0e154dd7cdec');
        $order->close();
        $em->flush();
      
        return $this->json([], 201);
    }
}

После того как мы вызовем $order->close() наш заказ сменит статус на closed, а так же версия агрегата, которая записывается в поле version, увеличится на +1. Это значит, что оптимистическая блокировка будет работать.

Теперь давайте попробуем у данного агрегата добавить позицию заказа через метод $order->addLine($line). После вызова данного метода наш агрегат не увеличил версию в агрегате на +1, так как данная сущность-агрегат не изменилась. Следовательно мы встречаем проблему с работой оптимистической блокировки. Для изменений полей самого агрегата — она работает, но если изменяется сущность-коллекция внутри агрегата — блокировка перестаёт работать и версия не увеличивается на +1.

Для решение данной проблемы мы обратимся к документации Doctrine в надежде на то, что данная проблема возникала не только у нас и решение уже есть.

Почитав документацию, Doctrine предлагает пойти путём добавления агрегатного поля, которое будет изменяться при каждом изменении коллекции. Это может быть дата обновления, может общее количество позиций в заказе. Однако это не очень удобно по нескольким причинам:

  1. Всегда нужно придумывать какое-то поле для агрегации

  2. Есть риск забыть изменение агрегатного поля в каждом методе, изменяющим коллекцию из-за чего блокировка не сработает

  3. Нужно произвести много изменений, чтобы внедрить данный подход в существующий код

Сразу бы хотелось обозначить, что проблема не надуманная. Мы встретились с реальными ситуациями, когда событие возникало несколько раз.

Решение

Так как мы не хотим отказываться от уже созданной Doctrine оптимистической блокировки, то мы решили на основе этого сделать своё решение, которое имело бы данное агрегированное поле и менялось каждый раз при обновлении коллекции.

Для этого мы введём новый супер класс AggregateEntity, от которого всегда будем наследоваться нашими вложенными коллекциями. Пока что он будет пустым, но мы всегда можем добавить в него какую-то логику.

<?php
declare(strict_types=1);

namespace App\Shared\Domain;

use Doctrine\ORM\Mapping as ORM;

/**
 * @ORM\MappedSuperclass
 */
abstract class AggregateEntity
{
}

Чтобы иметь возможность определять к какому агрегату принадлежит вложенная сущность можно пойти несколькими путями, но мы решили пойти более гибким - через аннотацию.

Создадим свою аннотацию, в которой будем указывать метод, вызывающий агрегат:

<?php

declare(strict_types=1);

namespace App\Shared\Domain;

/**
 * @Annotation
 * @Target("CLASS")
 */
final class AggregateEntityAnnotation
{
    /**
     * Method for getting the Aggregated Root
     */
    public string $methodAggregateRoot;
}

И добавим данную аннотацию к нашей коллекции OrderLine, а также наследуем её от супер класса AggegateEntity :

<?php
declare(strict_types=1);


namespace App\Order;

use App\Shared\Domain\AggregateEntity;
use App\Shared\Domain\AggregateEntityAnnotation;
use Doctrine\ORM\Mapping as ORM;
use Ramsey\Uuid\Uuid;

/**
 * @ORM\Entity
 * @ORM\Table(name="order_lines")
 * @AggregateEntityAnnotation(methodAggregateRoot="getOrder")
 */
class OrderLine extends AggregateEntity
{
   	...
      
    /**
     * @ORM\ManyToOne(targetEntity="Order", inversedBy="items")
     * @ORM\JoinColumn(name="order_id")
     */
    private Order $order;
   
  	...

    public function getOrder(): Order
    {
        return $this->order;
    }
  	
  	...
}

После чего добавим наше агрегированное свойство $aggregateVersion в супер класс агрегата AggregateRoot и добавим метод updateAggregateVersion(), который будет изменять наше поле аналогично с полем version предоставляемой Doctrine. Таким образом у нас будет два изменяемых поля увеличивающееся на +1:

<?php

declare(strict_types=1);

namespace App\Shared\Domain;

use Doctrine\ORM\Mapping as ORM;

/**
 * @ORM\MappedSuperclass
 */
abstract class AggregateRoot extends AggregateEntity
{
    /**
     * @ORM\Column(type="integer", options={"default" : 1})
     */
    protected int $aggregateVersion = 1;
    
  	...
      
    public function updateAggregateVersion(): void
    {
        $this->aggregateVersion++;
    }
}

Нам остаётся придумать сервис, который будет изменять нашу версию. Займёмся его проектированием:

<?php

declare(strict_types=1);

namespace App\Shared\Doctrine;

use App\Shared\Domain\AggregateEntity;
use App\Shared\Domain\AggregateEntityAnnotation;
use App\Shared\Domain\AggregateRootEntity;
use Doctrine\Common\Annotations\Reader;
use Doctrine\ORM\EntityManagerInterface;
use LogicException;
use ReflectionClass;

final class DoctrineAggregateVersioning
{
    private Reader $reader;

    public function __construct(Reader $reader)
    {
        $this->reader = $reader;
    }

    public function update(EntityManagerInterface $em, AggregateEntity ...$entities): void
    {
        $aggregateRoot = $this->getAggregateRoot($entities);

        if (!$em->contains($aggregateRoot)) {
            $em->persist($aggregateRoot);
        }

      	//Изменяем версию агрегата
        $aggregateRoot->updateAggregateVersion();

        $uow = $em->getUnitOfWork();
        $classMetadata = $em->getClassMetadata(get_class($aggregateRoot));
      	//Применяем изменения
        $uow->recomputeSingleEntityChangeSet($classMetadata, $aggregateRoot);
    }

    /**
     * @param AggregateEntity[] $entities
     * @return AggregateRootEntity
     */
    private function getAggregateRoot(array $entities): AggregateRootEntity
    {
        $aggregateRoot = null;
  			
      	//Пытаемся получить сразу агрегат
        foreach ($entities as $entity) {
            if ($entity instanceof AggregateRootEntity) {
                $aggregateRoot = $entity;
            }
        }

      	//Если агрегат не нашёлся, то получаем его из аннотаций последней сущности
        if ($aggregateRoot === null) {
            $aggregateEntity = end($entities);
            $annotation = $this->getAggregateEntityAnnotation($aggregateEntity);

            if (!method_exists($aggregateEntity, $annotation->methodAggregateRoot)) {
                throw new LogicException(sprintf('Method "%s" not exists in class "%s".',
                    $annotation->methodAggregateRoot,
                    $aggregateEntity::class
                ));
            }

            $aggregateRoot = $aggregateEntity->{$annotation->methodAggregateRoot}();
        }

        return $aggregateRoot;
    }

    private function getAggregateEntityAnnotation(AggregateEntity $entity): AggregateEntityAnnotation
    {
        $reflectionClass = new ReflectionClass($entity);

        if (!$annotation = $this->reader->getClassAnnotation($reflectionClass, AggregateEntityAnnotation::class)) {
            throw new LogicException(sprintf('Aggregate "%s" must have AggregateEntityAnnotation!', $entity::class));
        }

        return $annotation;
    }
}

Теперь создадим подписчика, который будет вызывать созданный нами сервис перед тем как данные будут сохранены. Внести изменения мы можем только в момент события onFlush.

<?php

declare(strict_types=1);

namespace App\Infrastructure\Doctrine\Listener;

use App\Shared\Doctrine\DoctrineAggregateVersioning;
use App\Shared\Domain\AggregateEntity;
use Doctrine\Bundle\DoctrineBundle\EventSubscriber\EventSubscriberInterface;
use Doctrine\ORM\Event\OnFlushEventArgs;
use Doctrine\ORM\Events;

final class DoctrineAggregateVersioningSubscriber implements EventSubscriberInterface
{
    private DoctrineAggregateVersioning $aggregateVersioning;

    public function __construct(DoctrineAggregateVersioning $aggregateVersioning)
    {
        $this->aggregateVersioning = $aggregateVersioning;
    }

    public function getSubscribedEvents(): array
    {
        return [
            Events::onFlush,
        ];
    }

    public function onFlush(OnFlushEventArgs $args): void
    {
        $em = $args->getEntityManager();
        $uow = $em->getUnitOfWork();

        //Собираем все изменённые сущности
        $entities = array_merge(
            $uow->getScheduledEntityInsertions(),
            $uow->getScheduledEntityUpdates(),
            $uow->getScheduledEntityDeletions()
        );

    		//Получаем только агрегированные сущности
        $aggregateEntities = [];
        foreach ($entities as $entity) {
            if ($entity instanceof AggregateEntity) {
                $aggregateEntities[] = $entity;
            }
        }

        if (empty($aggregateEntities)) {
            return;
        }
      
				//Изменяем версию
        $this->aggregateVersioning->update($em, ...$aggregateEntities);
    }
}

Итоги

Для поиска решение данной проблемы мы потратили около 2-х дней, однако мы не смогли найти нужного нам решения, поэтому пришли к собственному. Конечно, мы понимаем, что данное решение не идеально и, в какой-то степени, «костыльное». Поэтому мы публикуем данное решение, в надежде придти к какому-то более лаконичному решению с помощью сообщества Хабр. Если данное решение окажется хорошим, то оно определённо поможет другим. А мы, в свою очередь готовы предоставить данное решение как публичную библиотеку.

Авторы:

Ворожцов Максим

PHP Developer

Захаров Илья

PHP Developer

Комментарии (9)


  1. laatoo
    21.11.2021 13:09
    +2

    По-моему OrderLine вообще не должен знать какому order'у он принадлежит (вложенные сущности не должны знать о том, куда их засунули — это не их дело). Завтра OrderLine окажется в другом агрегате, что будете делать?


    AggregateEntity не должно существовать.


    Проблема: когда вы меняете что-то в модели, доктрина об этом не догадывается


    Решение1: учить доктрину ловить ваши изменения (найти место, где она ищет изменения в модели, и расширить так, чтобы она учитывала те изменения, которые вам нужны).


    Я бы думал в сторону вычисления хэша агрегата по его содержимому (у вас наоборот: вы через аннотации учите вложенную сущность определять какому агрегату она принадлежит). Добавляете в супертип AggregateRoot метод getHash(). Доктрина, когда решает увеличивать версию или нет, должна сверять этот хэш: если изменился — увеличивает версию.


    Внутри доктрины вроде как свой UoW, пока она не видит ваши изменения — там много чего может сломаться и работать не так.


    Решение 2: вручную увеличивать версию при изменении агрегата (так же, как это делается в Unit Of Work)


    Выбрасывайте события при изменениях, а когда ловите — обновляйте версию соотв.агрегата


    1. myks92 Автор
      21.11.2021 15:45
      +1

      По-моему OrderLine вообще не должен знать какому order'у он принадлежит (вложенные сущности не должны знать о том, куда их засунули — это не их дело). Завтра OrderLine окажется в другом агрегате, что будете делать?

      Соглашусь с Вами что, возможно, пример не самый удачный, но всем понятный. Здесь рассматривал проблему не в том, что OrderLine может жить в другом агрегате или OrderLine сделать агрегатом. Проектирование ответственности агрегата всегда рассматривается индивидуально. Проблема здесь заключается в изменении любых коллекций-сущностей в агрегате, при которых не будет работать оптимистическая блокировка. Ведь такая проблема есть. Проблема, как мне кажется, не архитектуры. Любая вложенная коллекция в виде сущности даст нам эту проблему. Это может быть UserRoles, UserNetworks и так далее.

      Я бы думал в сторону вычисления хэша агрегата по его содержимому (у вас наоборот: вы через аннотации учите вложенную сущность определять какому агрегату она принадлежит). Добавляете в супертип AggregateRoot метод getHash(). Доктрина, когда решает увеличивать версию или нет, должна сверять этот хэш: если изменился — увеличивает версию.

      Хорошая идея. О таком не подумали. Пока что пришли к такому решению. Посмотрим в сторону предложенного вами решения. Возможно, из этого что-то получится. Здесь только есть одна проблема, когда вложенная сущность изменяется агрегат не попадает в Unit Of Work. Поэтому пришлось придумать обратную ссылку на агрегат, чтобы знать к какому агрегату принадлежит изменённая сущность. Но есть идея как получить rootEntity по другому, а именно вот так:

      $meta = $em->getClassMetadata(get_class($aggregateRoot));
      $meta->rootEntityName; // App/Entity/Order

      Но пока что это ничего не даёт) Надо её ID этого агрегата.

      Решение 2: вручную увеличивать версию при изменении агрегата (так же, как это делается в Unit Of Work)

      Выбрасывайте события при изменениях, а когда ловите — обновляйте версию соотв.агрегата

      Да. О таком решении знали. Иногда даже изменение версии делаются при публикации событий. Такое тоже практикуется некоторыми командами. Однако шли от того, что есть множество агрегатов, которые имеют данную проблему и хотели её решить наименьшим изменением в коде. С данным примером нам удалось достичь этого.

      В любом случае спасибо вам за ёмкий комментарий. Здесь есть над чем подумать и придти к каком-то более лаконичному решению, особенно с хэшем агрегата. Если есть что-то добавить — напишите.


  1. nogo
    21.11.2021 15:25

    Код класса OrderLine дублируется, вместо второго примера возможно Line должен был быть.


    1. myks92 Автор
      21.11.2021 15:26

      Благодарю) Не заметил, исправил)


  1. Maksclub
    23.11.2021 16:36

    Предлагаю более простое решение для:

    для изменений полей самого агрегата — она работает, но если изменяется сущность-коллекция внутри агрегата — блокировка перестаёт работать и версия не увеличивается на +1.

    использовать immutable значения внутри сущностей, то есть чтобы UoW видел изменение, то при изменении коллекции надо, чтобы это была новая коллекция... и всего-то

    Это как-то более явно


    1. myks92 Автор
      23.11.2021 19:29

      К сожалению, такой способ не работает. Пример метода addLine:

      public function addLine(Line $line): void
      {
          $items = new ArrayCollection();
          $items->add(new OrderLine($this, $line));
          $this->items = $items;
      }

      При таком использовании поле version остаётся прежней - не меняется. Вроде бы пример правильный.

      На счёт коллекций думал иначе. Сделать свою коллекцию, которая будет вызывать корень агрегата вроде такого:

      public function __construct(string $id)
      {
      	$this->items = new MyArrayCollection($this); //Передаём агрегат ($s), чтобы вызвать у него внутри $aggregate->updateAggregateVersion()
      }


      1. Maksclub
        23.11.2021 19:51

        Должен UoW видеть изменение сущности

        public function addLine(Line $line): void 
        {
          $items = clone $this->items;
          $items->add(new OrderLine($this, $line));
          $this->items = $items;
        }

        а ну да, если через корень делаете изменения, то сам Order тоже должен быть новым объектом, ведь по нему смотрим изменения

        крч следите за тем, что сущность должна меняться


        1. myks92 Автор
          23.11.2021 20:09

          Выше пример тоже не работает. Order агрегат и вряд ли когда-то будет новым, при вызове его из em:

          $order = $this->em->getRepository(self::ORDER)->find($id)

          крч следите за тем, что сущность должна меняться

          Вот как раз над этим и озаботились и эта проблема вышла в данную статью. Ищем лаконичный способ следить за изменениями. Можно пойти простым путём вроде:

          public function addLine(Line $line): void
          {
              $this->items->add(new OrderLine($this, $line));
              $this->aggregateVersion++; //Обновляем версию агрегата
          }
          
          public function editLine(string $id, Line $line): void
          {
              foreach ($this->items as $item) {
                if ($item->getId() === $id) {
                  $item->edit($line);
                  $this->aggregateVersion++; //Обновляем версию агрегата
                  return;
                }
              }
            	throw new DomainException('Order line not found.');
          }

          Но пока от этого способа отошли, так как это требует постоянно изменять версию в каждом методе. При наличии большого количества агрегатов в действующем проекте - это достаточно долго и много изменений. Хотя тоже не плохой вариант.

          Ещё был вариант изменениие версии при публикации события:

          public function releaseEvents(): array
          {
              $events = $this->recordedEvents;
              $this->recordedEvents = [];
              $this->updateAggregateVersion(); // Обновляем версию
              return $events;
          }

          Этот способ дополнительно требует наличие DomainEvent, даже если он не нужен или не написан в действующем проекте.


  1. michael_v89
    24.11.2021 00:47
    +1

    Мне кажется, в этой ситуации будет правильнее делать нормальную блокировку через FOR SHARE/FOR UPDATE, а не имитировать ее с помощью фиктивных полей. Может быть это медленнее, зато стабильнее и без всяких откатов и усложнения кода.