Дисклеймер
Всем привет! В данной статье я поделюсь своим опытом написания сервиса. Я не являюсь опытным или профессиональным разработчиком, я пишу свой проект и мои решения могут быть не самыми оптимальными. Эта статья состоит в основном из моих решений при написании сервиса, что могут быть не идеальными. Мой путь не является правильным и потому - судите "строго". Так же порекомендую прочитать предыдущие мои статьи.)
Новая архитектура проекта
После совершения множества ошибок, я решил слегка изменить структуру проекта. Для напоминания, как выглядело всё раньше

Как здесь все происходило.
Пользователь приходит на фронтенд и делает свои грязные дела;
Фронтенд кидает запрос по API на сервис-контроллер, который делает запись в бд и складывает файлы в S3, по необходимости, а так же может ставить задачи в бэк (допустим задача на консервацию файла);
Сервис-контроллер по API на backend, где есть все необходимые данные, что за задача, какую информацию брать;
Бэкенд принимает запрос, и пишет в базу данных создавая очередь;
Согласно очереди бэк обрабатывает задачи;
Как только задача завершена, бэк кидает по API в сервис-контроллер результат обработки;
Сервис-контроллер обновляет данные в таблице.
Вроде бы все хорошо и логично, но есть пару НО.
Бэкенд выполняет тяжелые задачи, из-за чего сильно нагружает систему, а асинхронно слушать запросы по API, писать задачи в БД, что очень быстро приводило к тому, что ресурсы заканчивались и задачи терялись.
У сервиса-контроллера появлялись дополнительные API, что мне лично не нравилось.
Да и в целом заводить дополнительную БД чтобы хранить очередь, такое себе. (Лично мне не понравилось данное решение)
Поэтому было решено изменить слегка структуру приложения, чтобы было стабильнее.

Что было изменено и из каких соображений.
Переименование controller-service в middleware-service, далее mid. (Просто так захотелось)
Mid и backend общаются друг с другом по средствам Кафки (Да, лучше бы было использовать rabbitMQ, но хотелось потрогать именно кафку)
Backend кладет свои файлы в свой бакет s3. (Чтобы не переправлять конвертированные файлы в mid, а просто кидать ссылку на файл, да и бэку легче тянуть из своего бакета файлы.)
Вот в целом и все интересные изменения.
Обновление схемы БД
Ну и чтобы было понятно, приведу в пример старую схему БД.


Что не так в старой схеме:
Все задачи связаны с совещанием, да легкий поиск, но в случае создания новой транскрипции, нужно было решать проблему чтобы во всех базах почистить. Получается при удалении конвертации, необходимо было пробежать все остальные таблицы и так же очистить.
Нельзя было отслеживать статус задачи, завершен или ошибка.
ДВЕ БАЗЫ ДАННЫХ, которые нужно было правильно вести, иначе данные могут быть неконсистенты и т.п.
Итак, представляю вам новую схему БД:

Да, по данной схеме сложно что-то понять, но я постараюсь объяснить:
При создании совещания, приложенный файл кладется в S3, и происходит запись в таблицу Conversations
(Все задачи при создании, создают запись в таблице Task и связаны с ней)
Далее при нажатии на кнопку конвертации - создается запись в таблице Task и таблице Convert, Convert ссылается на Conversation
После успешной конвертации запись обновляется и создается новая задача в Task и DIarize, последний ссылается так же на Convert
После успешной диаризации в таблице Segment создаются записи с сегментами и спикерами, так же создаются записи в Conversation_speaker, которая ссылается на Conversations
После создания сегментов, создаются записи в таблице Transcription, которые связаны со своим сегментом.
Далее можно подвести итоги и потом сделать отчет, или сразу сделать отчет, тут уже на свое усмотрение, поэтому каждая из этих таблиц завязана с Conversation
Внедрение кафки, позволило мне избавиться от второй базы, и хранить все данные в одной базе данных. Таблица Task позволяет отслеживать, какие задачи успешны, а какие нет, и где возникла ошибка.
Что было дальше...
Модернизация сервиса на Go
В первую очередь я решил сделать нормальную работу с базой и кафкой, для этого я сделал интерфейс по работе с базой:
func (d *PGDatabase) StartTransaction(ctx context.Context) (pgx.Tx, error) {
tx, err := d.connPool.Begin(ctx)
if err != nil {
return nil, err
}
return tx, nil
}
func (d *PGDatabase) StartNestedTransaction(ctx context.Context, tx pgx.Tx) (pgx.Tx, error) {
return tx.Begin(ctx)
}
func (d *PGDatabase) CommitTransaction(ctx context.Context, tx pgx.Tx) error {
return tx.Commit(ctx)
}
func (d *PGDatabase) RollbackTransactionIfExist(ctx context.Context, tx pgx.Tx) error {
return tx.Rollback(ctx)
}
Простой интерфейс, который в базе PG стартует транзакции, делает commit или rollback.
Далее для работы сервиса сделал некую прокладку, которая стартует транзакцию и завершает её, потому что не хотел каждый раз в сервисе стартовать новую транзакцию, и делать все условия, потому что структура всегда была бы одинаковой
func (d *PGDatabase) WithTx(ctx context.Context, fn func(tx pgx.Tx) error) error {
tx, err := d.StartTransaction(ctx)
if err != nil {
return err
}
if err := fn(tx); err != nil {
d.RollbackTransactionIfExist(ctx, tx)
return err
}
return d.CommitTransaction(ctx, tx)
}
Поэтому в сервисе я использовал простой интерфейс:
type TxManager interface {
WithTx(ctx context.Context, fn func(tx pgx.Tx) error) error
}
И вот простой пример использования:
func (s *ParticipantService) CreateParticipant(
ctx context.Context,
participantPayload models.Participant,
) error {
return s.TxManager.WithTx(ctx, func(tx pgx.Tx) error {
return s.Repo.CreateParticipant(ctx, tx, &participantPayload.Name, participantPayload.Email)
})
}
То есть я внутрь функции по транзакции вкладываю свою новую функцию, которую описываю тут же, по идее, можно было бы разделить, вынеся эту функцию отдельно, но там много параметров передавать нужно было бы.
Далее я решил реализовать общение между mid и back через Kafka, поэтому решил создать контрмера и продюсера.
Ах да, забыл сказать, для адекватной работы решил использовать Proto, что было удобнее, не переписывать структуры миллион раз для разных языков.
У продюсера сделал просто метод по отправке сообщения, которым и отсылал сообщения
func (p *Producer) SendMessage(ctx context.Context, key string, message proto.Message) error {
value, err := proto.Marshal(message)
if err != nil {
return fmt.Errorf("failed to marshal message: %w", err)
}
return p.writer.WriteMessages(ctx, kafka.Message{
Topic: p.topic,
Key: []byte(key),
Value: value,
})
}
Далее консюмер, который постоянно читал сообщения из кафки и делал это в асинхрон:
func (c *Consumer) Start(ctx context.Context) {
go func() {
for {
logger := logger.GetLoggerFromContext(ctx)
m, err := c.reader.ReadMessage(ctx)
if err != nil {
time.Sleep(1 * time.Second)
continue
}
var task messages.WrapperResponse
if err := proto.Unmarshal(m.Value, &task); err != nil {
continue
}
if err := c.handler.HandleTask(ctx, &task); err != nil {
logger.Error("exec: Consumer\ntask handle error", interfaces.LogField{
Key: "error",
Value: err.Error(),
})
}
}
}()
}
HandleTask - функция сервиса которая разбирает сообщения из proto. Соответственно в proto было поле Oneof, которое позволяло определять, какого типа задача пришла и не писать дополнительное поле с типом задачи в proto.
Вот, общение между сервисам работает, так же реализовал все основные задачи и т.п. Так же сделал API, чтобы фронту было куда обращаться.
Вывод
В данной статье я затронул только моменты с обновлением mid сервиса, который позволил мне удобно работать с базой, общаться с бэком и писать задачи в кафку и читать из неё. Я не затронул фронт, потому как его делал чисто в стиле вайб-кодинга, потому что фронт писать я совсем не умею.
В итоге вышло полноценный красивый сервис, который использует интересные технологии, а я получил кучу опыта, которым и хотел поделиться.
Буквально на днях я сделал первый релиз, который можно собрать и запустить. В следующей статье опишу, что нужно для запуска, потому что это 3 разнесенных сервиса, которые требуют БД, кафку и S3, бэк так же требует наличия Ollama, ffmpeg, ключа с hugginface.co, и еще множество подводных камней.)