Мотивация

Сегодня я наткнулся на статью за авторством @enamored_poc. Увидев заголовок, я был в предвкушении: наконец-то кто-то взялся за этот гайд — в своё время я как раз искал что-то подобное. Однако, дочитав статью до конца, понял, что автор по сути просто пересказал раздел Bigger applications из официальной документации и лишь добавил пару замечаний оттуда же.

С одной стороны уже есть куча видео, статей, где можно изучить как пишутся правильно сервисы по всем заветам дядюшки Боба (автора "Чистой Архитектуры"). Но, с другой стороны, всё это обычно показывают на примере стандартного интернет-магазина или списка TODO, и в какой-то момент перестаёшь понимать, а зачем всё это вообще нужно.

Поэтому в этом гайде мы возьмём достаточно нетривиальную тему, которую я регулярно встречаю в вакансиях и на «галерах», и попробуем реализовать её на практике.

Постановка задачи

Владелец сети магазинов хочет загружать видео с камер видеонаблюдения на некоторый сервер. Это видео должно сохраняться в хранилище и покадрово обрабатываться нейросетями. В результате мы должны получить метаданные о том, сколько людей было в кадре, координаты их местоположения в кадре, а также их пол. Но больше всего владельцу нужно получать сводную информацию о том, сколько людей и какого пола фиксируется в разные моменты времени, чтобы он мог например менять выкладку продуктов. После сдачи MVP-версии заказчик может выдать нам контракт на дальнейшее развитие системы: анализ видео-трафика в реальном времени, распознавание лиц, выявление нарушителей, и, не дай бог, слежкой за сотрудниками и т.д.

Немного мотивации: в этой задачке (правда, не в этой части) мы затронем работу с RabbitMQ, Redis, S3, Postgres, а также пощупаем YOLO, opencv. Тем самым закроем большинство типичных требований к современным вакансиям. А еще эту задачку можно переложить на любую предметную область, связанную с видеоаналитикой.

Анализ задачи

Да, всё начинается с анализа предметной области, а не с создания структуры папок. Так что разложим задачку по полочкам.

  1. На вход поступает видеофайл; при этом в будущем этот видеофайл будет загружаться в потоковом режиме или напрямую транслироваться с камеры видеонаблюдения.

  2. Эти видеофайлы должны где-то и как-то храниться. Мы пока не можем спрогнозировать, сколько таких видео будет храниться, какие серверы готов выделить заказчик, какие требования будут к времени записи и чтения, будут ли SSD на этих серверах.

  3. Все видеофайлы должны быть разбиты на кадры и скормлены нейросетям. Сейчас мы ничего не знаем о нейросетях (мы перекладыватели JSON, а не аналитики): о том, как быстро они способны обработать все кадры, как часто эти кадры нужно подсовывать и многое другое. Зато мы можем предположить, что пользователю не нужно ждать результата обработки в рамках одного запроса, так как его интересуют только аналитические данные.

  4. Для каждого кадра нам необходимо выявить человека, его координаты в кадре, время и пол. В будущем заказчик хочет и распознавание воришек, а может, потом захочет следить и за кассирами, или за тем, какие тележки берут. Всё это тоже должно куда-то и в каком-то виде сохраняться. Мы слышали про NoSQL и Postgres, но пока не знаем, что лучше применить для будущих аналитических задач.

  5. Вероятно, нужно подготовить REST API, которое позволит делать запросы о количестве людей в кадре и их поле, а также показывать красивые графики и таблицы для заказчика. Держим в голове, что позже появится ещё множество данных, которые он захочет получать.

Выглядит жутко, куча неизвестных... Хм.. С чего бы начать?? Может набросать endpoint-ы для загрузки видео? Или сделать endpoint "загрузить картинку", а потом запустить нейросеть, которую советует ChatGPT, все-равно же разбивать покадрово видео придется? А может спроектировать БД, где будут сохранятся методанные с видео, и уже от данных плясать?

НЕТ! На самом деле у нас уже есть всё необходимое, чтобы проектировать архитектуру, а все «неизвестные» — это всего лишь детали, которые будут сбивать вас с пути. Даже слово «endpoint» здесь лишнее, не говоря уже о FastAPI. Так что же делать дальше?

Выделение сущностей и постановка задач доменного слоя

Как уже было сказано выше, у нас достаточно информации, чтобы выделить ключевые сущности домена. Начнём с самой базовой.

  • Источник записи. Нам важно сохранять информацию о том, откуда поступает видеофайл: связан ли он с одной конкретной камерой, группой камер или, например, с целой сетью магазинов. Пользователь может захотеть загружать как одиночные записи, так и десятки небольших роликов с одной и той же камеры.

Исходный код (domain/entity/video_source.py)
from dataclasses import dataclass, field
from typing import Optional, Dict, Any
from uuid import UUID, uuid4

@dataclass(frozen=True)
class RecordingSourceId:
    """Уникальный идентификатор источника записи внутри нашей системы."""
    value: UUID

    @staticmethod
    def new() -> "RecordingSourceId":
        return RecordingSourceId(uuid4())

    def __str__(self) -> str:
        return str(self.value)


@dataclass(frozen=True)
class RecordingSourceExtra:
    """
    Необязательные данные об источнике.
    Сейчас здесь почти ничего нет, но именно сюда
    позже поедут метаданные о местоположении камеры
    """
    floor: Optional[int] = None
    location_label: Optional[str] = None
    meta: Dict[str, Any] = field(default_factory=dict)


class RecordingSource:
    """
    Источник записи (камера/логический канал).
    На уровне домена нам пока важно только то,
    что есть его идентификатор и необязательные доп.данные.
    """
    idx: RecordingSourceId
    extra: RecordingSourceExtra = RecordingSourceExtra()
    def __init__(
        self,
        idx: RecordingSourceId
        extra: RecordingSourceExtra = RecordingSourceExtra()
    ) -> None:
        self.idx: RecordingSourceId = idx
        self.source_id: RecordingSourceExtra = extra
  • Видеофайл. Некоторый файл; при этом нам сейчас не важно, передан ли он нам сразу или записывается в потоковом режиме. Самое главное, что он точно должен содержать: уникальный идентификатор, время начала, уникальный идентификатор источника. При этом такие атрибуты, как время окончания, длительность, кодеки, размер кадра, уже будут опциональными и сейчас нам не особенно нужны.

Исходный код (domain/entity/video_file.py)
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional, Dict, Any
from uuid import UUID, uuid4

from domain.entity.video_source import RecordingSourceId


@dataclass(frozen=True)
class VideoFileId:
    """Уникальный идентификатор видеозаписи внутри нашей системы."""
    value: UUID

    @staticmethod
    def new() -> "VideoFileId":
        return VideoFileId(uuid4())

    def __str__(self) -> str:
        return str(self.value)


@dataclass
class VideoFileExtra:
    """
    Необязательные тех.детали видео.
    Сейчас они нам не критичны, но сюда можно будет
    добавить конец записи, кодеки, размер кадра и т.п.
    """
    ended_at: Optional[datetime] = None
    duration_seconds: Optional[float] = None
    codec: Optional[str] = None
    frame_width: Optional[int] = None
    frame_height: Optional[int] = None
    meta: Dict[str, Any] = field(default_factory=dict)

class VideoFile:
    """
    Видеофайл/видеозапись.
    Нас сейчас интересует:
      - уникальный идентификатор видео;
      - время начала записи;
      - источник, с которого это видео пришло.
    Остальное — опционально и уезжает в VideoFileExtra.
    """

    def __init__(
        self,
        idx: VideoFileId,
        source_id: "RecordingSourceId",
        started_at: datetime,
        extra: Optional[VideoFileExtra] = None,
    ) -> None:
        self.idx: VideoFileId = idx
        self.source_id: RecordingSourceId = source_id
        self.started_at: datetime = started_at
        self.extra: VideoFileExtra = extra or VideoFileExtra()
  • Кадр. Неважно, хранится ли он в виде картинки или нет, но у него точно будут следующие параметры: время, идентификатор видеофайла, сами данные в каком-то виде. Это всё, что нам пока нужно для идентификации; остальные атрибуты сейчас роли не играют.

Исходный код (domain/entity/frame.py)
from dataclasses import dataclass
from datetime import datetime
from typing import Any

from domain.entity.video_file import VideoFileIdx

@dataclass(frozen=True)
class FrameData:
    """
    Важный value object: «сырые» данные кадра.
    Сейчас нам не важно, это байты картинки, JPEG/PNG, numpy-массив
    или что-то ещё — главное, что это единый объект данных кадра.
    """
    value: Any


class Frame:
    """
    Один кадр видеозаписи.

    Для идентификации кадра в домене нам сейчас достаточно:
      - времени кадра;
      - идентификатора видеозаписи;
      - самих данных кадра (в каком-то виде).

    Остальные атрибуты (номер кадра, размер, формат и т.п.)
    считаем деталями реализации и пока в модель не тащим.
    """

    def __init__(
        self,
        video_idx: "VideoFileIdx",
        captured_at: datetime,
        data: FrameData,
    ) -> None:
        self.video_idx = video_idx
        self.captured_at = captured_at
        self.data = data
  • Объект. Как мы помним, заказчик на данном этапе требует просто находить людей в кадре, но в будущем это может быть что угодно (тележка, телефон в руках у кассира и т.д.). В кадрах мы будем идентифицировать именно объекты: у каждого объекта будет класс, дополнительные признаки (сейчас это только «пол»), а также ссылки на кадры, в которых этот объект был зафиксирован (заказчику ведь неинтересно, как клёво мы умеем работать с одной картинкой — ему нужны агрегированные данные).

Исходный код (domain/entity/detected_object.py)
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Dict, Any, List, Optional
from uuid import UUID, uuid4

from domain.entity.video_file import VideoFileIdx


@dataclass(frozen=True)
class ObjectIdx:
    """Уникальный идентификатор объекта (человек, тележка, телефон и т.д.)."""
    value: UUID

    @staticmethod
    def new() -> "ObjectIdx":
        return ObjectIdx(uuid4())

    def __str__(self) -> str:
        return str(self.value)


@dataclass(frozen=True)
class ObjectClass:
    """
    Класс объекта:
      - сейчас нас интересует 'person',
      - в будущем: 'cart', 'phone', 'employee', 'thief' и т.д.
    """
    value: str  # например: "person", "cart", "phone"


@dataclass
class ObjectAttributes:
    """
    Дополнительные признаки объекта.
    Их мы будем заполнять пока в виде словаря,
    но потом мы сможем лучше конкретизировать эти объекты.
    Главное, что нам из-за этих изменений не придется переделывать
    саму сущность.
    """
    meta: Dict[str, Any] = field(default_factory=dict)

@dataclass(frozen=True)
class Coordinate:
    """
    Координаты объекта в кадре
    """
    x: int
    y: int
    width: int
    height: int


@dataclass(frozen=True)
class FrameRef:
    """
    Ссылка на кадр, в котором объект был зафиксирован.

    Мы не навязываем отдельный идентификатор кадра, а ссылаемся
    через пару (video_idx, captured_at), чего достаточно в домене,
    чтобы однозначно указать кадр.
    """
    video_idx: "VideoFileIdx"
    captured_at: datetime
    coordinate: Coordinate


# === Сущность ===
class DetectedObject:
    """
    Объект, обнаруженный в видеопотоке.

    На уровне домена нас интересует:
      - уникальный идентификатор объекта;
      - его класс (person/cart/phone/…);
      - дополнительные признаки (сейчас — пол);
      - список кадров, в которых этот объект встречается
        (для дальнейшей агрегации и аналитики).
    """

    def __init__(
        self,
        idx: ObjectIdx,
        object_class: ObjectClass,
        attributes: Optional[ObjectAttributes] = None,
        frames: Optional[List[FrameRef]] = None,
    ) -> None:
        self.idx = idx
        self.object_class = object_class
        self.attributes = attributes or ObjectAttributes()
        self.frames: List[FrameRef] = frames or []
  • Отчет. Результат для заказчика. Конечно, его можно формировать «на лету», но вдруг камеры и нейросети будут работать супербыстро, и мы получим 140 кадров в секунду. Тогда для формирования любого отчёта нам придётся каждый раз пережевывать большой объём данных. Поэтому набросаем, что нам нужно заранее: временные промежутки, вид отчёта (количество людей, половой состав). Ещё не хватает пруфов: заказчику нужно будет доказать работоспособность нашего сервиса, поэтому добавляем ссылки на источники, видеофайлы и объекты.

Исходный код (domain/entity/report.py)
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, Any, List
from uuid import UUID, uuid4

from domain.entity.recording_source import RecordingSourceIdx
from domain.entity.video_file import VideoFileIdx
from domain.entity.detected_object import ObjectIdx


@dataclass(frozen=True)
class ReportIdx:
    """Уникальный идентификатор отчёта."""
    value: UUID

    @staticmethod
    def new() -> "ReportIdx":
        return ReportIdx(uuid4())

    def __str__(self) -> str:
        return str(self.value)


@dataclass(frozen=True)
class ReportType:
    """
    Тип отчёта:
      - сейчас достаточно, например, 'people_count', 'gender_distribution';
      - в будущем: 'traffic_heatmap', 'theft_suspects', 'queue_length' и т.д.
    """
    value: str


@dataclass(frozen=True)
class TimeRange:
    """Временной промежуток, за который строится отчёт."""
    start: datetime
    end: datetime


@dataclass
class ReportData:
    """
    Агрегированные данные отчёта.

    Сейчас оставляем это в виде произвольного словаря,
    чтобы не блокировать развитие. Когда домен стабилизируется,
    из meta можно будет вытащить отдельные данные
    """
    meta: Dict[str, Any] = field(default_factory=dict)


# === Сущность ===

class Report:
    """
    Отчёт для заказчика.
    """

    def __init__(
        self,
        idx: ReportIdx,
        report_type: ReportType,
        time_range: TimeRange,
        data: ReportData,
        source_idx_list: List["RecordingSourceIdx"],
        video_idx_list: List["VideoFileIdx"],
        object_idx_list: List["ObjectIdx"],
    ) -> None:
        self.idx = idx
        self.report_type = report_type
        self.time_range = time_range
        self.data = data

        self.source_idx_list = list(source_idx_list)
        self.video_idx_list = list(video_idx_list)
        self.object_idx_list = list(object_idx_list)

Ну всё, вроде бы понятно, ведь у нас есть набор сущностей. Следующий шаг: написать вокруг этого эндпоинты или даже набросать БД, раз всё это уже можно сохранять. Опять нет!

Сначала попробуем выделить всё, что происходит вокруг этих сущностей: какие действия с ними выполняются и какие задачи решают разные части системы. На этом этапе мы будем определять сервисы. А чтобы описать их поведение абстрактно, нам поможет ABC из стандартной библиотеки, позволяющий задавать интерфейсы через абстрактные базовые классы. Начнём:

  • Сервис сохранения файлов. Задача нетривиальная: можно записывать файл в потоковом режиме, можно сразу копировать его на диск, но это детали — сейчас они нам не нужны. Самое главное — результат: файл сохранён или нет.

Исходный код (domain/service/file_storage.py)
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional, BinaryIO

from domain.entity.video_file import VideoFileIdx


@dataclass(frozen=True)
class FileLocation:
    """
    Абстрактное местоположение файла.
    Это может быть путь на диске, ключ в объектном хранилище,
    URL и т.д. Конкретику решит инфраструктура.
    """
    value: str


@dataclass(frozen=True)
class FileSaveResult:
    """
    Результат сохранения файла.
    Главное — удалось или нет. Остальное опционально.
    """
    success: bool
    location: Optional[FileLocation] = None
    error_message: Optional[str] = None


class FileStorage(ABC):
    """
    Абстрактный сервис сохранения файлов.

    Детали (стриминг, буферизация, ретраи, конкретное хранилище)
    остаются за пределами домена. Здесь нас интересует только:
    «попробуй сохранить» → FileSaveResult.
    """

    @abstractmethod
    def save_video_content(
        self,
        video_idx: "VideoFileIdx",
        content: BinaryIO,
    ) -> FileSaveResult:
        """
        Сохранить бинарное содержимое видеозаписи,
        связав его с доменной сущностью VideoFile.

        Аргумент `content` — абстрактный поток байт:
        это может быть открытый файл, сетевой стрим и т.п.
        """
        raise NotImplementedError
  • Сервис нарезки кадров из видео. Опять же, мы не знаем, какие-либо библиотеки существуют для этого, какой размер кадра нам нужен на выходе и т.д. Но зато мы знаем, что в результате работы сервиса мы должны получить исчерпывающие данные для формирования сущности «Кадр».

Исходный код (domain/service/frame_extractor.py)
from abc import ABC, abstractmethod
from typing import Iterable

from domain.entity.video_file import VideoFileIdx
from domain.entity.frame import Frame


class FrameExtractor(ABC):
    """
    Абстрактный сервис нарезки кадров из видео.

    На уровне домена нас интересует только то, что
    на вход подаётся идентификатор видеозаписи, а на выходе
    мы получаем набор сущностей Frame, достаточный для
    дальнейшей обработки и аналитики.
    """

    @abstractmethod
    def extract_frames(self, video_idx: "VideoFileIdx") -> Iterable["Frame"]:
        """
        Извлечь кадры для указанного видео.

        Конкретная реализация сама решает:
          - как прочитать файл (локальный диск, object storage и т.п.);
          - как именно и с какими параметрами вырезать кадры.
        """
        raise NotImplementedError
  • Сервис обработки кадров. Мы не знаем, какие именно нейросети нужны; возможно, придётся запускать несколько моделей друг за другом. Но мы точно можем сказать, что на выходе должны быть получены списки классов, признаков и координат в кадре. Эти данные, в свою очередь, нужны для создания и обновления сущности «Объект».

Исходный код (domain/service/frame_processor.py)
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List


from domain.entity.frame import Frame
from domain.entity.detected_object import ObjectClass, ObjectAttributes, Coordinate


@dataclass(frozen=True)
class RawDetection:
    """
    Результат работы нейросетей по одному объекту на кадре.

    На уровне сервиса обработки кадров нам достаточно:
      - класса объекта;
      - набора признаков (пока в виде ObjectAttributes);
      - координат объекта в кадре.
    """
    object_class: "ObjectClass"
    attributes: "ObjectAttributes"
    coordinate: "Coordinate"


class FrameProcessor(ABC):
    """
    Абстрактный сервис обработки кадров нейросетями.

    Мы не знаем:
      - какие именно нейросети будут использоваться;
      - сколько их будет и в каком порядке они запустятся;
      - на каком железе это всё будет крутиться.
    """

    @abstractmethod
    def process_frame(self, frame: "Frame") -> List[RawDetection]:
        """
        Обработать кадр и вернуть список найденных объектов
        с их классами, признаками и координатами.
        """
        raise NotImplementedError
  • Сервис классификации объектов. Проблема в том, что кадров у нас много, и объекты на кадрах нужно как-то связать. Мы пока не знаем как, но, как подсказывает логика, следует анализировать предыдущие кадры и пытаться понять, нужно ли создавать новый объект или можно обновить существующий.

Исходный код (domain/service/object_classifier.py)
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List

from domain.entity.frame import Frame
from domain.entity.detected_object import ObjectIdx, FrameRef
from domain.service.frame_processor import RawDetection

@dataclass(frozen=True)
class ObjectClassificationDecision:
    """
    Результат классификации одного «сыро обнаруженного» объекта.

    На уровне домена нас интересует:
      - какой доменной сущности DetectedObject он соответствует
        (существующий объект или новый);
      - в каком кадре и с какими координатами он был обнаружен;
      - исходные данные детекции (класс, признаки, координаты).
    """
    existing_object_idx: Optional["ObjectIdx"]  # None, если объект новый
    frame_ref: "FrameRef"
    raw_detection: "RawDetection"

    @property
    def is_new(self) -> bool:
        """True, если по этой детекции нужно создать новый объект."""
        return self.existing_object_idx is None


class ObjectClassifier(ABC):
    """
    Абстрактный сервис классификации/сопоставления объектов по кадрам.

    Его задача — только принять решение:
      - к какому уже существующему ObjectIdx отнести детекцию;
      - по каким детекциям нужно создать НОВЫЙ объект.

    Как именно это делается (трекеры, эвклидово расстояние, нейросети и т.п.) —
    детализация инфраструктуры, домен этого не знает.
    """

    @abstractmethod
    def classify_objects(
        self,
        frame: "Frame",
        detections: List["RawDetection"],
    ) -> List[ObjectClassificationDecision]:
        """
        На основании набора детекций на кадре решить,
        какие из них принадлежат уже существующим объектам,
        а для каких потребуется создать новый объект.
        """
        raise NotImplementedError
  • Сервис анализа найденных объектов. Заказчику всё-таки нужно получать аналитические данные, и мы уже определились, что формировать их «по запросу» не стоит. Поэтому нам нужен сервис, который анализирует уникальные объекты и старается добыть данные, необходимые для построения отчёта.

Исходный код (domain/service/object_analyzer.py)
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Dict, Any, List

from domain.entity.detected_object import DetectedObject, ObjectIdx
from domain.entity.video_file import VideoFileIdx
from domain.entity.recording_source import RecordingSourceIdx


@dataclass
class ObjectAnalyticsResult:
    """
    Результат анализа уникальных объектов.

    Здесь мы собираем всё, что нужно для дальнейшего построения отчёта:
      - агрегированные метрики (meta),
      - «пруфы» — ссылки на источники, видео и объекты.
    """
    meta: Dict[str, Any] = field(default_factory=dict)
    source_idx_list: List["RecordingSourceIdx"] = field(default_factory=list)
    video_idx_list: List["VideoFileIdx"] = field(default_factory=list)
    object_idx_list: List["ObjectIdx"] = field(default_factory=list)


class ObjectAnalyzer(ABC):
    """
    Абстрактный сервис анализа найденных (уже классифицированных) объектов.

    Его задача:
      - взять набор уникальных объектов (DetectedObject),
      - «переварить» их,
      - выдать агрегированные данные, которые потом
        пойдут в построение доменной сущности отчёта.

    ВАЖНО: сервис ничего не знает о том, как будут выглядеть
    конкретные отчёты, он лишь добывает сырые аналитические данные
    и пруфы.
    """

    @abstractmethod
    def analyze_objects(
        self,
        objects: List["DetectedObject"],
    ) -> ObjectAnalyticsResult:
        """
        Проанализировать набор уникальных объектов и вернуть
        агрегированную информацию, пригодную для построения отчёта.
        """
        raise NotImplementedError

Теперь поговорим о сохранении, извлечении и поиске сущностей. И здесь нас снова выручает модуль abc, с помощью которого мы напишем абстрактный интерфейс для работы с хранилищем (Repository):

  • Репозиторий источников. Нам нужно всё-таки вести учёт источников и связывать их с видеофайлами.

Исходный код (domain/repository/recording_source_repository.py)
from abc import ABC, abstractmethod
from typing import Optional, List


from domain.entity.recording_source import RecordingSource, RecordingSourceIdx


class RecordingSourceRepository(ABC):
    """
    Абстрактный репозиторий источников записи.

    Детали хранилища (Postgres, NoSQL, файлики, in-memory)
    здесь не определяются.
    """

    @abstractmethod
    def save(self, source: "RecordingSource") -> None:
        """
        Сохранить или обновить источник записи.
        """
        raise NotImplementedError

    @abstractmethod
    def get_by_idx(self, idx: "RecordingSourceIdx") -> Optional["RecordingSource"]:
        """
        Получить источник по его доменному идентификатору.
        """
        raise NotImplementedError

    @abstractmethod
    def list_all(self) -> List["RecordingSource"]:
        """
        Вернуть все известные источники.
        (Пагинация уже как домашнее задание)
        """
        raise NotImplementedError
  • Репозиторий видеофайлов. Должен позволять сохранять важные данные о сущности «Видеофайл», включая её идентификатор, чтобы мы могли легко находить видеофайлы и работать с ними из сервиса сохранения видеофайлов.

Исходный код (domain/repository/video_file_repository.py)
from abc import ABC, abstractmethod
from typing import Optional, List


from domain.entity.video_file import VideoFile, VideoFileIdx
from domain.entity.recording_source import RecordingSourceIdx


class VideoFileRepository(ABC):
    """
    Абстрактный репозиторий видеозаписей.

    Задачи:
      - сохранить/обновить сущность VideoFile;
      - уметь по доменному идентификатору VideoFileIdx быстро её найти;
      - при необходимости находить все видео, привязанные к источнику.
    """

    @abstractmethod
    def save(self, video: "VideoFile") -> None:
        """
        Сохранить или обновить видеозапись.
        """
        raise NotImplementedError

    @abstractmethod
    def get_by_idx(self, idx: "VideoFileIdx") -> Optional["VideoFile"]:
        """
        Найти видеозапись по её доменному идентификатору.
        """
        raise NotImplementedError

    @abstractmethod
    def list_by_source_idx(self, source_idx: "RecordingSourceIdx") -> List["VideoFile"]:
        """
        Получить все видеозаписи, пришедшие с указанного источника.
        """
        raise NotImplementedError
  • Репозиторий кадров. ММы хотим сохранять пруфы нашей работы, поэтому не забываем и про хранилище кадров, которые уже были проанализированы. При этом, нужно ли сохранять сами бинарные данные кадров, можно решить позже.

Исходный код (domain/repository/frame_repository.py)
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Optional, List

from domain.entity.frame import Frame
from domain.entity.video_file import VideoFileIdx


class FrameRepository(ABC):
    """
    Абстрактный репозиторий кадров.

    Задачи:
      - сохранять проанализированные кадры как «пруф» работы системы;
      - уметь находить кадры по видеозаписи и времени;
      - уметь получать набор кадров по видеозаписи и/или диапазону времени.
    """

    @abstractmethod
    def save(self, frame: "Frame") -> None:
        """
        Сохранить или обновить кадр.
        """
        raise NotImplementedError

    @abstractmethod
    def get_by_video_and_time(
        self,
        video_idx: "VideoFileIdx",
        captured_at: datetime,
    ) -> Optional["Frame"]:
        """
        Найти конкретный кадр по видеозаписи и времени съёмки.
        Эта пара (video_idx, captured_at).
        """
        raise NotImplementedError

    @abstractmethod
    def list_by_video_idx(
        self,
        video_idx: "VideoFileIdx",
        *,
        start_time: Optional[datetime] = None,
        end_time: Optional[datetime] = None,
    ) -> List["Frame"]:
        """
        Получить кадры для указанного видео.
        """
        raise NotImplementedError
  • Репозиторий объектов. Нужен набор методов для сохранения объектов и их поиска по различным параметрам, которые могут потребоваться сервисам.

Исходный код (domain/repository/object_repository.py)
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional, List, Dict, Any

from domain.entity.detected_object import DetectedObject, ObjectIdx, ObjectClass
from domain.entity.recording_source import RecordingSourceIdx
from domain.entity.video_file import VideoFileIdx


@dataclass
class ObjectSearchCriteria:
    """
    Критерии поиска объектов.

    Это абстрактный фильтр, который можно постепенно расширять
    по мере появления новых сценариев в сервисах.
    """
    object_classes: Optional[List["ObjectClass"]] = None
    source_idx: Optional["RecordingSourceIdx"] = None
    video_idx: Optional["VideoFileIdx"] = None

    # Временной диапазон, в котором объект хоть раз появлялся
    appeared_from: Optional[datetime] = None
    appeared_to: Optional[datetime] = None

    # Фильтрация по признакам (ключи/значения в ObjectAttributes.meta)
    attributes: Dict[str, Any] = field(default_factory=dict)


class ObjectRepository(ABC):
    """
    Абстрактный репозиторий объектов (DetectedObject).
    """

    @abstractmethod
    def save(self, obj: "DetectedObject") -> None:
        """
        Сохранить или обновить объект.
        """
        raise NotImplementedError

    @abstractmethod
    def get_by_idx(self, idx: "ObjectIdx") -> Optional["DetectedObject"]:
        """
        Получить объект по доменному идентификатору.
        """
        raise NotImplementedError

    @abstractmethod
    def search(
        self,
        criteria: ObjectSearchCriteria,
        *,
        limit: Optional[int] = None,
        offset: Optional[int] = None,
    ) -> List["DetectedObject"]:
        """
        Поиск объектов по набору критериев.
        """
        raise NotImplementedError
  • Репозиторий отчетов. Сохраняем отчёты; при этом мы пока даже не выбираем временные промежутки их хранения — нам важен только результат. Также не забываем про методы извлечения отчётов.

Исходный код (domain/repository/report_repository.py)
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Optional, List

from domain.entity.report import Report, ReportIdx, ReportType


class ReportRepository(ABC):
    """
    Абстрактный репозиторий отчётов.
    """

    @abstractmethod
    def save(self, report: "Report") -> None:
        """
        Сохранить или обновить отчёт.
        """
        raise NotImplementedError

    @abstractmethod
    def get_by_idx(self, idx: "ReportIdx") -> Optional["Report"]:
        """
        Получить отчёт по его доменному идентификатору.
        """
        raise NotImplementedError

    @abstractmethod
    def list_by_type(
        self,
        report_type: "ReportType",
        *,
        start_time: Optional[datetime] = None,
        end_time: Optional[datetime] = None,
        limit: Optional[int] = None,
        offset: Optional[int] = None,
    ) -> List["Report"]:
        """
        Получить отчёты указанного типа.

        Временной промежуток задаёт интересующий нас диапазон
        по времени, за который построены отчёты.

        Реализация сама решает:
          - как интерпретировать start_time/end_time относительно
            поля TimeRange у отчёта;
          - как оптимизировать выборку и пагинацию.
        """
        raise NotImplementedError

    @abstractmethod
    def list_all(
        self,
        *,
        limit: Optional[int] = None,
        offset: Optional[int] = None,
    ) -> List["Report"]:
        """
        Вернуть все отчёты (с возможной пагинацией).
        """
        raise NotImplementedError

Вообще, сущности хорошо бы не создавать «голыми руками». Поэтому лучше сразу позаботиться о фабриках и не конструировать их напрямую. Да, сначала это кажется лишней работой, но по мере усложнения сущностей вы всё равно к этому придёте: формат входных данных будет меняться. Давайте сразу заложим фабрики для всех ключевых сущностей.

Исходный код (domain/factory/recording_source_factory.py)
from dataclasses import dataclass
from typing import Optional

from domain.entity.recording_source import (
    RecordingSource,
    RecordingSourceIdx,
    RecordingSourceExtra,
)


@dataclass
class RecordingSourceFactory:
    """
    Фабрика источников записи.
    """

    def create(
        self,
        extra: Optional["RecordingSourceExtra"] = None,
    ) -> "RecordingSource":
        idx = RecordingSourceIdx.new()
        return RecordingSource(
            idx=idx,
            extra=extra or "RecordingSourceExtra"()
        )
Исходный код (domain/factory/video_file_factory.py)
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

from domain.entity.video_file import VideoFile, VideoFileIdx, VideoFileExtra
from domain.entity.recording_source import RecordingSourceIdx


@dataclass
class VideoFileFactory:
    """
    Фабрика видеозаписей.
    """

    def create(
        self,
        source_idx: RecordingSourceIdx,
        started_at: datetime,
        extra: Optional["VideoFileExtra"] = None,
    ) -> "VideoFile":
        video_idx = VideoFileIdx.new()
        return VideoFile(
            idx=video_idx,
            source_idx=source_idx,
            started_at=started_at,
            extra=extra or VideoFileExtra(),
        )
Исходный код (domain/factory/frame_factory.py)
from dataclasses import dataclass
from datetime import datetime
from typing import Any

from domain.entity.frame import Frame, FrameData
from domain.entity.video_file import VideoFileIdx


@dataclass
class FrameFactory:
    """
    Фабрика кадров.
    """

    def create(
        self,
        video_idx: VideoFileIdx,
        captured_at: datetime,
        raw_data: Any,
    ) -> "Frame":
        frame_data = FrameData(value=raw_data)
        return Frame(
            video_idx=video_idx,
            captured_at=captured_at,
            data=frame_data,
        )
Исходный код (domain/factory/detected_object_factory.py)
from dataclasses import dataclass
from typing import Optional

from domain.entity.detected_object import (
    DetectedObject,
    ObjectIdx,
    ObjectClass,
    ObjectAttributes,
    FrameRef,
)


@dataclass
class DetectedObjectFactory:
    """
    Фабрика доменных объектов (DetectedObject).
    """

    def create_new(
        self,
        object_class: ObjectClass,
        attributes: Optional[ObjectAttributes] = None,
        first_frame: Optional[FrameRef] = None,
    ) -> DetectedObject:
        idx = ObjectIdx.new()
        frames = [first_frame] if first_frame is not None else None

        return DetectedObject(
            idx=idx,
            object_class=object_class,
            attributes=attributes or ObjectAttributes(),
            frames=frames,
        )
Исходный код (domain/factory/report_factory.py)
from dataclasses import dataclass

from domain.entity.report import (
    Report,
    ReportIdx,
    ReportType,
    TimeRange,
    ReportData,
)
from domain.service.object_analyzer import ObjectAnalyticsResult


@dataclass
class ReportFactory:
    """
    Фабрика отчётов.
    """

    def create_from_analytics(
        self,
        report_type: ReportType,
        time_range: TimeRange,
        analytics: ObjectAnalyticsResult,
    ) -> Report:
        idx = ReportIdx.new()

        data = ReportData(meta=analytics.meta)

        return Report(
            idx=idx,
            report_type=report_type,
            time_range=time_range,
            data=data,
            source_idx_list=analytics.source_idx_list,
            video_idx_list=analytics.video_idx_list,
            object_idx_list=analytics.object_idx_list,
        )

Вроде всё готово, но наши сущности сейчас — это не просто «мешки с данными», которые сами по себе ничего не делают. Можно, конечно, добавить им простые сеттеры и геттеры, но тогда они останутся просто контейнерами для данных и не будут отражать поведение объектов и их возможности. А нам ведь нужно в будущем поддерживать систему, поэтому лучше сразу думать о том, какие операции должны жить внутри самих сущностей.

  • Видеоисточник.

domain/entity/video_source.py
class RecordingSource:
    # старый код

    def update_extra(self, extra: RecordingSourceExtra) -> None:
        """
        Обновить дополнительные данные об источнике.
        """
        self.extra = extra
  • Видеофайл.

domain/entity/video_file.py
class VideoFile:
    # старый код

    def mark_ended(self, ended_at: Optional[datetime] = None) -> None:
        """
        Пометить видеозапись как завершённую.
        """
        new_ended_at = ended_at or datetime.utcnow()

        if new_ended_at < self.started_at:
            raise ValueError("ended_at cannot be earlier than started_at")

        if self.extra.ended_at is not None and new_ended_at <= self.extra.ended_at:
            return

        self.extra.ended_at = new_ended_at
        self.extra.duration_seconds = (new_ended_at - self.started_at).total_seconds()

    def update_duration(self, duration_seconds: float) -> None:
        """
        Обновить длительность видеозаписи в секундах.
        """
        if duration_seconds < 0:
            raise ValueError("duration_seconds cannot be negative")

        self.extra.duration_seconds = duration_seconds

    def update_technical_info(
        self,
        *,
        codec: Optional[str] = None,
        frame_width: Optional[int] = None,
        frame_height: Optional[int] = None,
        meta: Optional[Dict[str, Any]] = None,
    ) -> None:
        """
        Обновить технические параметры видеозаписи.
        """
        if frame_width is not None and frame_width <= 0:
            raise ValueError("frame_width must be positive")

        if frame_height is not None and frame_height <= 0:
            raise ValueError("frame_height must be positive")

        if codec is not None:
            self.extra.codec = codec

        if frame_width is not None:
            self.extra.frame_width = frame_width

        if frame_height is not None:
            self.extra.frame_height = frame_height

        if meta:
            self.extra.meta.update(meta)

    def update_extra(self, extra: VideoFileExtra) -> None:
        """
        Полностью заменить объект с дополнительными данными.
        """
        self.extra = extra
  • Кадр

domain/entity/frame.py
class Frame:
    # старый код

    def update_data(self, data: FrameData) -> None:
        """
        Заменить данные кадра.

        Смысл:
          - кадр мог быть перекодирован,
          - к нему могла быть применена анонимизация/маскирование,
          - данные могли быть уменьшены (даунскейл, JPEG и т.п.).

        Вместо прямого присваивания снаружи, всё изменение «сырых» данных
        кадра проходит через этот метод.
        """
        self.data = data

    def is_in_time_range(self, start: datetime, end: datetime) -> bool:
        """
        Проверить, попадает ли кадр во временной интервал [start, end).
        """
        return start <= self.captured_at < end
  • Объект

domain/entity/detected_object.py
class DetectedObject:
    # старый код

    def add_frame(self, frame_ref: FrameRef) -> None:
        """
        Зафиксировать, что объект был замечен ещё в одном кадре.
        """
        self.frames.append(frame_ref)

    def last_seen_at(self) -> Optional[datetime]:
        """
        Вернуть момент времени, когда объект был замечен последним.
        """
        if not self.frames:
            return None
        return max(ref.captured_at for ref in self.frames)

    def was_seen_in_range(self, start: datetime, end: datetime) -> bool:
        """
        Проверить, встречался ли объект в интервале [start, end).
        """
        return any(start <= ref.captured_at < end for ref in self.frames)

    def update_attributes(self, meta: Dict[str, Any]) -> None:
        """
        Обновить признаки объекта (attributes.meta).
        """
        self.attributes.meta.update(meta)
  • Отчет

domain/entity/report.py
class Report:
    # старый код

    def covers_moment(self, moment: datetime) -> bool:
        """
        Проверить, относится ли указанный момент ко времени этого отчёта.
        """
        return self.time_range.start <= moment < self.time_range.end

    def overlaps_with(self, start: datetime, end: datetime) -> bool:
        """
        Проверить, пересекается ли отчёт с временным интервалом [start, end).
        """
        r_start = self.time_range.start
        r_end = self.time_range.end
        return not (end <= r_start or r_end <= start)

    def add_source_idx(self, source_idx: "RecordingSourceIdx") -> None:
        """
        Добавить источник в список «пруфов» отчёта.
        """
        if source_idx not in self.source_idx_list:
            self.source_idx_list.append(source_idx)

    def add_video_idx(self, video_idx: "VideoFileIdx") -> None:
        """
        Добавить видеозапись в список «пруфов» отчёта.
        """
        if video_idx not in self.video_idx_list:
            self.video_idx_list.append(video_idx)

    def add_object_idx(self, object_idx: "ObjectIdx") -> None:
        """
        Добавить объект в список «пруфов» отчёта.
        """
        if object_idx not in self.object_idx_list:
            self.object_idx_list.append(object_idx)

На данном этапе мы ужасно далеки от создания чего-то работоспособного. Тогда зачем всё это нужно?

  1. Мы уже понимаем, что нам нужно делать, и можем прогнозировать, как все компоненты будут взаимодействовать друг с другом.

  2. У нас сейчас нет привязки к конкретным технологиям, мы даже не приступали к их анализу, но ЗАТО мы уже можем поэтапно разобрать обработку одного видеофайла.

  3. Наша архитектура может быть легко расширена под новые требования, когда заказчик согласится продолжить работу с нами.

  4. Мы можем без труда написать реализации наших абстрактных сервисов и репозиториев, используя встроенные типы данных (dict, list) или модули для работы с файлами.

Таким образом, у нас появился домен (Domain), который описывает предметную область нашей задачки и является центром всей системы: именно здесь задаются бизнес-правила и определяется поведение объектов. Далее нам предстоит описать, как компоненты домена взаимодействуют друг с другом (слой Application), — но об этом уже в следующей статье (если, конечно, она вам зайдёт).

P.S. Я намеренно полностью или почти опустил такие важные компоненты, как объекты-значения, доменные события, агрегаты, bounded context и т.п. По моему мнению, это тяжеловато даже для многих специалистов, а у нас всё-таки гайд для новичков. Но какие-то моменты постараюсь осветить в следующей статье.

P.P.S. Это моя первая статья, так что не бейте сильно. С удовольствием почитаю ваши пожелания и всё исправлю/дополню. Сразу оговорюсь: кода много, а старые проекты куда-то затерялись, поэтому в кодогенерации не обошлось без помощи GPT. Заранее спасибо за понимание.

Комментарии (2)


  1. enamored_poc
    02.12.2025 20:19

    Плохой рекламы не бывает) Спасибо за упоминания!


    1. lackyboye14 Автор
      02.12.2025 20:19

      Вам спасибо, что дали вдохновения стартануть)