Мотивация
Сегодня я наткнулся на статью за авторством @enamored_poc. Увидев заголовок, я был в предвкушении: наконец-то кто-то взялся за этот гайд — в своё время я как раз искал что-то подобное. Однако, дочитав статью до конца, понял, что автор по сути просто пересказал раздел Bigger applications из официальной документации и лишь добавил пару замечаний оттуда же.
С одной стороны уже есть куча видео, статей, где можно изучить как пишутся правильно сервисы по всем заветам дядюшки Боба (автора "Чистой Архитектуры"). Но, с другой стороны, всё это обычно показывают на примере стандартного интернет-магазина или списка TODO, и в какой-то момент перестаёшь понимать, а зачем всё это вообще нужно.
Поэтому в этом гайде мы возьмём достаточно нетривиальную тему, которую я регулярно встречаю в вакансиях и на «галерах», и попробуем реализовать её на практике.
Постановка задачи
Владелец сети магазинов хочет загружать видео с камер видеонаблюдения на некоторый сервер. Это видео должно сохраняться в хранилище и покадрово обрабатываться нейросетями. В результате мы должны получить метаданные о том, сколько людей было в кадре, координаты их местоположения в кадре, а также их пол. Но больше всего владельцу нужно получать сводную информацию о том, сколько людей и какого пола фиксируется в разные моменты времени, чтобы он мог например менять выкладку продуктов. После сдачи MVP-версии заказчик может выдать нам контракт на дальнейшее развитие системы: анализ видео-трафика в реальном времени, распознавание лиц, выявление нарушителей, и, не дай бог, слежкой за сотрудниками и т.д.
Немного мотивации: в этой задачке (правда, не в этой части) мы затронем работу с RabbitMQ, Redis, S3, Postgres, а также пощупаем YOLO, opencv. Тем самым закроем большинство типичных требований к современным вакансиям. А еще эту задачку можно переложить на любую предметную область, связанную с видеоаналитикой.
Анализ задачи
Да, всё начинается с анализа предметной области, а не с создания структуры папок. Так что разложим задачку по полочкам.
На вход поступает видеофайл; при этом в будущем этот видеофайл будет загружаться в потоковом режиме или напрямую транслироваться с камеры видеонаблюдения.
Эти видеофайлы должны где-то и как-то храниться. Мы пока не можем спрогнозировать, сколько таких видео будет храниться, какие серверы готов выделить заказчик, какие требования будут к времени записи и чтения, будут ли SSD на этих серверах.
Все видеофайлы должны быть разбиты на кадры и скормлены нейросетям. Сейчас мы ничего не знаем о нейросетях (мы перекладыватели JSON, а не аналитики): о том, как быстро они способны обработать все кадры, как часто эти кадры нужно подсовывать и многое другое. Зато мы можем предположить, что пользователю не нужно ждать результата обработки в рамках одного запроса, так как его интересуют только аналитические данные.
Для каждого кадра нам необходимо выявить человека, его координаты в кадре, время и пол. В будущем заказчик хочет и распознавание воришек, а может, потом захочет следить и за кассирами, или за тем, какие тележки берут. Всё это тоже должно куда-то и в каком-то виде сохраняться. Мы слышали про NoSQL и Postgres, но пока не знаем, что лучше применить для будущих аналитических задач.
Вероятно, нужно подготовить REST API, которое позволит делать запросы о количестве людей в кадре и их поле, а также показывать красивые графики и таблицы для заказчика. Держим в голове, что позже появится ещё множество данных, которые он захочет получать.
Выглядит жутко, куча неизвестных... Хм.. С чего бы начать?? Может набросать endpoint-ы для загрузки видео? Или сделать endpoint "загрузить картинку", а потом запустить нейросеть, которую советует ChatGPT, все-равно же разбивать покадрово видео придется? А может спроектировать БД, где будут сохранятся методанные с видео, и уже от данных плясать?
НЕТ! На самом деле у нас уже есть всё необходимое, чтобы проектировать архитектуру, а все «неизвестные» — это всего лишь детали, которые будут сбивать вас с пути. Даже слово «endpoint» здесь лишнее, не говоря уже о FastAPI. Так что же делать дальше?
Выделение сущностей и постановка задач доменного слоя
Как уже было сказано выше, у нас достаточно информации, чтобы выделить ключевые сущности домена. Начнём с самой базовой.
Источник записи. Нам важно сохранять информацию о том, откуда поступает видеофайл: связан ли он с одной конкретной камерой, группой камер или, например, с целой сетью магазинов. Пользователь может захотеть загружать как одиночные записи, так и десятки небольших роликов с одной и той же камеры.
Исходный код (domain/entity/video_source.py)
from dataclasses import dataclass, field
from typing import Optional, Dict, Any
from uuid import UUID, uuid4
@dataclass(frozen=True)
class RecordingSourceId:
"""Уникальный идентификатор источника записи внутри нашей системы."""
value: UUID
@staticmethod
def new() -> "RecordingSourceId":
return RecordingSourceId(uuid4())
def __str__(self) -> str:
return str(self.value)
@dataclass(frozen=True)
class RecordingSourceExtra:
"""
Необязательные данные об источнике.
Сейчас здесь почти ничего нет, но именно сюда
позже поедут метаданные о местоположении камеры
"""
floor: Optional[int] = None
location_label: Optional[str] = None
meta: Dict[str, Any] = field(default_factory=dict)
class RecordingSource:
"""
Источник записи (камера/логический канал).
На уровне домена нам пока важно только то,
что есть его идентификатор и необязательные доп.данные.
"""
idx: RecordingSourceId
extra: RecordingSourceExtra = RecordingSourceExtra()
def __init__(
self,
idx: RecordingSourceId
extra: RecordingSourceExtra = RecordingSourceExtra()
) -> None:
self.idx: RecordingSourceId = idx
self.source_id: RecordingSourceExtra = extraВидеофайл. Некоторый файл; при этом нам сейчас не важно, передан ли он нам сразу или записывается в потоковом режиме. Самое главное, что он точно должен содержать: уникальный идентификатор, время начала, уникальный идентификатор источника. При этом такие атрибуты, как время окончания, длительность, кодеки, размер кадра, уже будут опциональными и сейчас нам не особенно нужны.
Исходный код (domain/entity/video_file.py)
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional, Dict, Any
from uuid import UUID, uuid4
from domain.entity.video_source import RecordingSourceId
@dataclass(frozen=True)
class VideoFileId:
"""Уникальный идентификатор видеозаписи внутри нашей системы."""
value: UUID
@staticmethod
def new() -> "VideoFileId":
return VideoFileId(uuid4())
def __str__(self) -> str:
return str(self.value)
@dataclass
class VideoFileExtra:
"""
Необязательные тех.детали видео.
Сейчас они нам не критичны, но сюда можно будет
добавить конец записи, кодеки, размер кадра и т.п.
"""
ended_at: Optional[datetime] = None
duration_seconds: Optional[float] = None
codec: Optional[str] = None
frame_width: Optional[int] = None
frame_height: Optional[int] = None
meta: Dict[str, Any] = field(default_factory=dict)
class VideoFile:
"""
Видеофайл/видеозапись.
Нас сейчас интересует:
- уникальный идентификатор видео;
- время начала записи;
- источник, с которого это видео пришло.
Остальное — опционально и уезжает в VideoFileExtra.
"""
def __init__(
self,
idx: VideoFileId,
source_id: "RecordingSourceId",
started_at: datetime,
extra: Optional[VideoFileExtra] = None,
) -> None:
self.idx: VideoFileId = idx
self.source_id: RecordingSourceId = source_id
self.started_at: datetime = started_at
self.extra: VideoFileExtra = extra or VideoFileExtra()
Кадр. Неважно, хранится ли он в виде картинки или нет, но у него точно будут следующие параметры: время, идентификатор видеофайла, сами данные в каком-то виде. Это всё, что нам пока нужно для идентификации; остальные атрибуты сейчас роли не играют.
Исходный код (domain/entity/frame.py)
from dataclasses import dataclass
from datetime import datetime
from typing import Any
from domain.entity.video_file import VideoFileIdx
@dataclass(frozen=True)
class FrameData:
"""
Важный value object: «сырые» данные кадра.
Сейчас нам не важно, это байты картинки, JPEG/PNG, numpy-массив
или что-то ещё — главное, что это единый объект данных кадра.
"""
value: Any
class Frame:
"""
Один кадр видеозаписи.
Для идентификации кадра в домене нам сейчас достаточно:
- времени кадра;
- идентификатора видеозаписи;
- самих данных кадра (в каком-то виде).
Остальные атрибуты (номер кадра, размер, формат и т.п.)
считаем деталями реализации и пока в модель не тащим.
"""
def __init__(
self,
video_idx: "VideoFileIdx",
captured_at: datetime,
data: FrameData,
) -> None:
self.video_idx = video_idx
self.captured_at = captured_at
self.data = dataОбъект. Как мы помним, заказчик на данном этапе требует просто находить людей в кадре, но в будущем это может быть что угодно (тележка, телефон в руках у кассира и т.д.). В кадрах мы будем идентифицировать именно объекты: у каждого объекта будет класс, дополнительные признаки (сейчас это только «пол»), а также ссылки на кадры, в которых этот объект был зафиксирован (заказчику ведь неинтересно, как клёво мы умеем работать с одной картинкой — ему нужны агрегированные данные).
Исходный код (domain/entity/detected_object.py)
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Dict, Any, List, Optional
from uuid import UUID, uuid4
from domain.entity.video_file import VideoFileIdx
@dataclass(frozen=True)
class ObjectIdx:
"""Уникальный идентификатор объекта (человек, тележка, телефон и т.д.)."""
value: UUID
@staticmethod
def new() -> "ObjectIdx":
return ObjectIdx(uuid4())
def __str__(self) -> str:
return str(self.value)
@dataclass(frozen=True)
class ObjectClass:
"""
Класс объекта:
- сейчас нас интересует 'person',
- в будущем: 'cart', 'phone', 'employee', 'thief' и т.д.
"""
value: str # например: "person", "cart", "phone"
@dataclass
class ObjectAttributes:
"""
Дополнительные признаки объекта.
Их мы будем заполнять пока в виде словаря,
но потом мы сможем лучше конкретизировать эти объекты.
Главное, что нам из-за этих изменений не придется переделывать
саму сущность.
"""
meta: Dict[str, Any] = field(default_factory=dict)
@dataclass(frozen=True)
class Coordinate:
"""
Координаты объекта в кадре
"""
x: int
y: int
width: int
height: int
@dataclass(frozen=True)
class FrameRef:
"""
Ссылка на кадр, в котором объект был зафиксирован.
Мы не навязываем отдельный идентификатор кадра, а ссылаемся
через пару (video_idx, captured_at), чего достаточно в домене,
чтобы однозначно указать кадр.
"""
video_idx: "VideoFileIdx"
captured_at: datetime
coordinate: Coordinate
# === Сущность ===
class DetectedObject:
"""
Объект, обнаруженный в видеопотоке.
На уровне домена нас интересует:
- уникальный идентификатор объекта;
- его класс (person/cart/phone/…);
- дополнительные признаки (сейчас — пол);
- список кадров, в которых этот объект встречается
(для дальнейшей агрегации и аналитики).
"""
def __init__(
self,
idx: ObjectIdx,
object_class: ObjectClass,
attributes: Optional[ObjectAttributes] = None,
frames: Optional[List[FrameRef]] = None,
) -> None:
self.idx = idx
self.object_class = object_class
self.attributes = attributes or ObjectAttributes()
self.frames: List[FrameRef] = frames or []Отчет. Результат для заказчика. Конечно, его можно формировать «на лету», но вдруг камеры и нейросети будут работать супербыстро, и мы получим 140 кадров в секунду. Тогда для формирования любого отчёта нам придётся каждый раз пережевывать большой объём данных. Поэтому набросаем, что нам нужно заранее: временные промежутки, вид отчёта (количество людей, половой состав). Ещё не хватает пруфов: заказчику нужно будет доказать работоспособность нашего сервиса, поэтому добавляем ссылки на источники, видеофайлы и объекты.
Исходный код (domain/entity/report.py)
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, Any, List
from uuid import UUID, uuid4
from domain.entity.recording_source import RecordingSourceIdx
from domain.entity.video_file import VideoFileIdx
from domain.entity.detected_object import ObjectIdx
@dataclass(frozen=True)
class ReportIdx:
"""Уникальный идентификатор отчёта."""
value: UUID
@staticmethod
def new() -> "ReportIdx":
return ReportIdx(uuid4())
def __str__(self) -> str:
return str(self.value)
@dataclass(frozen=True)
class ReportType:
"""
Тип отчёта:
- сейчас достаточно, например, 'people_count', 'gender_distribution';
- в будущем: 'traffic_heatmap', 'theft_suspects', 'queue_length' и т.д.
"""
value: str
@dataclass(frozen=True)
class TimeRange:
"""Временной промежуток, за который строится отчёт."""
start: datetime
end: datetime
@dataclass
class ReportData:
"""
Агрегированные данные отчёта.
Сейчас оставляем это в виде произвольного словаря,
чтобы не блокировать развитие. Когда домен стабилизируется,
из meta можно будет вытащить отдельные данные
"""
meta: Dict[str, Any] = field(default_factory=dict)
# === Сущность ===
class Report:
"""
Отчёт для заказчика.
"""
def __init__(
self,
idx: ReportIdx,
report_type: ReportType,
time_range: TimeRange,
data: ReportData,
source_idx_list: List["RecordingSourceIdx"],
video_idx_list: List["VideoFileIdx"],
object_idx_list: List["ObjectIdx"],
) -> None:
self.idx = idx
self.report_type = report_type
self.time_range = time_range
self.data = data
self.source_idx_list = list(source_idx_list)
self.video_idx_list = list(video_idx_list)
self.object_idx_list = list(object_idx_list)
Ну всё, вроде бы понятно, ведь у нас есть набор сущностей. Следующий шаг: написать вокруг этого эндпоинты или даже набросать БД, раз всё это уже можно сохранять. Опять нет!
Сначала попробуем выделить всё, что происходит вокруг этих сущностей: какие действия с ними выполняются и какие задачи решают разные части системы. На этом этапе мы будем определять сервисы. А чтобы описать их поведение абстрактно, нам поможет ABC из стандартной библиотеки, позволяющий задавать интерфейсы через абстрактные базовые классы. Начнём:
Сервис сохранения файлов. Задача нетривиальная: можно записывать файл в потоковом режиме, можно сразу копировать его на диск, но это детали — сейчас они нам не нужны. Самое главное — результат: файл сохранён или нет.
Исходный код (domain/service/file_storage.py)
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional, BinaryIO
from domain.entity.video_file import VideoFileIdx
@dataclass(frozen=True)
class FileLocation:
"""
Абстрактное местоположение файла.
Это может быть путь на диске, ключ в объектном хранилище,
URL и т.д. Конкретику решит инфраструктура.
"""
value: str
@dataclass(frozen=True)
class FileSaveResult:
"""
Результат сохранения файла.
Главное — удалось или нет. Остальное опционально.
"""
success: bool
location: Optional[FileLocation] = None
error_message: Optional[str] = None
class FileStorage(ABC):
"""
Абстрактный сервис сохранения файлов.
Детали (стриминг, буферизация, ретраи, конкретное хранилище)
остаются за пределами домена. Здесь нас интересует только:
«попробуй сохранить» → FileSaveResult.
"""
@abstractmethod
def save_video_content(
self,
video_idx: "VideoFileIdx",
content: BinaryIO,
) -> FileSaveResult:
"""
Сохранить бинарное содержимое видеозаписи,
связав его с доменной сущностью VideoFile.
Аргумент `content` — абстрактный поток байт:
это может быть открытый файл, сетевой стрим и т.п.
"""
raise NotImplementedErrorСервис нарезки кадров из видео. Опять же, мы не знаем, какие-либо библиотеки существуют для этого, какой размер кадра нам нужен на выходе и т.д. Но зато мы знаем, что в результате работы сервиса мы должны получить исчерпывающие данные для формирования сущности «Кадр».
Исходный код (domain/service/frame_extractor.py)
from abc import ABC, abstractmethod
from typing import Iterable
from domain.entity.video_file import VideoFileIdx
from domain.entity.frame import Frame
class FrameExtractor(ABC):
"""
Абстрактный сервис нарезки кадров из видео.
На уровне домена нас интересует только то, что
на вход подаётся идентификатор видеозаписи, а на выходе
мы получаем набор сущностей Frame, достаточный для
дальнейшей обработки и аналитики.
"""
@abstractmethod
def extract_frames(self, video_idx: "VideoFileIdx") -> Iterable["Frame"]:
"""
Извлечь кадры для указанного видео.
Конкретная реализация сама решает:
- как прочитать файл (локальный диск, object storage и т.п.);
- как именно и с какими параметрами вырезать кадры.
"""
raise NotImplementedErrorСервис обработки кадров. Мы не знаем, какие именно нейросети нужны; возможно, придётся запускать несколько моделей друг за другом. Но мы точно можем сказать, что на выходе должны быть получены списки классов, признаков и координат в кадре. Эти данные, в свою очередь, нужны для создания и обновления сущности «Объект».
Исходный код (domain/service/frame_processor.py)
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List
from domain.entity.frame import Frame
from domain.entity.detected_object import ObjectClass, ObjectAttributes, Coordinate
@dataclass(frozen=True)
class RawDetection:
"""
Результат работы нейросетей по одному объекту на кадре.
На уровне сервиса обработки кадров нам достаточно:
- класса объекта;
- набора признаков (пока в виде ObjectAttributes);
- координат объекта в кадре.
"""
object_class: "ObjectClass"
attributes: "ObjectAttributes"
coordinate: "Coordinate"
class FrameProcessor(ABC):
"""
Абстрактный сервис обработки кадров нейросетями.
Мы не знаем:
- какие именно нейросети будут использоваться;
- сколько их будет и в каком порядке они запустятся;
- на каком железе это всё будет крутиться.
"""
@abstractmethod
def process_frame(self, frame: "Frame") -> List[RawDetection]:
"""
Обработать кадр и вернуть список найденных объектов
с их классами, признаками и координатами.
"""
raise NotImplementedError
Сервис классификации объектов. Проблема в том, что кадров у нас много, и объекты на кадрах нужно как-то связать. Мы пока не знаем как, но, как подсказывает логика, следует анализировать предыдущие кадры и пытаться понять, нужно ли создавать новый объект или можно обновить существующий.
Исходный код (domain/service/object_classifier.py)
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List
from domain.entity.frame import Frame
from domain.entity.detected_object import ObjectIdx, FrameRef
from domain.service.frame_processor import RawDetection
@dataclass(frozen=True)
class ObjectClassificationDecision:
"""
Результат классификации одного «сыро обнаруженного» объекта.
На уровне домена нас интересует:
- какой доменной сущности DetectedObject он соответствует
(существующий объект или новый);
- в каком кадре и с какими координатами он был обнаружен;
- исходные данные детекции (класс, признаки, координаты).
"""
existing_object_idx: Optional["ObjectIdx"] # None, если объект новый
frame_ref: "FrameRef"
raw_detection: "RawDetection"
@property
def is_new(self) -> bool:
"""True, если по этой детекции нужно создать новый объект."""
return self.existing_object_idx is None
class ObjectClassifier(ABC):
"""
Абстрактный сервис классификации/сопоставления объектов по кадрам.
Его задача — только принять решение:
- к какому уже существующему ObjectIdx отнести детекцию;
- по каким детекциям нужно создать НОВЫЙ объект.
Как именно это делается (трекеры, эвклидово расстояние, нейросети и т.п.) —
детализация инфраструктуры, домен этого не знает.
"""
@abstractmethod
def classify_objects(
self,
frame: "Frame",
detections: List["RawDetection"],
) -> List[ObjectClassificationDecision]:
"""
На основании набора детекций на кадре решить,
какие из них принадлежат уже существующим объектам,
а для каких потребуется создать новый объект.
"""
raise NotImplementedErrorСервис анализа найденных объектов. Заказчику всё-таки нужно получать аналитические данные, и мы уже определились, что формировать их «по запросу» не стоит. Поэтому нам нужен сервис, который анализирует уникальные объекты и старается добыть данные, необходимые для построения отчёта.
Исходный код (domain/service/object_analyzer.py)
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Dict, Any, List
from domain.entity.detected_object import DetectedObject, ObjectIdx
from domain.entity.video_file import VideoFileIdx
from domain.entity.recording_source import RecordingSourceIdx
@dataclass
class ObjectAnalyticsResult:
"""
Результат анализа уникальных объектов.
Здесь мы собираем всё, что нужно для дальнейшего построения отчёта:
- агрегированные метрики (meta),
- «пруфы» — ссылки на источники, видео и объекты.
"""
meta: Dict[str, Any] = field(default_factory=dict)
source_idx_list: List["RecordingSourceIdx"] = field(default_factory=list)
video_idx_list: List["VideoFileIdx"] = field(default_factory=list)
object_idx_list: List["ObjectIdx"] = field(default_factory=list)
class ObjectAnalyzer(ABC):
"""
Абстрактный сервис анализа найденных (уже классифицированных) объектов.
Его задача:
- взять набор уникальных объектов (DetectedObject),
- «переварить» их,
- выдать агрегированные данные, которые потом
пойдут в построение доменной сущности отчёта.
ВАЖНО: сервис ничего не знает о том, как будут выглядеть
конкретные отчёты, он лишь добывает сырые аналитические данные
и пруфы.
"""
@abstractmethod
def analyze_objects(
self,
objects: List["DetectedObject"],
) -> ObjectAnalyticsResult:
"""
Проанализировать набор уникальных объектов и вернуть
агрегированную информацию, пригодную для построения отчёта.
"""
raise NotImplementedError
Теперь поговорим о сохранении, извлечении и поиске сущностей. И здесь нас снова выручает модуль abc, с помощью которого мы напишем абстрактный интерфейс для работы с хранилищем (Repository):
Репозиторий источников. Нам нужно всё-таки вести учёт источников и связывать их с видеофайлами.
Исходный код (domain/repository/recording_source_repository.py)
from abc import ABC, abstractmethod
from typing import Optional, List
from domain.entity.recording_source import RecordingSource, RecordingSourceIdx
class RecordingSourceRepository(ABC):
"""
Абстрактный репозиторий источников записи.
Детали хранилища (Postgres, NoSQL, файлики, in-memory)
здесь не определяются.
"""
@abstractmethod
def save(self, source: "RecordingSource") -> None:
"""
Сохранить или обновить источник записи.
"""
raise NotImplementedError
@abstractmethod
def get_by_idx(self, idx: "RecordingSourceIdx") -> Optional["RecordingSource"]:
"""
Получить источник по его доменному идентификатору.
"""
raise NotImplementedError
@abstractmethod
def list_all(self) -> List["RecordingSource"]:
"""
Вернуть все известные источники.
(Пагинация уже как домашнее задание)
"""
raise NotImplementedError
Репозиторий видеофайлов. Должен позволять сохранять важные данные о сущности «Видеофайл», включая её идентификатор, чтобы мы могли легко находить видеофайлы и работать с ними из сервиса сохранения видеофайлов.
Исходный код (domain/repository/video_file_repository.py)
from abc import ABC, abstractmethod
from typing import Optional, List
from domain.entity.video_file import VideoFile, VideoFileIdx
from domain.entity.recording_source import RecordingSourceIdx
class VideoFileRepository(ABC):
"""
Абстрактный репозиторий видеозаписей.
Задачи:
- сохранить/обновить сущность VideoFile;
- уметь по доменному идентификатору VideoFileIdx быстро её найти;
- при необходимости находить все видео, привязанные к источнику.
"""
@abstractmethod
def save(self, video: "VideoFile") -> None:
"""
Сохранить или обновить видеозапись.
"""
raise NotImplementedError
@abstractmethod
def get_by_idx(self, idx: "VideoFileIdx") -> Optional["VideoFile"]:
"""
Найти видеозапись по её доменному идентификатору.
"""
raise NotImplementedError
@abstractmethod
def list_by_source_idx(self, source_idx: "RecordingSourceIdx") -> List["VideoFile"]:
"""
Получить все видеозаписи, пришедшие с указанного источника.
"""
raise NotImplementedError
Репозиторий кадров. ММы хотим сохранять пруфы нашей работы, поэтому не забываем и про хранилище кадров, которые уже были проанализированы. При этом, нужно ли сохранять сами бинарные данные кадров, можно решить позже.
Исходный код (domain/repository/frame_repository.py)
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Optional, List
from domain.entity.frame import Frame
from domain.entity.video_file import VideoFileIdx
class FrameRepository(ABC):
"""
Абстрактный репозиторий кадров.
Задачи:
- сохранять проанализированные кадры как «пруф» работы системы;
- уметь находить кадры по видеозаписи и времени;
- уметь получать набор кадров по видеозаписи и/или диапазону времени.
"""
@abstractmethod
def save(self, frame: "Frame") -> None:
"""
Сохранить или обновить кадр.
"""
raise NotImplementedError
@abstractmethod
def get_by_video_and_time(
self,
video_idx: "VideoFileIdx",
captured_at: datetime,
) -> Optional["Frame"]:
"""
Найти конкретный кадр по видеозаписи и времени съёмки.
Эта пара (video_idx, captured_at).
"""
raise NotImplementedError
@abstractmethod
def list_by_video_idx(
self,
video_idx: "VideoFileIdx",
*,
start_time: Optional[datetime] = None,
end_time: Optional[datetime] = None,
) -> List["Frame"]:
"""
Получить кадры для указанного видео.
"""
raise NotImplementedError
Репозиторий объектов. Нужен набор методов для сохранения объектов и их поиска по различным параметрам, которые могут потребоваться сервисам.
Исходный код (domain/repository/object_repository.py)
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional, List, Dict, Any
from domain.entity.detected_object import DetectedObject, ObjectIdx, ObjectClass
from domain.entity.recording_source import RecordingSourceIdx
from domain.entity.video_file import VideoFileIdx
@dataclass
class ObjectSearchCriteria:
"""
Критерии поиска объектов.
Это абстрактный фильтр, который можно постепенно расширять
по мере появления новых сценариев в сервисах.
"""
object_classes: Optional[List["ObjectClass"]] = None
source_idx: Optional["RecordingSourceIdx"] = None
video_idx: Optional["VideoFileIdx"] = None
# Временной диапазон, в котором объект хоть раз появлялся
appeared_from: Optional[datetime] = None
appeared_to: Optional[datetime] = None
# Фильтрация по признакам (ключи/значения в ObjectAttributes.meta)
attributes: Dict[str, Any] = field(default_factory=dict)
class ObjectRepository(ABC):
"""
Абстрактный репозиторий объектов (DetectedObject).
"""
@abstractmethod
def save(self, obj: "DetectedObject") -> None:
"""
Сохранить или обновить объект.
"""
raise NotImplementedError
@abstractmethod
def get_by_idx(self, idx: "ObjectIdx") -> Optional["DetectedObject"]:
"""
Получить объект по доменному идентификатору.
"""
raise NotImplementedError
@abstractmethod
def search(
self,
criteria: ObjectSearchCriteria,
*,
limit: Optional[int] = None,
offset: Optional[int] = None,
) -> List["DetectedObject"]:
"""
Поиск объектов по набору критериев.
"""
raise NotImplementedError
Репозиторий отчетов. Сохраняем отчёты; при этом мы пока даже не выбираем временные промежутки их хранения — нам важен только результат. Также не забываем про методы извлечения отчётов.
Исходный код (domain/repository/report_repository.py)
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Optional, List
from domain.entity.report import Report, ReportIdx, ReportType
class ReportRepository(ABC):
"""
Абстрактный репозиторий отчётов.
"""
@abstractmethod
def save(self, report: "Report") -> None:
"""
Сохранить или обновить отчёт.
"""
raise NotImplementedError
@abstractmethod
def get_by_idx(self, idx: "ReportIdx") -> Optional["Report"]:
"""
Получить отчёт по его доменному идентификатору.
"""
raise NotImplementedError
@abstractmethod
def list_by_type(
self,
report_type: "ReportType",
*,
start_time: Optional[datetime] = None,
end_time: Optional[datetime] = None,
limit: Optional[int] = None,
offset: Optional[int] = None,
) -> List["Report"]:
"""
Получить отчёты указанного типа.
Временной промежуток задаёт интересующий нас диапазон
по времени, за который построены отчёты.
Реализация сама решает:
- как интерпретировать start_time/end_time относительно
поля TimeRange у отчёта;
- как оптимизировать выборку и пагинацию.
"""
raise NotImplementedError
@abstractmethod
def list_all(
self,
*,
limit: Optional[int] = None,
offset: Optional[int] = None,
) -> List["Report"]:
"""
Вернуть все отчёты (с возможной пагинацией).
"""
raise NotImplementedErrorВообще, сущности хорошо бы не создавать «голыми руками». Поэтому лучше сразу позаботиться о фабриках и не конструировать их напрямую. Да, сначала это кажется лишней работой, но по мере усложнения сущностей вы всё равно к этому придёте: формат входных данных будет меняться. Давайте сразу заложим фабрики для всех ключевых сущностей.
Исходный код (domain/factory/recording_source_factory.py)
from dataclasses import dataclass
from typing import Optional
from domain.entity.recording_source import (
RecordingSource,
RecordingSourceIdx,
RecordingSourceExtra,
)
@dataclass
class RecordingSourceFactory:
"""
Фабрика источников записи.
"""
def create(
self,
extra: Optional["RecordingSourceExtra"] = None,
) -> "RecordingSource":
idx = RecordingSourceIdx.new()
return RecordingSource(
idx=idx,
extra=extra or "RecordingSourceExtra"()
)
Исходный код (domain/factory/video_file_factory.py)
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
from domain.entity.video_file import VideoFile, VideoFileIdx, VideoFileExtra
from domain.entity.recording_source import RecordingSourceIdx
@dataclass
class VideoFileFactory:
"""
Фабрика видеозаписей.
"""
def create(
self,
source_idx: RecordingSourceIdx,
started_at: datetime,
extra: Optional["VideoFileExtra"] = None,
) -> "VideoFile":
video_idx = VideoFileIdx.new()
return VideoFile(
idx=video_idx,
source_idx=source_idx,
started_at=started_at,
extra=extra or VideoFileExtra(),
)
Исходный код (domain/factory/frame_factory.py)
from dataclasses import dataclass
from datetime import datetime
from typing import Any
from domain.entity.frame import Frame, FrameData
from domain.entity.video_file import VideoFileIdx
@dataclass
class FrameFactory:
"""
Фабрика кадров.
"""
def create(
self,
video_idx: VideoFileIdx,
captured_at: datetime,
raw_data: Any,
) -> "Frame":
frame_data = FrameData(value=raw_data)
return Frame(
video_idx=video_idx,
captured_at=captured_at,
data=frame_data,
)Исходный код (domain/factory/detected_object_factory.py)
from dataclasses import dataclass
from typing import Optional
from domain.entity.detected_object import (
DetectedObject,
ObjectIdx,
ObjectClass,
ObjectAttributes,
FrameRef,
)
@dataclass
class DetectedObjectFactory:
"""
Фабрика доменных объектов (DetectedObject).
"""
def create_new(
self,
object_class: ObjectClass,
attributes: Optional[ObjectAttributes] = None,
first_frame: Optional[FrameRef] = None,
) -> DetectedObject:
idx = ObjectIdx.new()
frames = [first_frame] if first_frame is not None else None
return DetectedObject(
idx=idx,
object_class=object_class,
attributes=attributes or ObjectAttributes(),
frames=frames,
)
Исходный код (domain/factory/report_factory.py)
from dataclasses import dataclass
from domain.entity.report import (
Report,
ReportIdx,
ReportType,
TimeRange,
ReportData,
)
from domain.service.object_analyzer import ObjectAnalyticsResult
@dataclass
class ReportFactory:
"""
Фабрика отчётов.
"""
def create_from_analytics(
self,
report_type: ReportType,
time_range: TimeRange,
analytics: ObjectAnalyticsResult,
) -> Report:
idx = ReportIdx.new()
data = ReportData(meta=analytics.meta)
return Report(
idx=idx,
report_type=report_type,
time_range=time_range,
data=data,
source_idx_list=analytics.source_idx_list,
video_idx_list=analytics.video_idx_list,
object_idx_list=analytics.object_idx_list,
)
Вроде всё готово, но наши сущности сейчас — это не просто «мешки с данными», которые сами по себе ничего не делают. Можно, конечно, добавить им простые сеттеры и геттеры, но тогда они останутся просто контейнерами для данных и не будут отражать поведение объектов и их возможности. А нам ведь нужно в будущем поддерживать систему, поэтому лучше сразу думать о том, какие операции должны жить внутри самих сущностей.
Видеоисточник.
domain/entity/video_source.py
class RecordingSource:
# старый код
def update_extra(self, extra: RecordingSourceExtra) -> None:
"""
Обновить дополнительные данные об источнике.
"""
self.extra = extraВидеофайл.
domain/entity/video_file.py
class VideoFile:
# старый код
def mark_ended(self, ended_at: Optional[datetime] = None) -> None:
"""
Пометить видеозапись как завершённую.
"""
new_ended_at = ended_at or datetime.utcnow()
if new_ended_at < self.started_at:
raise ValueError("ended_at cannot be earlier than started_at")
if self.extra.ended_at is not None and new_ended_at <= self.extra.ended_at:
return
self.extra.ended_at = new_ended_at
self.extra.duration_seconds = (new_ended_at - self.started_at).total_seconds()
def update_duration(self, duration_seconds: float) -> None:
"""
Обновить длительность видеозаписи в секундах.
"""
if duration_seconds < 0:
raise ValueError("duration_seconds cannot be negative")
self.extra.duration_seconds = duration_seconds
def update_technical_info(
self,
*,
codec: Optional[str] = None,
frame_width: Optional[int] = None,
frame_height: Optional[int] = None,
meta: Optional[Dict[str, Any]] = None,
) -> None:
"""
Обновить технические параметры видеозаписи.
"""
if frame_width is not None and frame_width <= 0:
raise ValueError("frame_width must be positive")
if frame_height is not None and frame_height <= 0:
raise ValueError("frame_height must be positive")
if codec is not None:
self.extra.codec = codec
if frame_width is not None:
self.extra.frame_width = frame_width
if frame_height is not None:
self.extra.frame_height = frame_height
if meta:
self.extra.meta.update(meta)
def update_extra(self, extra: VideoFileExtra) -> None:
"""
Полностью заменить объект с дополнительными данными.
"""
self.extra = extraКадр
domain/entity/frame.py
class Frame:
# старый код
def update_data(self, data: FrameData) -> None:
"""
Заменить данные кадра.
Смысл:
- кадр мог быть перекодирован,
- к нему могла быть применена анонимизация/маскирование,
- данные могли быть уменьшены (даунскейл, JPEG и т.п.).
Вместо прямого присваивания снаружи, всё изменение «сырых» данных
кадра проходит через этот метод.
"""
self.data = data
def is_in_time_range(self, start: datetime, end: datetime) -> bool:
"""
Проверить, попадает ли кадр во временной интервал [start, end).
"""
return start <= self.captured_at < endОбъект
domain/entity/detected_object.py
class DetectedObject:
# старый код
def add_frame(self, frame_ref: FrameRef) -> None:
"""
Зафиксировать, что объект был замечен ещё в одном кадре.
"""
self.frames.append(frame_ref)
def last_seen_at(self) -> Optional[datetime]:
"""
Вернуть момент времени, когда объект был замечен последним.
"""
if not self.frames:
return None
return max(ref.captured_at for ref in self.frames)
def was_seen_in_range(self, start: datetime, end: datetime) -> bool:
"""
Проверить, встречался ли объект в интервале [start, end).
"""
return any(start <= ref.captured_at < end for ref in self.frames)
def update_attributes(self, meta: Dict[str, Any]) -> None:
"""
Обновить признаки объекта (attributes.meta).
"""
self.attributes.meta.update(meta)
Отчет
domain/entity/report.py
class Report:
# старый код
def covers_moment(self, moment: datetime) -> bool:
"""
Проверить, относится ли указанный момент ко времени этого отчёта.
"""
return self.time_range.start <= moment < self.time_range.end
def overlaps_with(self, start: datetime, end: datetime) -> bool:
"""
Проверить, пересекается ли отчёт с временным интервалом [start, end).
"""
r_start = self.time_range.start
r_end = self.time_range.end
return not (end <= r_start or r_end <= start)
def add_source_idx(self, source_idx: "RecordingSourceIdx") -> None:
"""
Добавить источник в список «пруфов» отчёта.
"""
if source_idx not in self.source_idx_list:
self.source_idx_list.append(source_idx)
def add_video_idx(self, video_idx: "VideoFileIdx") -> None:
"""
Добавить видеозапись в список «пруфов» отчёта.
"""
if video_idx not in self.video_idx_list:
self.video_idx_list.append(video_idx)
def add_object_idx(self, object_idx: "ObjectIdx") -> None:
"""
Добавить объект в список «пруфов» отчёта.
"""
if object_idx not in self.object_idx_list:
self.object_idx_list.append(object_idx)
На данном этапе мы ужасно далеки от создания чего-то работоспособного. Тогда зачем всё это нужно?
Мы уже понимаем, что нам нужно делать, и можем прогнозировать, как все компоненты будут взаимодействовать друг с другом.
У нас сейчас нет привязки к конкретным технологиям, мы даже не приступали к их анализу, но ЗАТО мы уже можем поэтапно разобрать обработку одного видеофайла.
Наша архитектура может быть легко расширена под новые требования, когда заказчик согласится продолжить работу с нами.
Мы можем без труда написать реализации наших абстрактных сервисов и репозиториев, используя встроенные типы данных (
dict,list) или модули для работы с файлами.
Таким образом, у нас появился домен (Domain), который описывает предметную область нашей задачки и является центром всей системы: именно здесь задаются бизнес-правила и определяется поведение объектов. Далее нам предстоит описать, как компоненты домена взаимодействуют друг с другом (слой Application), — но об этом уже в следующей статье (если, конечно, она вам зайдёт).
P.S. Я намеренно полностью или почти опустил такие важные компоненты, как объекты-значения, доменные события, агрегаты, bounded context и т.п. По моему мнению, это тяжеловато даже для многих специалистов, а у нас всё-таки гайд для новичков. Но какие-то моменты постараюсь осветить в следующей статье.
P.P.S. Это моя первая статья, так что не бейте сильно. С удовольствием почитаю ваши пожелания и всё исправлю/дополню. Сразу оговорюсь: кода много, а старые проекты куда-то затерялись, поэтому в кодогенерации не обошлось без помощи GPT. Заранее спасибо за понимание.
enamored_poc
Плохой рекламы не бывает) Спасибо за упоминания!
lackyboye14 Автор
Вам спасибо, что дали вдохновения стартануть)