За последние три года размер данных Notion увеличился в 10 раз из‑за роста количества пользователей и объёмов контента, с которым они работают. Удвоение этого показателя происходило каждые 6–12 месяцев. Нам нужно было справиться со стремительным ростом размеров данных, соответствуя при этом постоянно растущим требованиям, которые выдвигали критически важные сценарии использования наших продуктов и аналитических систем. Особенно это справедливо в применении к новым функциям Notion AI. Для того чтобы решить эти задачи нам нужно было создать озеро данных Notion и обеспечить его масштабирование. Вот как мы это сделали.

Модель данных и рост Notion

Всё, что вы видите в Notion — тексты, изображения, заголовки, списки, строки баз данных, страницы и прочее, несмотря на разные фронтенд‑представления и разное поведение всего этого, в бэкенде моделируется в виде сущности, называемой «блоком». Блоки хранятся в базе данных Postgres, которая обладает подходящими для их хранения структурой, схемой данных, а так же — метаданными, связанными с контентом (здесь можно найти подробности о модели данных Notion).

Everything in Notion is a block, and these blocks are made up of data. Lots and lots of data.
Всё в Notion — это блок, а сами блоки состоят из данных. Из огромного множества данных

Все эти хранящиеся в блоках данные удваиваются каждые 6–12 месяцев. Причина этого — в деятельности пользователей и в создании контента. В начале 2021 года у нас было более 20 миллиардов строк Postgres, хранящих блоки. Эта цифра с тех пор выросла до более чем двухсот миллиардов блоков — до объёма данных, который, даже с учётом сжатия, составляет сотни терабайтов.

Для того чтобы справляться с таким ростом данных, обеспечивая при этом комфортную работу пользователей, мы стратегически расширили нашу инфраструктуру Postgres. А именно — перешли от одного экземпляра Postgres к более сложной схеме, в которой применяется шардинг. Мы начали в 2021 году, применив горизонтальный шардинг базы данных Postgres. У нас получилось 32 физических экземпляра БД, каждый из которых делился на 15 логических шардов. В 2023 году мы продолжили движение в этом направлении, увеличив количество физических экземпляров БД до 96. В каждом из экземпляров имелось пять логических шардов. В результате мы поддерживали, в общей сложности, 480 логических шардов. Это обеспечило нам, в долгосрочной перспективе, масштабируемые возможности по управлению данными и по организации доступа к ним.

Data lake flowchart
Эволюция подхода к управлению данными в Notion

По состоянию на 2021 год Postgres была ядром нашей продакшн‑инфраструктуры. Эта СУБД отвечала за решение всех задач — от обработки трафика онлайн‑пользователей, до оффлайн‑аналитики и нужд машинного обучения. По мере того, как росли требования к обработке онлайн‑ и оффлайн‑данных, мы поняли, что крайне важным для нас было создание отдельной инфраструктуры обработки данных, которая позволила бы обрабатывать оффлайновые данные, не мешая при этом работе с онлайн‑трафиком.

Инфраструктура обработки данных Notion в 2021 году

В 2021 году мы начали движение в сторону выделенной инфраструктуры обработки данных. Сначала мы внедрили конвейер ELT (Extract, Load, Transform — Извлечение, Загрузка, Преобразование). Он использовал сторонний инструмент Fivetran для поглощения данных из Postgres WAL (Write Ahead Log, журнал предзаписи) и их передачи в Snowflake. Мы настроили 480 постоянно работающих коннекторов для 480 шардов. Коннекторы были рассчитаны на запись в такое же количество обычных таблиц Snowflake. Затем мы объединяли эти таблицы в одну большую таблицу для целей аналитики, для формирования отчётов, для нужд машинного обучения.

Data lake image 2
Инфраструктура обработки данных Notion в 2021 году

Проблемы масштабирования

По мере того, как объём данных, хранимых в Postgres, рос, мы столкнулись с несколькими проблемами, касающимися масштабирования нашей системы.

Удобство и простота использования

При таком подходе нужно обеспечить управление 480 коннекторами Fivetran и их мониторинг. К этому добавляется их ресинхронизация при решардинге и обновлении Postgres, а так же — в периоды обслуживания СУБД. Всё это создаёт очень большую дополнительную нагрузку на персонал, который вынужден находиться в состоянии постоянной готовности к решению каких‑либо задач.

Скорость, актуальность данных и стоимость

Поглощение и передача данных в Snowflake стали медленнее и дороже. Произошло это, преимущественно, из‑за уникальной нагрузки, которую Notion создаёт на системы хранения данных. Эта нагрузка характеризуется большим объёмом работ по обновлению данных. Пользователи Notion обновляют существующие блоки (тексты, заголовки, названия, маркированные списки, строки базы данных и прочее) гораздо чаще, чем создают новые. Это приводит к тому, что данные блоков, в основном, подвергаются обновлению. 90% UPSERT‑операций в Notion — это операции обновления блоков. Большинство хранилищ данных, включая Snowflake, оптимизированы под рабочие нагрузки, ориентированные на вставку данных. Это сильно усложняет задачу достойной поддержки такими хранилищами работы с данными блоков.

Поддержка особых сценариев работы с данными

Логика преобразования данных стала сложнее и требовательнее к ресурсам. Для её реализации не хватало возможностей стандартного интерфейса SQL, предлагаемого готовыми решениями для управления данными.

  • Один из важных сценариев работы — это конструирование денормализованных представлений данных блоков Notion для наших ключевых продуктов (например AI и Search). Данные о разрешениях, например, обеспечивают то, что читать или менять блок могут только те пользователи, которым это позволено (в этом материале обсуждается модель разрешений Notion). Но разрешения, связанные с блоками, не хранятся в статическом виде в Postgres — они должны создаваться динамически, посредством ресурсоёмких вычислений, выполняемых при работе алгоритма обхода дерева сущностей Notion.

  • В следующем примере block_1, block_2 и block_3 наследуют разрешения от своих непосредственных родителей (page_3 и page_2) и предков (page_1 и workspace_a). Для того чтобы сформировать данные о разрешениях для каждого из этих блоков, мы должны полностью, до корневого узла (workspace_a) обойти дерево их предков. Это позволит обеспечить полноту данных о разрешениях. При наличии сотен миллиардов блоков, глубина дерева предков которых варьируется от нескольких до десятков уровней, подобные вычисления окажутся весьма ресурсозатратными. Выполнение их в системе, в которой используется Snowflake, просто привело бы к срабатыванию тайм‑аута.

Constructing permission data for each block requires traversing the entire ancestor tree.
Конструирование данных о разрешениях для каждого блока требует обхода всего дерева предков

Все эти проблемы и привели к тому, что мы начали изучать вопрос создания собственного озера данных.

Создание и масштабирование собственного озера данных Notion

Вот цели, которые мы преследовали, собираясь создавать собственное озеро данных:

  • Создание репозитория данных, поддерживающего масштабирование, способного хранить как необработанные, так и обработанные данные.

  • Обеспечение работы быстрого, масштабируемого, удобного, рентабельного механизма поглощения и обработки данных, подходящего для любых рабочих нагрузок, а в особенности — для работы с блоками Notion, данные которых интенсивно обновляются.

  • Подготовка условий для эффективной реализации сценариев использования возможностей Notion (среди которых — AI и Search), требующих доступности денормализованных данных.

Но, хотя наше озеро данных — это большой шаг вперёд — важно прояснить вопрос о том, на решение каких задач оно не рассчитано:

  • Полная замена Snowflake. Мы продолжим пользоваться полезными и удобными возможностями Snowflake и экосистемы этого продукта, применяя его для большинства других рабочих нагрузок. В особенности это касается тех нагрузок, которые интенсивно используют вставку данных и не требуют крупномасштабных обходов деревьев с целью денормализации данных.

  • Полная замена Fivetran. Мы продолжим пользоваться эффективными механизмами Fivetran при работе с таблицами, не требующей их интенсивного обновления, при поглощении маленьких наборов данных, при взаимодействии со сторонними системами и источниками данных.

  • Поддержка онлайновых сценариев использования продукта, которая выдвигает требования к задержкам передачи данных, соответствующие уровню L2, или более строгие. Озеро данных Notion, в основном, будет нацелено на оффлайновые сценарии работы, для которых приемлемы задержки, измеряемые минутами или даже часами.

Общий обзор архитектуры нашего озера данных

С 2022 года мы используем архитектуру озера данных, показанную ниже. Мы поглощаем инкрементально обновлённые данные, направляя их из Postgres в Kafka, используя CDC‑коннекторы Debezium. Далее — мы, для переноса обновлений из Kafka в S3, применяем Apache Hudi — опенсорсный фреймворк для обработки и хранения данных. Затем мы можем преобразовывать, денормализовывать (например — при обходе деревьев и при конструировании разрешений для каждого из блоков) и обогащать эти необработанные данные. Обработанные данные мы снова сохраняем в S3 или в системах более низкого уровня, рассчитанных на приём таких данных. Они могут использоваться при формировании аналитики и отчётов, а так же — в других продуктах, наподобие AI и Search.

Notion’s in-house data lake is built on Debezium CDC connector, Kafka, Hudi, Spark, and S3.
Собственное озеро данных Notion построено на основе CDC-коннекторов Debezium, Kafka, Hudi, Spark и S3

Далее опишем и проиллюстрируем архитектурные принципы и решения, к которым мы пришли после серьёзных исследований, дискуссий и работ по прототипированию будущей системы.

Архитектурное решение №1: выбор репозитория данных и озера данных

Наше первое архитектурное решение заключалось в том, чтобы использовать в роли репозитория данных и озера данных S3. Там планировалось хранить все необработанные и обработанные данные. Все другие хранилища данных решено было считать системами более низкого уровня. Сюда относятся и системы для работы с данными, с которыми взаимодействуют наши продукты. Среди этих систем — ElasticSearch, векторная база данных, хранилище данных типа ключ‑значение. Мы приняли это решение по двум причинам:

  • Оно согласовывалось с технологическим стеком AWS, используемым в Notion. Например — наша база данных Postgres основана на AWS RDS, а возможность этой системы по экспорту в S3 (описанная ниже) позволяет нам легко инициализировать таблицы в S3.

  • Сервис Amazon S3 доказал свою стабильность при хранении больших объёмов данных, продемонстрировал возможности экономичной поддержки различных движков для обработки данных (вроде Spark).

Мы передали в S3 «тяжёлые» операции по поглощению и обработке данных. Мы значительно улучшили масштабируемость, скорость, рентабельность используемых систем обработки данных. Произошло это благодаря тому, что мы поглощаем лишь хорошо очищенные данные, а так же — данные, играющие ключевую роль в обеспечении работы наших продуктов, попадающие в Snowflake и в хранилища, взаимодействующие с Notion.

Архитектурное решение №2: выбор движка для обработки данных

Мы, в качестве основного движка для обработки данных, выбрали Spark. Сделано это из‑за того, что Spark — это опенсорсный фреймворк. Его можно было быстро настроить и убедиться в том, что он соответствует нашим нуждам по преобразованию данных. Spark отличается четырьмя ценными для нас особенностями:

  • Spark, помимо SQL, поддерживает широкий спектр встроенных функций и UDF (User Defined Function, функция, определяемая пользователем). Это позволяет реализовать сложную логику обработки данных наподобие той, о которой мы уже говорили, в виде обхода дерева и денормализации данных блоков.

  • Он предлагает удобный для использования фреймворк PySpark, позволяющий реализовать самые простые сценарии, а так же продвинутый фреймворк Scala Spark, рассчитанный на «тяжёлые» сценарии обработки данных, в которых нужна высокая производительность.

  • Он поддерживает распределённую обработку больших объёмов данных (например — миллиарды блоков и сотни терабайтов). Spark даёт доступ к обширным настройкам, что позволяет нам влиять на такие вещи, как разделение объектов на части, неравномерность распределения данных и выделение ресурсов. Он, кроме того, позволяет нам разбивать сложные задания на небольшие задачи и оптимизировать ресурсы, необходимые для каждой из задач. Это помогает нам в достижении приемлемого времени выполнения задач, не выделяя при этом неоправданно больших ресурсов и не тратя ресурсы впустую.

  • И наконец — опенсорсная природа Spark открывает для нас разные возможности экономии.

Архитектурное решение №3: выбор инкрементального поглощения данных вместо создания дампа снепшота

После того, как мы выбрали хранилище для озера данных и движок для обработки данных, мы исследовали решения, позволяющие поглощать данные Postgres и передавать их в S3. Мы оказались перед двумя вариантами: инкрементальное поглощение изменённых данных и периодическое создание полного снепшота таблиц Postgres. В итоге, основываясь на сравнении производительности и стоимости этих решений, мы остановились на гибридной архитектуре:

  • В процессе обычного функционирования системы мы инкрементально поглощаем и непрерывно загружаем изменённые данные Postgres в S3.

  • В редких случаях мы однократно делаем полный снепшот данных Postgres и инициализируем таблицы в S3.

Инкрементальный подход обеспечивает работу с более свежими данными при более низких затратах и с минимальными задержками (от считанных минут до нескольких часов, что зависит от размера таблицы). Создание полного снепшота и его сброс в S3, с другой стороны, занимает более 10 часов и стоит вдвое дороже. Поэтому мы прибегаем к этому подходу редко, а именно — тогда, когда инициализируем в S3 новые таблицы.

Архитектурное решение №4: организация потокового инкрементального поглощения данных

CDC-коннектор Kafka для Postgres и перемещение данных в Kafka

Мы решили использовать CDC‑коннектор (Change Data Capture — захват изменения данных) Debezium для Kafka, который позволяет публиковать инкрементальные изменения данных Postgres в Kafka. Это похоже на механизм поглощения данных, используемый в Fivetran. Мы, вместе с Kafka, решили использовать и платформу Fivetran из‑за того, что она хорошо масштабируется и легко настраивается, а так же — из‑за её тесной интеграции с уже имеющейся у нас инфраструктурой.

Hudi для Kafka и перемещение данных в S3

Мы рассматривали три замечательных инструмента для перемещения инкрементальных данных из Kafka в S3. Это — Apache Hudi, Apache Iceberg и DataBricks Delta Lake. В итоге мы выбрали Hudi из‑за прекрасной производительности этого решения, которую оно показывает на наших рабочих нагрузках, сильно завязанных на обновление данных. Платформа Hudi, кроме того, привлекла нас своей опенсорсной природой и нативной интеграцией с CDC‑сообщениями Debezium.

Iceberg и Delta Lake, с другой стороны, когда мы рассматривали их в 2022 году, не были оптимизированы для нашей «обновляющей» рабочей нагрузки. В Iceberg, кроме того, не было стандартного решения, позволяющего воспринимать сообщения Debezium. В Delta Lake такое решение было, но эта системе не является опенсорсной. Нам пришлось бы реализовывать собственного Debezium‑потребителя в том случае, если бы мы выбрали одно из этих решений.

Архитектурное решение №5: поглощение необработанных данных до их обработки

И наконец — мы решили отправлять необработанные Postgres‑данные в S3 без их обработки «на лету». Сделано это для того, чтобы организовать единый источник достоверных данных и упростить отладку всего конвейера обработки данных. После того, как необработанные данные попадают в S3, мы занимаемся их преобразованием, денормализацией, обогащением, выполняем какую‑то другую их обработку. Мы храним промежуточные данные, опять же, в S3. А системы более низкого уровня поглощают лишь хорошо очищенные, структурированные данные, и данные, важные для обеспечения работы Notion. Эти данные мы используем для формирования аналитических сведений, для создания отчётов, для нужд наших продуктов.

Масштабирование и использование нашего озера данных

Мы экспериментировали со множеством детально проработанных конфигураций для того чтобы найти подход к решению проблемы масштабирования, связанной с постоянно увеличивающимся объёмом данных Notion. Вот рассказ о том, что мы пробовали, и о том, к чему это привело.

1. CDC-коннектор и настройка Kafka

Мы настроили по одному CDC‑коннектору Debezium на хост Postgres и развернули их в кластере AWS EKS. Из‑за того, что Debezium и EKS — это зрелые системы, и из‑за того, что Kafka отличается хорошей масштабируемостью, нам, в прошедшие два года, понадобилось лишь несколько раз обновить кластеры EKS и Kafka. По состоянию на май 2024 года всё это безупречно работает, обеспечивая обновление строк Postgres на скоростях в десятки мегабайт в секунду.

Мы, кроме того, создали по одному топику Kafka на одну таблицу Postgres, и позволили всем коннекторам, потребляющим данные из 480 шардов, писать в один и тот же топик, предназначенный для каждой из таблиц. Эта конструкция значительно снизила сложность поддержки 480 топиков для каждой из таблиц и упростила операции, производимые на более низком уровне нашей системы, которые заключаются в поглощении данных Hudi и в передаче их в S3. Это очень сильно снизило дополнительную нагрузку на нас, связанную с обеспечением работы системы.

2. Настройка Hudi

Мы использовали Apache Hudi Deltastreamer — утилиту для поглощения данных, основанную на Spark. Она потребляет сообщения Kafka и реплицирует состояние таблицы Postgres в S3. Нам, после нескольких раундов тюнинга производительности, удалось выйти на быстрый и масштабируемый механизм поглощения данных, обеспечивающий актуальность данных. Этот механизм, для большинства таблиц, обеспечивает задержку всего в несколько минут. В случае с самой большой таблицей, таблицей блоков, задержка может составлять несколько часов (смотрите иллюстрацию ниже).

  • Мы используем таблицу стандартного типа Hudi COPY_ON_WRITE с операцией UPSERT, что соответствует нашей рабочей нагрузке, ориентированной на обновление данных.

  • Для того чтобы обеспечить более эффективное управление данными и минимизировать «коэффициент расширения записи» (то есть — количество файлов, обновляемых при запуске пакетного поглощения данных), мы подвергли тонкой настройке три параметра:

    • Данные разделов/шардов используют одни и те же схемы шардов. За это отвечает параметр hoodie.datasource.write.partitionpath.field: db_schema_source_partition. Благодаря этому набор данных S3 разбивается на 480 шардов, которым назначаются имена от shard0001 до shard0480. Это повышает вероятность того, что пакет входящих изменений будет соответствовать одному и тому же набору файлов из одного и того же шарда.

    • Данные сортируются на основании времени последнего обновления (event_lsn), что обеспечивается параметром field: event_lsn. Это сделано по результатам наших наблюдений, которые показали, что более свежие блоки с большей долей вероятности будут обновлены. Это позволяет нам убирать файлы, содержащие лишь устаревшие блоки.

    • В качестве типа индекса выбран фильтр Блума, что сделано с помощью параметра hoodie.index.type: BLOOM. Это позволяет осуществить дальнейшую оптимизацию рабочей нагрузки.

Hudi Deltastreamer setup for the block table.
Настройка Hudi Deltastreamer для таблицы блоков

3. Настройка обработки данных в Spark

Для решения большинства задач по обработке данных мы используем PySpark. Этот фреймворк отличается сравнительно низкой кривой обучаемости, что делает его доступным для множества членов нашей команды. Решая более сложные задачи, такие, как обход дерева и денормализация данных, мы задействуем замечательную производительность Spark в нескольких ключевых областях:

  • Мы извлекаем выгоду из эффективности работы Scala Spark.

  • Мы более эффективно управляем данными, обрабатывая большие и маленькие шарды раздельно (вспомните — мы поддерживаем в S3 одну и ту же схему, соответствующую 480 шардам, чтобы обеспечить согласованность с Postgres). В случае с маленькими шардами — все их данные загружаются в память контейнера задачи Spark ради ускорения обработки этих данных. А при работе с большими шардами, размеры которых превышают ёмкость памяти, применяется временное хранение данных на диске.

  • Мы используем многопоточную и параллельную обработку данных, что позволяет ускорить работу с 480 шардами и помогает оптимизировать время выполнения кода и его эффективность.

4. Настройка инициализации таблиц

Вот как мы инициализируем новые таблицы:

  • Сначала мы настраиваем коннектор Debezium, выполняя поглощение изменений и их передачу в Kafka.

  • Мы, начиная с временной метки t, запускаем задачу AWS RDS по экспорту данных в S3 для сохранения самого свежего снепшота таблиц Postgres в S3. Затем мы создаём задачу Spark для чтения этих данных из S3 и записи их в формате таблицы Hudi.

  • И наконец — мы обеспечиваем захват всех изменений, сделанных в процессе создания снепшота. Делается это путём настройки Deltastreamer на чтение данных из сообщений Kafka начиная с временной метки t. Этот шаг чрезвычайно важен для поддержки полноты и целостности данных.

Благодаря хорошей масштабируемости Spark и Hudi эти три шага обычно выполняются в течение 24 часов. Это позволяет нам выполнять повторную инициализацию таблиц в приемлемое время, подстраиваясь под требования новых таблиц, под обновления Postgres и под операции решардинга.

Выигрыш: меньше денег, больше времени, более мощная инфраструктура для AI

Мы приступили к разработке инфраструктуры нашего озера данных весной 2022 года и завершили их к осени прошлого года. Благодаря присущей этой инфраструктуре масштабируемости, мы смогли непрерывно оптимизировать и расширять EKS‑кластеры Debezium, кластеры Kafka, механизмы Deltastreamer и Spark. Это позволило нам успевать за удвоением данных Notion, происходящим каждые 6–12 месяцев, не прибегая при этом к существенной модернизации наших систем. Всё это дало нам огромную выгоду:

  • Перемещение нескольких больших, очень важных наборов данных Postgres (размер некоторых из них составляет десятки терабайт) в озеро данных дало нам больше миллиона долларов чистой экономии за 2022 год и пропорционально более высокий уровень экономии в 2023 и 2024 годах.

  • При работе с этими наборами данных полное время их поглощения и переноса из Postgres в S3 и в Snowflake уменьшилось. Раньше речь шла о временном промежутке, превышающим 24 часа. Теперь же, в случае с маленькими таблицами, оно уменьшилось до нескольких минут. Для обработки больших таблиц теперь нужно несколько часов. Ресинхронизация, при необходимости, может быть завершена в пределах 24 часов без перегрузки активных баз данных.

  • И что ещё важнее — изменение инфраструктуры дало нам серьёзную экономию, относящуюся к хранилищу данных, к вычислениям, к обеспечению актуальности данных. Причина этой экономии заключается в оптимизации запросов, выполняемых для решения задач формирования аналитических сведений и задач обеспечения работы нашего продукта. Это позволило нам успешно развернуть в 2023–2024 году возможности Notion AI. Ожидайте подробного материала о наших проектах Search и AI Embedding RAG, построенных на базе озера данных!

Нам хотелось бы поблагодарить OneHouse и опенсорсное сообщество Hudi за их прекрасную и своевременную поддержку. Использование опенсорсных решений стало одной из важнейших причин того, что мы смогли развернуть озеро данных всего за несколько месяцев.

По мере того, как растут и развиваются наши нужды, мы продолжаем расширять наше озеро данных, создавая автоматизированные и самообслуживающиеся фреймворки. Они дают возможность большему количеству инженеров конструировать сценарии использования нашего продукта, основанные на данных.

О, а приходите к нам работать? ? ?

Мы в wunderfund.io занимаемся высокочастотной алготорговлей с 2014 года. Высокочастотная торговля — это непрерывное соревнование лучших программистов и математиков всего мира. Присоединившись к нам, вы станете частью этой увлекательной схватки.

Мы предлагаем интересные и сложные задачи по анализу данных и low latency разработке для увлеченных исследователей и программистов. Гибкий график и никакой бюрократии, решения быстро принимаются и воплощаются в жизнь.

Сейчас мы ищем плюсовиков, питонистов, дата-инженеров и мл-рисерчеров.

Присоединяйтесь к нашей команде

Комментарии (1)


  1. IgorAlentyev
    16.07.2024 20:20

    Внушительно на словах, но на деле грузится все очень долго. Из за этого так и не смог перетащить свою бд из трелло в ноушен.