Приветствую всех читателей! Меня зовут Николай Самсонов. Я являюсь руководителем платформы данных на Учи.ру. В своей работе часто сталкиваюсь с ситуацией, когда бизнесу нужны метрики и показатели здесь и сейчас, в то время как автоматизация получения и обработки терабайт данных для их расчета может занимать значительное количество времени. Сделать «здесь и сейчас» можно всегда, но чем дольше продолжается использование данных, тем больше в дальнейшем потребуется ресурсов и времени на оптимизацию при масштабировании нагрузки или внедрении новых источников в хранилище.

Правильный стек, правильная архитектура и правильное видение процесса ELT — залог успешной аналитики, с этим никто не спорит. Но как прийти к ним и как найти баланс между затратами времени на исследование и поддержкой уже сделанного в бесконечном потоке A/B-тестирований, дашбордов, метрик и Ad hoc-запросов для руководства?

Точного ответа у меня нет, но могу рассказать про наш опыт: мы смогли качнуть баланс равновесия между задачами операционными (Run) и связанными с изменениями (Change) в пользу вторых, используя переезд хранилища данных. Если в двух словах, то это будет история о том, как мы прокачали технологии и скиллы людей, которые занимаются построением DWH, в условиях двух переездов хранилища данных за два года. Буду говорить о преимуществах и недостатках такого способа прокачки.

Переезд начинается

До 2022 года весь стек DP работал в сервисах AWS достаточно беспроблемно, но при этом SLA доезда прода был размыт и с ручным подключением дежурного дата-инженера занимал время с 13:00 до 15:00 ежедневно.

Как мы поняли в дальнейшем (и это небольшой спойлер всей истории), мощности AWS позволяли сгладить многие шероховатости и неоптимальность кода. На нашем объеме, а это порядка 300 TB данных с ежедневным приростом в 16 TB, все проблемы были связаны исключительно с этой неоптимальностью и человеческим фактором (последствиями неудачных внедрений).

В 2022 году было принято решение о переводе данных в РФ. Сначала сетевое окружение и виртуальные роутеры просто не были готовы. Когда мы от тестовых ограниченных запусков перешли к полноценному нагрузочному тестированию, то стали постоянно ловить S3 throttling алерты. Затем нам перенастроили сеть, дали более мощную ноду под виртуальный роутер — и все наши Big Data поехали. 

Появились рандомные ошибки при записи файлов в S3-бакеты. При этом, если мы ловили их на Core-таблицах нижнего слоя, то ломались все витрины верхних слоев, и восстановление согласованных данных было возможно только полной перегрузкой прода (которая занимала практически весь следующий день и оставляла бизнес не только без актуальных данных за прошлый день, но и ломала на время фуловые таблицы).

По итогам множества встреч с представителями облачного хранилища и постоянных пересылок многогигабайтных логов мы получили достаточно честный диагноз происходящего от самого провайдера: «Нашли косвенные доказательства того, что наш сервис вернул неполный листинг в момент интенсивной записи в контейнер. Мы храним листинги в реплицированной БД, и такое поведение возможно, если при создании объекта не обновилась одна из реплик, а при чтении листинга запрос оказался именно на ней. Это соответствует гарантиям Eventual Consistency, которые дает наше хранилище, в отличие от Amazon, который гарантирует “строгую” консистентность».

Мы стали размышлять, как справиться с этой проблемой, чтобы не страдал бизнес, и какие действия стоит предпринять в дальнейшем — конечно, помимо переезда в новое облачное хранилище, который все-таки случится чуть позже (не столько из-за вышеуказанных трудностей, сколько по корпоративным соображениям).

Мы начали с того, что проапгрейдили нашу систему data quality, а именно:

  • фреймворк сравнения количества записываемых в моменте файлов Spark с количеством файлов в S3 после записи вместе со связкой ретраев (на практике вышли на достаточность трех);

  • систему ранних алертов, которая позволяла понять, что в данный конкретный день все пропало система ретраев по той или иной причине не дала результат (с дальнейшими действиями на выбор дежурного DE — от точечного перезапуска дагов до остановки прогрузки в CH в состоянии целостности данных в нем на Т-2 и одновременным перезапуском всего прода).

Затем мы сформулировали план по глобальному изменению инженерной архитектуры, чтобы обезопасить бизнес от потерь данных. Он был связан с внедрением одного из S3-коммитеров, которые должны были снизить нагрузку на хранилище при записи, либо должен был осуществить переход от листинга файлов из папки S3 к использованию меты записанных файлов (фактическая реализация с помощью Apache Iceberg). Внедрение такого инструмента, как Iceberg, требует множества изменений, ведь меняется сам подход работы с файлами. Как следствие, требуется перерасчет всех слоев DWH. В дальнейшем в этой статье я опишу, как и с каким эффектом мы его внедрили.

Для решения проблемы потери данных мы пробовали применять несколько различных коммитеров:

  • Дефолтный S3-коммитер. Он медленный и требует строгой консистентности данных. В случае с нашим провайдером он только увеличил время сохранения файлов в S3 и не решил проблему потерь данных.

  • Staging directory-коммитер. С ним запись осуществляется быстрее, чем с дефолтным, но его нельзя использовать в партицированных таблицах. Он значительно сократил время записи файлов по сравнению с дефолтным, но мы продолжали периодически ловить ошибки.

  • Magic-коммитер. Он гораздо бодрее и надежней, однако при его использовании остались проблемы с партициями (их постоянный пересчет и пуш в Hive Metastore в больших таблицах мог занимать до 10–15 минут).

Итак, после использования коммитеров наша проблема потери данных в хранилище, не поддерживающем строгую согласованность, уменьшилась, но полностью не исчезла. Это было связано с тем, что коммитер должен хранить информацию о том, что записывают другие воркеры в S3. И если при команде ls драйвер не видит чего-либо, значит, для него это и не существует.

В итоге частичные проблемы с потерей данных мы испытывали до второго переезда.

Что изменилось в архитектуре DWH при наших переездах

Архитектура хранения данных развивалась в Учи.ру несколько сумбурно: когда в компании еще не было платформы данных, первые аналитики установили древнюю версию AirFlow. Таким образом, вся архитектура состояла из того самого первого подхода (сделать «здесь и сейчас») и последующих итераций на эту же тему.

При переезде мы оставили общий подход хранения данных с разделением его на «холодное хранение» (будь то наши parquet’ы в S3 или не часто используемые Jupyter Notebook) и «горячее хранение» в виде ClickHouse на SSD, на который смотрят BI-дашборды Tableau или техническая учетка А/Б-тестера.

Сам S3-бакет данных в облаке делится на RAW (грузим как есть из источников) и STORAGE (трансформируем и рассчитываем). Также  в STORAGE выделяется слой data mart (из которого load-даги уже производят выгрузку в Clickhouse). Помимо data mart существуют продуктовые «песочницы» — следствие микросервисной архитектуры (созданы для данных, которые нет необходимости мерджить с данными других источников).

Итак, что мы изменили в архитектуре хранения. 

  • Во-первых, в Storage был невыраженный слой плоских данных и вырожденный слой произвольного набора агрегатов. Первому мы дали основной приоритет (сделали условный DDS) с вынесением в него многих плоских витрин из data mart, второй полностью заменили на слой основных переиспользуемых данных — CDM.

  • Во-вторых, мы постарались по максимуму убрать таблицы транзакций из data mart. Сейчас data mart используется только для агрегатов, на которых строятся дашборды в Tableau. Ранее дашборды в основном создавались с помощью Custom SQL на самом сервере Tableau. Мы максимально ввели в продакшен экстракты Tableau и написали фреймворк, чтобы они загружались на сервер по мере готовности витрин, которые для них используются. Это значительно улучшило качество жизни финальных пользователей Tableau (NPS мы, правда, не измеряли, но «спасибо» получали).

  • В-третьих, с помощью Grafana мы поигрались с очередями запуска слоев и распределением дагов между слоями. Итого получилось пять основных слоев DWH: raw, ushi (DDS), dict, cdm, data mart. 

  • Кроме этого, мы привнесли на прод возможность делать версионированные таблицы (Slowly Changing Dimensions).

Что изменилось в стеке DWH при наших переездах

Наши инженеры и DevOPS-инженеры значительно видоизменили и улучшили многие инстансы и инструменты стека.

1. Заменили YARN на K8s

Эта идея была бы осуществлена нами и без переезда — в силу преимуществ, которые дает «кубер» по сравнению с «еще одним менеджером ресурсов». Он позволяет:

  • подключать дополнительные ноды из квоты провайдера при высокой нагрузке с последующим их отключением; 

  • запускать разные версии Spark и Python в одном кластере и обеспечивать беспроблемное совместное использование ресурсов;

  • поддерживать множество фреймворков, а также гибко их настраивать.

Дополнительно нужно сказать, что весь кейс с K8s явился триггером множества других доработок, о которых будет написано ниже.

2. Обновили ClickHouse до последней версии

А также отточили и оптимизировали настройки балансировщика, плюс — подняли небольшой тестовый ClickHouse рядом с основным (как расширенную песочницу для аналитиков и пробных обновлений версий самого CH).

3. Усовершенствовали Zeppelin

Мы сделали две ноды в нод-группе, которая предназначена для Zeppelin (вместо одной), а также установили лимиты на отдельные интерпретаторы, чтобы они не потребляли всю доступную память ноды (ранее при этом периодически страдала работоспособность самого сервера Zeppelin). В данный момент, если даже произойдет подобная ситуация на одной из нод, сервер должен пересоздаться на второй и продолжить работу бесшовно для пользователя этого инстанса.

4. Внедрили OpenLDAP-сервер

У нас достаточно много сервисов для работы с Big Data, и в каждом есть свои отдельные скрипты и файлы конфигурации для добавления новых пользователей. Это показалось нам неудобным: хотелось найти инструмент, который собрал бы всех пользователей, а сервисы могли обращаться к нему за аутентификацией. Удобным казался Google SSO, однако не все приложения его поддерживают (или нужно было доработать его какими-то сторонними решениями). Зато LDAP-интеграция была у всех приложений из коробки. Дело было за малым: написали Ansible, протестировали и внедрили OpenLDAP-сервер.

Также с момента появления LDAP (а с ним пришла на прод и возможность заведения ролей с помощью SQL вместо config-файла) сделали автоматический revoke и grant для учеток аналитиков в ClickHouse в связи с тем, что CH по дефолту не поддерживает acid (кроме экспериментальных функций). Это существенно упростило жизнь аналитиков, которые могли получать все время разные данные днем в момент, когда мы шатали отдельные таблицы на проде по тикетам других аналитиков, а также жизнь дата-инженеров, которые обычно в этом случае писали предупреждающие сообщения в Slack.

5. Задеплоили Vault

Для хранения паролей или секретов в CI/CD мы использовали GitHub Secrets. Однако это не решало проблему хранения секретов вне CI/CD. Мы задеплоили Vault в Kubernetes-кластер и используем Vault-CRD для чтения секретов подами прямо из Vault DB. А для «человеческого» доступа используем Ingress и аутентификацию через токен.

6. Усовершенствовали JupyterLab

После переезда в Kubernetes пользователям JupyterLab часто не хватало выделенного ресурса для хранения данных, а вечно делать ресайз тома не казалось удобным решением. Что хотят пользователи, хранящие гигабайтные архивы и не желающие в гит? Правильно — безразмерное объектное хранилище S3. 

Из коробки JupyterLab не позволял монтировать S3-объекты в корень рабочей директории, поэтому мы взяли драйвер S3 от Yandex, добавили StorageClass и в деплойменте описали новый PV. Теперь пользователи JupyterLab могут хранить свои ноутбуки в папке S3 безразмерно и вечно.

7. Заменили Thrift на Kyuubi

Для работы с данными в S3 первоначально использовался такой инструмент, как Thrift-сервер, поднятый на отдельных виртуалках. Но у него были критические недостатки: медленная скорость работы даже на простых запросах и катастрофическая отказоустойчивость. Запустив Kyuubi, мы получили масштабируемость и возможность автовосстановления сервиса, а также улучшили перфоманс до 3–4 раз на больших запросах.

8. Обновили AirFlow

Заменили старую версию на последнюю, что существенно отразилось как на скорости (таски стали запускаться сразу же, а не через 2–3 минуты), так и на основном изменении de-стека — появились возможности интеграции AF c «кубером». Изменение версии AF (как и его работа внутри K8s) повлекло за собой изменение в написании декораторов и подхода к деплою.

Как мы внедряли Iceberg

Apache Iceberg — это, грубо говоря, плагин к Spark, который позволяет работать с данными в особом формате Apache Iceberg Open Table Format. Его основными особенностями являются отдельное хранение меты и версионность (со своим Rollback и Time Travel). Внутри каждого каталога появляется два подкаталога — дата и метадата, где в последней лежат файлы json и avro, описывающие parquet’ы в первой.

Последнее обстоятельство делает этот инструмент необходимым в случаях, когда требуются быстрые откаты к предыдущему состоянию таблиц. Это важно аналитикам, работающим в Data Lake с A/B-тестированиями на больших объемах данных. Мы в основном используем его, чтобы убедиться, что все наши данные «доедут до цели».

Одновременно хочу заметить, что коммитер, находящийся под капотом в Apache Iceberg, дал значительный прирост скорости. А его мета позволила хранить историю апдейтов и откатов. Из относительных сложностей стоит упомянуть необходимость дополнительных настроек хранения меты (забор актуальных данных из других сервисов) и периодической чистки истории (снэпшотов), это позволяет не раздувать S3 до бесконечности.

При сверке таблиц, загруженных традиционным способом в старое хранилище и с помощью Iceberg, мы заметили, что некоторые наши источники достаточно сильно меняют информацию задним числом. При том, что инструмент по версионированию у нас был разработан, но не получил большой поддержки снизу у непосредственных заказчиков, каждый случай такого расхождения был разобран как отдельный кейс с вариантами решения: обновить, перенести как есть, выявить причину у разработчиков.

Итоги

Итак, подвожу итоги и отвечаю на вопрос, вынесенный в заглавие статьи: почему два переезда — это не «пожар», а уникальный опыт для прокачки стека?

Большинство улучшений в стеке (появление новых инструментов и оптимизация существующих настроек), произведенных нами при переезде, были ответной реакцией на новую среду. Их можно условно разделить на две большие группы:

1. Раз переезжаем, то сразу берем самую новую версию инструмента.

2. Попробовали как раньше, не вышло, начали искать лучший вариант. 

Очень сомневаюсь, что при обычном Run-процессе можно было бы бесшовно и безболезненно поменять столь многое.

Переезд потребовал большего перекоса в сторону Change-задач, а также более долгосрочного и прицельного планирования, чем просто работа по спринтам. Не скажу, что мы полностью перешли на каскадную разработку: скорее, стали использовать разные методологии для разных типов задач. А еще мы неожиданно поняли, что благодаря стоящим перед нами вызовам удалось привлечь специалистов высокого уровня, которым были интересны очень нетривиальные задачи. 

Любой переезд DWH не может быть долгим, ведь это период двойных костов для компании, которая продолжает тратить ресурсы на старый прод, для непрерывного получения актуальной информации в старом хранилище, и на установку и перенос данных в новое хранилище.

Также при одновременном заборе данных из определенных внешних реплик мы были вынуждены выбирать окна и перестраивать процесс, чтобы не привести их к состоянию даунтайма. Когда что-то делаешь второй раз, то уже становится понятно, куда нужно подстелить соломку.

Важно отметить, что при переезде невозможно значительно оптимизировать архитектуру хранения данных. Да, мы сделали все для снижения SLA доезда прода, создали тестовые пространства с расширенными правами для аналитиков и внедрили SCD. Но большинство таблиц нашего хранилища пока далеки от третьей нормальной формы, о Data Vault говорить не приходится. 

Чтобы этого достичь, надо:

  • на время отойти от переездов и изменения стека, чтобы перестраивать (согласно правильным подходам) давно существующие витрины, на которых держится большинство дашбордов;

  • после этого постепенно переключать их со старых витрин на новые. 

То есть, осуществить такой «внутренний переезд». Мы ждем окончания внешнего переезда, чтобы вплотную заняться этим вопросом.


Хотите стать частью Учи.ру? Присоединяйтесь к нашей команде, ведь мы продолжаем развиваться и ищем новых сотрудников.

Комментарии (6)


  1. GTS
    21.10.2023 19:59

    На нашем объеме, а это порядка 300 TB данных с ежедневным приростом в 16 TB,

    Тут нет ошибки?


    1. nickgreenman Автор
      21.10.2023 19:59

      нет, здесь ошибки нет, учитывая то, то ежедневная дельта приведена с учетом фуловых загрузок (тех таблиц, которые часто меняют на источнике данные задним числом и перезагружаются полностью)


      1. DaemonGloom
        21.10.2023 19:59

        Возможно, стоило сказать "ежедневным изменением"? Или у вас действительно через неделю будет не 300, а 412TB?


        1. nickgreenman Автор
          21.10.2023 19:59

          Не, не будет. Размер и "биг датность" любого двх определяется как общим объемом, так и ежедневной дельтой, именно на этом фокус здесь. А статья в принципе о другом.


  1. idcore
    21.10.2023 19:59

    Откуда берутся 300ТБ.данных с приростом 16 ТБ? Кажется есть пространство для оптимизации.


    1. nickgreenman Автор
      21.10.2023 19:59

      Ответил выше.