Привет! Меня зовут Максим Чижов, я уже третий год работаю бэкенд-инженером в Авито. Когда только пришёл в компанию, я столкнулся с проблемой хранения больших объёмов информации. О том, как её решить, расскажу в статье.

Сервисы, которые создаёт наш юнит, работают по классической схеме ETL. Extractor извлекает сырые аналитические данные из внешнего источника, Transformer преобразует их в плоский вид и сохраняет в MongoDB. А Loader загружает трансформированные данные в хранилище Vertica.

Так выглядит схема ETL
Так выглядит схема ETL

Сейчас Авито хранит данные в шестой нормальной форме в Vertica.

Читать также: Vertica+Anchor Modeling = запусти рост своей грибницы

В чём проблема

Сырые данные хранятся в MongoDB. Несмотря на то, что мы должны хранить их не меньше трёх месяцев, их объём постоянно растет из-за добавления новых экстракторов. В какой-то момент прирост данных составил 5 ТБ в неделю. Проблема в том, что наша база данных представляет собой стандартную реплику из трёх баз данных — 1 master и 2 slave. При штатной работе репликация обрабатывала их без проблем. Но однажды replication lag начал зашкаливать из-за проблем с сетью. 

Oplog иногда превышал 20 часов — из-за этого репликация не успевала за новыми данными, и реплика развалилась
Oplog иногда превышал 20 часов — из-за этого репликация не успевала за новыми данными, и реплика развалилась

Единственным вариантом стало пересоздание реплики. В итоге, мы потеряли около 40 ТБ сырых данных — это было больно. Чтобы избежать повторения такой ситуации, мы стали действовать в двух направлениях:

  • Разносили данные по разным репликам.

  • Решили хранить часть данных в архиве, а при необходимости восстанавливать их в Mongo.

Как мы выбирали хранилище

На основании запросов заказчиков и необходимого объёма данных, мы выделили требования к хранилищу:

  • Сырые аналитические данные должны храниться не меньше года.

  • Объём данных хранилища — 800 ТБ.

  • В нём можно быстро восстанавливать данные в горячее хранилище.

  • Данные можно трансформировать также, как если бы они не были заархивированы.

  • Должна быть возможность делать запросы к холодному хранилищу, хотя бы с примитивными фильтрами.

В итоге выбирали между Ceph, Hadoop и обычными файлами. Чтобы сравнивать хранилища было удобнее, мы собрали таблицу:

Преимуществом хранилища Ceph было то, что Авито предоставляет для него интерфейс S3. С ним написано много библиотек, в том числе и на Python. Для наших целей была выбрана библиотека aioboto3

Результат работы — AaaS

Сервис предоставляет две ручки: архивации и восстановления. Под капотом у него работает стандартный архиватор pigz, который позволяет сжимать данные в четыре раза и уменьшать годовой объем данных до 200 ТБ. А еще он умеет работать в несколько потоков. Затем, на основе параметров от экстрактора формируется уникальный путь в хранилище. 

Система работает по такому алгоритму:

  1. Extractor по крону запускает выгрузку из внешнего источника и записывает данные в MongoDB.

  2. После окончания работы Extractor запускает worker архивации, который режет данные на батчи размером 1 ГБ.

  3. Каждый батч асинхронно передаётся в AaaS по websocket as is.

  4. На стороне сервиса архивов данные сжимаются и отправляются в Ceph.

  5. Extractor проставляет метаданные об успешной архивации extract_id.

Так выглядит схема работы всей системы
Так выглядит схема работы всей системы

Кроме этого, каждый день запускается крон, который удаляет устаревшие экстракты из горячего хранилища. Перед этим проверяется, заархивированы ли эти данные. Каждый экстрактор имеет свои настройки по TTL, обычно это 7 дней. В этот момент в метаданные записывается удаление extract_id.

Многие спросят, а почему нельзя было сделать так, чтобы каждый экстрактор сам архивировал данные и отправлял их в Ceph? Потому что для каждого изменения логики архивации пришлось бы обновлять каждый экстрактор — это дорого. Кроме того, другим типам сервисов, например, трансформерам, тоже может потребоваться архивация.

ETL с холодным хранилищем
ETL с холодным хранилищем
Объясню, как происходит разделение данных:
  1. Для каждой коллекции считается средний размер документа в гигабайтах. За это отвечает параметр collstats.avgObjSize.

  2. Исходя из размера батча (по умолчанию это 1 Гб), мы получаем его в записях. 

  3. Если знаем общее количество документов в коллекции для текущего extract_id, получаем количество батчей.

  4. В Redis записывается ключ EXTRACT_ID.batches_info с полученными цифрами. Это важный момент, так как средний размер документа часто меняется, особенно когда коллекция только создана. Если этого не сделать, то при повторной архивации номера записей в батчах будут разные и возникнет путница.

Что произошло с нагрузкой на сеть

Первый рабочий вариант подразумевал, что сервис архивов читал данные напрямую из MongoDB экстракторов. Но от этого варианта быстро отказались по нескольким причинам:

  1. Каждый раз при добавлении нового экстрактора нужно было править сервис архивов, чтобы указать ему путь к коллекциям и другие настройки.

  2. Нарушался один из основных принципов микросервисной архитектуры в Авито —  «1 база — 1 сервис». Это показатель качественной декомпозиции функционала отдельных сервисов.

Чтобы достигнуть нужной чистоты архитектуры, мы пожертвовали нагрузкой на сеть. Давайте посчитаем, как она возросла.

Было → стало
Было → стало
Так сжатие выглядит на графике
Так сжатие выглядит на графике

Сжатие, в среднем, в 4 раза — значит, 4Tx = Rx. Можем посчитать увеличение нагрузки на сеть по формуле:

Ch = (Rx + Rx + Tx) / (Rx + Tx) = 9/5

Проще говоря, нагрузка на сеть увеличилась на 80%. Это оказалось допустимым и не вызвало никаких проблем с перегрузкой сети.

Восстановление данных

Для восстановления данных используется ручка Restore, которая отдает их по websocket. Сначала мы решили, что достаточно будет восстанавливать их в горячее хранилище, а потом проводить над ним нужные операции. Это может быть, например, трансформация архивных данных. Как временное решение это работало неплохо, но оно нарушало принцип «1 база – 1 сервис».

Пользователи злоупотребляли возможностями и восстанавливали данные в продуктовую MongoDB. Чтобы решить эту проблему, мы добавили экстрактору возможность отдавать не только данные, которые ещё не удалены из горячего хранилища, но и архивные из Ceph. 

Схема трансформации данных из архива
Схема трансформации данных из архива

Объясню, что происходит на схеме:

  1. Трансформер идёт за данным в ручку экстрактора через extract_id.

  2. Экстрактор проверяет свои метаданные — архивированы и удалены ли данные этого extract_id.

  3. Если не архивированы и не удалены, то читает их из MongoDB.

  4. Если архивированы и удалены, то читает их из AaaS, который в свою очередь ищет данные в Ceph.

  5. Если же данные удалены из архива — выдаётся сообщение об ошибке.

Трансформер не подозревает о существовании сервиса архивов и может использовать данные за любой период. В случае, когда их нет в горячем хранилище, экстрактор работает как proxy.

Что в итоге

В результате мы получили стабильный размер горячего хранилища, которое держится в районе 20 ТБ. Это комфортное значение для репликации. Но даже если снова возникнет ситуация с критичным replication lag, мы можем просто безболезненно пересоздать реплику.

Теперь мы можем трансформировать данные за любой период хранения в холодном хранилище. Трансформер ничего не знает об архиве и идет напрямую в экстрактор. Такая схема позволяет быстро вносить правки и доставлять данные в Vertica, уменьшая при этом расход ресурсов. 

Размер горячего хранилища
Размер горячего хранилища

Несмотря на подводные камни, начиная от выбора способа хранения холодных дампов и заканчивая бесшовной трансформацией архивных данных, схема получилась довольно простой.

Мы получили чистую архитектуру, но увеличили нагрузку на сеть. Такие жертвы неизбежны, чтобы получить те качества системы, которые мы хотим достичь:

  • Горизонтальная масштабируемость. Можем уменьшать или увеличивать ресурсы AaaS. В нашем случае мы просто меняем количество подов в Kubernetes.

  • Взаимозаменяемость компонентов. Если вдруг вместо Ceph нам захочется использовать другое хранилище, мы можем внести изменения безболезненно для всех экстракторов, и они даже не заметят их.

Предыдущая статья: Trunk Based Development — кто такой и зачем нужен

Комментарии (6)


  1. vconst
    19.09.2022 21:32

    А где можно почитать про железо, которые вы используете для организации холодного, теплого и горячего хранения данных? В блоге заблудиться можно)


    1. fallenstorm Автор
      20.09.2022 11:05

      К сожалению, у меня нет ответа на этот вопрос. Подозреваю, что даже в блоге об этом не написано.


      1. vconst
        20.09.2022 12:36

        Хм. Это секрет или просто не задумывались над такой статьей?
        Интересно было бы почитать, на каком железе строится эта система и как она с ним связана. Какие SSD для горячих данных, какие HDD для холодных, это все живет в специализированном СХД или просто обычные серверы с кучей дисков и тд тп

        Мало вы про железо рассказываете :)


        1. fallenstorm Автор
          20.09.2022 14:01

          Я даже не представляю какое там железо, для меня это просто IP и порт


          1. vconst
            20.09.2022 14:42

            Это понятно. Но если бы редактор, ведущий ваш блог, нашел эксперта «с другой стороны порта» — было бы очень интересно почитать


  1. vconst
    19.09.2022 21:58

    Занятно наблюдать эволюцию по блогам
    В 17 году вы только тестировали Ceph, а сейчас уже выбрали его как основное решение