Последние несколько лет мы в Postgres Professional активно занимаемся разработкой своего решения для горизонтального масштабирования PostgreSQL. Пользователям нужен был простой способ увеличить производительность путем добавления узлов. Традиционно для веба в таких случаях просто брали NoSQL базы или шардировали вручную, позже появились распределенные SQL‑решения с поддержкой ACID‑транзакций. Тем не менее терялась часть возможностей и достоинств PostgreSQL. Корпоративный рынок тяжелых вертикальных решений также сильно ограничен как ценой, так и доступностью. Поэтому исследованиями в области распределенных СУБД в компании занимались еще с 2017 года, а в 2020 началась работа над коммерческим продуктом.
В этой статье я расскажу про технические детали реализации и почему был сделан такой выбор технологий. Опишу, какие направления нам показались преждевременными и их пришлось отложить, а также что мы ожидаем в будущем.
Предыстория шардинга в PostgreSQL
К 2020 году было уже много подходов к шардингу в PostgreSQL. Несколько компаний делали свои продукты, в нашей — тоже велись эксперименты над различными частями распределенной системы. Стоит отметить, что проект одной из целей ставил развитие шардинга непосредственно в ядре PostgreSQL. На тот момент в сообществе не было согласия по поводу того, в какую сторону идти. Создавались огромные дизайн документы вроде таких, велись обсуждения, как делать распределенные транзакции, подходит ли подсистема fdw для соединения узлов и тому подобное.
Уже тогда в компании были написаны первые патчи, реализующие протокол распределенных транзакций на базе физического времени. Был прототип распределенного планировщика, использующий механизмы динамического перешардирования. А эксперименты с логической репликацией для реализации отказоустойчивости показали, что, несмотря на свою гибкость, она является достаточно медленной по сравнению с физической.
Отсутствие консенсуса в сообществе не отменяло того факта, что базовые инструменты для реализации шардинга в PostgreSQL на тот момент у нас были. Причем основа — это секционирование и fdw, активно развивались и были доступны на протяжении нескольких мажорных релизов PostgreSQL. Это сильно отличалось от ситуации, когда начинались такие проекты как Greenplum или Citus — им пришлось многое реализовывать с нуля. Также мы решили фокусироваться на транзакционной нагрузке, для которой PostgreSQL предназначен в первую очередь. Все это позволило начать работу над проектом, а насколько далеко технологии секционирования и fdw позволят зайти, предстояло проверить.
Требования к будущему решению
При разработке распределенной СУБД мы ориентировались на следующий список требований.
Прозрачное горизонтальное масштабирование. При повышении нагрузки на сервер мы хотим, чтобы один сервер мог превращаться в несколько, при этом данные автоматически распределялись по кластеру.
Ориентация на OLTP-нагрузку. Несмотря на всё возрастающий интерес к аналитическим запросам в PostgreSQL, OLTP остаётся самым востребованным видом нагрузки.
Все узлы равнозначные, доступ к кластеру осуществляется через любой узел.
Должна быть реализована отказоустойчивость, чтобы выход из строя одного сервера не приводил к обрушению всего кластера.
Строгие гарантии согласованности операций в кластере. Для конечного пользователя база должна выглядеть как находящаяся на одиночном сервере с поддержкой ACID-транзакций.
Компоненты кластера и архитектура
Сейчас Shardman основан на коде PostgreSQL 14 с дополнительными патчами. Это уже достаточно старый релиз, но мы следили, чтобы актуальная для нас функциональность, которая касается распределенного планирования и выполнения запросов, попадала из свежих версий ванильного постгреса. Забегая вперед, стоит сказать, что мы занимаемся переносом на PostgreSQL 17, на которой должен выйти следующий мажорный релиз. Также мы поддерживаем некоторую функциональность из СУБД Postgres Pro Enterprise, такую как CFS, PTRACK и probackup.
Кроме изменений в самом PostgreSQL были разработаны утилиты, которые реализуют функциональность по управлению кластером и обеспечению отказоустойчивости. На каждом узле кластера работает управляющий демон shardmand, а инструмент командной строки shardmanctl используется для конфигурации. В качестве источника правды о состоянии кластера выступает Etcd.
Кластер Shardman состоит из отдельных шардов, каждый шард хранит и отвечает за свою часть данных. Например, это могут быть части большой распределенной таблицы. В свою очередь каждый шард имеет один ведущий сервер PostgreSQL и произвольное число реплик. Распределение ведущих серверов и реплик по узлам может быть произвольным. Две возможные стратегии размещения ведущих серверов и реплик — это кросс‑репликация, когда ведущие сервера одних шардов размещаются рядом с репликами других, или топология с отдельно стоящими репликами.
Все ведущие сервера шардов связаны между собой специальным транспортом (изображен зеленой стрелкой выше), он обеспечивает быстрый обмен данными при выполнении распределенных запросов. В основе шардирования и выполнения распределенных запросов лежат стандартные механизмы секционирования и postgres fdw (foreign data wrapper). Подключившись к любому ведущему серверу, можно видеть все данные с помощью механизма fdw, который обычно используется для обращения к внешним системам, в частности, и другим серверам PostgreSQL. На рисунке ниже показана распределенная (шардированная) таблица, размещённая на двух шардах. Зелёные поля — это секции, хранящие данные непосредственно на этом шарде, а жёлтые — это fdw‑ссылки, указывающие, что секции с данными находятся на другом шарде.
Замечание: Число секций фиксировано и задается при создании таблицы. Их должно быть не меньше, чем потенциальное число шардов. Имеет смысл создавать больше одной секции на шард, потому что они являются единицей параллельной работы. Но слишком большое число секций будет замедлять планирование и иметь дополнительные накладные расходы при выполнении.
Несмотря на то, что механизмы репликации, секционирования и fdw являются хорошей базой для построения распределенной системы, пользоваться ими в оригинальном варианте чрезвычайно неудобно, скорость большинства операций будет достаточно низкой, а согласованность операций не гарантируется. В результате нам пришлось доработать некоторые подсистемы СУБД, о чем мы подробнее расскажем далее.
Утилиты и отказоустойчивость
Задача развернуть PostgreSQL на десятках узлов в отказоустойчивой конфигурации не является простой. К этому стоит добавить возможность изменения конфигурации кластера на лету (например, добавление или уменьшение числа шардов с перебалансировкой данных или изменение количества реплик), обновления параметров PostgreSQL, создания бэкапа, быстрой загрузки данных или обновление версии СУБД. Этим занимаются утилиты, которые написаны на языке Go и, как было сказано выше, состоят из сервиса shardmand, который запускается на каждом узле кластера и инструмента командной строки shardmanctl, через который можно выполнять любые перечисленные операции.
Сервис также отвечает за обеспечение автоматической отказоустойчивости. Через конфигурацию в etcd он определяет, какие ведущие сервера и какие реплики должны работать на данном узле и в случае необходимости (failover) или по запросу (switchover) переключает роли реплики и ведущего сервера. Для тех, кто знаком со Stolon, проще всего это будет описать так, что shardmand запускает компоненты Stolon в отдельных легковесных нитях (goroutines), не требуя использования дополнительных сервисов. Также Shardman не требует использования промежуточных прокси (stolon‑proxy) или балансировщиков для подключения к кластеру. Клиент указывает адреса всех ведущих серверов и их реплик в строке подключения, а современные драйверы обеспечивают балансировку подключений, а также при повышении реплики до ведущего сервера — автоматическое переключение между ними на стороне клиента. В случае смены ведущего сервера для одного или нескольких шардов, остальные шарды согласовано увидят изменения топологии. Отсутствие промежуточных узлов упрощает обслуживание, снижает задержки и исключает узкие места.
Глобальный DDL
В распределенной системе необходимо уметь согласованно создавать объекты на всех узлах кластера. Такую согласованную группу локальных объектов назовем глобальными объектами. Некоторые глобальные объекты еще можно создать вручную при определенных усилиях, например, шардированные таблицы, а вот глобальную последовательность на весь кластер с высокой скоростью генерации чисел сделать будет достаточно затруднительно. Для создания объектов в SQL существует отдельная группа команд, их называют DDL (Data Definition Language). В некоторых случаях специального синтаксиса или логики для создания глобальных объектов не нужно. Однако, как правило, создание глобальных объектов требует дополнительных операций помимо создания соответствующих локальных объектов на каждом узле кластера. Для определения, что нужно создать именно глобальный объект, а также для описания его дополнительных свойств (например, количества секций в шардированной таблице), часто необходимо использовать расширенный синтаксис DDL.
FDW и секционирование
Шардирование строится на связке секционированных таблиц и fdw. Стандартный планировщик уже поддерживал такие важные техники для распределенной системы как устранение секций (partition pruning) и удаленное выполнение (push down) фильтров, соединений и агрегатов. А вот сама реализация fdw для обращения к внешним серверам PostgreSQL (расширение postgres_fdw), хоть и дает строить распределенную систему, но получается она не очень эффективной. Определенная часть логики postgres_fdw направлена на обеспечение возможности работы между произвольными версиями PostgreSQL, а производительность и согласованность не являются высоким приоритетом. В результате в расширение postgres_fdw пришлось внести много изменений, часть из которых может войти в следующие версии PostgreSQL.
На самом деле расширение postgres_fdw определяет работу сразу нескольких подсистем:
Сетевой транспорт и модель выполнения запросов
Распределенное планирование
Распределенные транзакции
В качестве транспорта используется стандартный клиентский протокол, что также определяет и классическую модель выполнения PostgreSQL: по процессу на клиента. Такая конфигурация в кластере может приводить к большому числу внутрикластерных подключений и процессов, а также к неэффективному использованию сетевого канала и процессора. По этой причине был разработан свой мультиплексирующий транспорт (мультиплексор), а в качестве исполнителей внутренних запросов в кластере на каждом узле используется постоянное число рабочих процессов.
Планировщик тоже работал не совсем так, как хотелось. В shared nothing архитектурах, к которым относится и Shardman, применяется подход, когда максимальное число вычислений стараются делать рядом с данными, а обмен по сети минимизировать. Такое планирование требует хорошей поддержки техник, которые позволяют делать устранение секций и удаленное выполнение. Несмотря на их базовую поддержку, ее было явно недостаточно и пришлось много над этим поработать.
Последнее, что важно — это распределенные транзакции. Можно сказать, что они не поддерживались совсем. При выполнении распределенного запроса через postgres_fdw создавались новые транзакции на участвующих шардах, которые никак не были связаны с основной транзакцией. Нами был реализован протокол распределенных транзакций на базе физического времени и его поддержка была интегрирована на транспортном уровне.
Мультиплексор
Как уже говорилось выше, у postgres_fdw есть ряд недостатков при использовании его для построения кластера из однотипных серверов.
Самое очевидное — это большое количество соединений между узлами кластера. Рассмотрим пример чтения данных из шардированной таблицы. Если запрос требует данные с соседнего шарда, то к нему будет установлено подключение. В худшем случае данные нужны со всех шардов, значит будет установлено столько подключений, сколько шардов в кластере, а общее количество подключений в кластере будет равно N * M, где N — количество шардов, а M ‑количество внешних клиентских подключений. С процессами будет то же самое. Это на больших инсталляциях приводит к неэффективному использованию сетевого канала, конкуренции за ресурсы, большому числу переключений контекста и значительному потреблению памяти.
Попытка использовать клиентский протокол в целом приводила к тому, что передача запросов и получение результатов выполнялись неэффективно. Любой запрос через postgres_fdw требовал минимум пяти команд, передаваемых по сети. Для выполнения запроса необходимо было начать транзакцию, открыть курсор, выполнить запрос, закрыть курсор, завершить транзакцию. Если запрос был коротким, то задержка увеличивалась в несколько раз. Получение результатов запросов к нескольким секциям, лежащим на одном узле, возможно было только последовательно (т.к. работа выполнялась в рамках одной сессии). Сами результаты приходилось конвертировать в текст, а потом обратно, потому что так был устроен клиентский протокол. Что касается поддержки распределенных транзакций, то ее в протоколе предусмотрено не было.
Для решения этих проблем была разработана специальная подсистема, которая включала мультиплексирующий транспорт и новую модель выполнения удаленных запросов на фиксированном числе процессов обработчиков. На каждом шарде работает специальный роутер запросов (процесс мультиплексора), к которому обращаются локальные бэкенды СУБД PostgreSQL через разделяемую память, если им нужно отправить запрос на другие шарды. Процессы‑мультиплексоры разных шардов связаны друг с другом одним подключением. Как только мультиплексор другого шарда получает запрос, то передает его одному из обработчиков через разделяемую память. Ответ передается в обратной последовательности.
Такая схема позволила значительно сократить количество используемых соединений. В общем случае формула для расчёта числа подключений в кластере выглядит как N * (N - 1) + M, где N –количество шардов, а M – количество внешних клиентских подключений. Кроме сокращения числа подключений и процессов, также был оптимизирован сетевой протокол, появилось параллельное выполнение внутренних запросов по числу секций на шарде, а также в протоколе начал передаваться снимок распределенной транзакции, который представляет собой просто 64-битное число, о чем расскажем в следующей части.
Отдельный рассказ мог бы получиться про то, как мы интегрировали новую подсистему. Выше упоминалось, что в компании разрабатывался экспериментальный распределенный планировщик и исполнитель, который использовал механизм custom nodes для интеграции. Несмотря на большие возможности такого подхода он оказался чрезвычайно сложным и было принято решение в качестве интерфейса к планировщику и исполнителю оставить fdw и само расширение postgres_fdw, но заменить используемый протокол и транспорт. Это было тоже не самой простой задачей, особенно в области реализации асинхронного выполнения, которое поддерживает postgres_fdw, но дало нам сразу все его возможности. Мы получили поддержку техники удаленного выполнения (push down) и надежную передачу внутренних запросов через механизм депарсинга частей глобального плана обратно в sql.
Распределенные транзакции и CSN
Для управления конкурентным доступом PostgreSQL использует MVCC модель на базе снимков данных. Каждая транзакция имеет свой снимок, который определяет ее видимость и назначается в момент старта транзакции. В Shardman в качестве такого снимка используется 64-битный счетчик CSN — commit sequence number, который является производным значением от времени и в некотором приближении можно считать временем старта транзакции. Назовем CSN, определяющий снимок транзакции, snapshot CSN. При коммите текущее значение этого счетчика назначается в соответствие локальному идентификатору транзакции (xid). Видимость определяется очень просто, транзакция видит все данные, у которых значение CSN для создавшей их транзакции (xmin) меньше snapshot CSN, а значение CSN удалившей их транзакции (xmax) не задан или больше snapshot CSN.
Проблема в том, что в реальной жизни время на каждом узле течёт немного по‑разному, а распределенный коммит не происходит моментально на всех шардах. Поэтому для реализации атомарности и изоляции (буквы A и I в ACID) необходим специальный протокол, который будет учитывать особенности физического времени.
Назовем координатором транзакции узел, с которого данная транзакция была инициирована клиентским приложением. В начале транзакции именно координатор назначает транзакции snapshot CSN, который потом используется на всех узлах кластера для проверки видимости.
Рассмотрим кластер из трёх шардов, каждый шард на своем узле (со своим временем и счетчиком CSN). На второй узел пришёл клиентский запрос, результатом которого должно стать изменение данных на всех трёх шардах кластера.
В ходе выполнения клиентского запроса с координатора отправляются запросы на изменение данных на двух других шардах. Эти запросы инициируют локальные транзакции, которым независимо присваиваются локальные идентификаторы (xid). При успешном завершении распределенной транзакции необходимо согласованно вычислить CSN — единый для всех шардов номер, который присваивается локальным транзакциям.
Рассмотрим подробнее момент, обозначенный на схеме выше как Commit in progress, и разберем взаимодействие двух узлов.
Запрос через fdw‑подключение меняет данные на первом шарде и у себя, а дальше делает двухфазный коммит, необходимый для обеспечения атомарности изменений. На первой фазе посылается специальное сообщение prepare, в ответ на которое ожидается статус выполнения операции. В Shardman в ответ на prepare дополнительно генерируется и возвращается текущий CSN участника. Координатор, если меняет данные локально, также генерирует CSN. После выполнения prepare и генерации локального CSN транзакция переводится в InDoubt состояние. Затем координатор, в нашем случае второй шард, сравнивает полученные CSN со своим, выбирает максимальный и коммитит транзакцию с этим номером у себя и на остальных шардах. Для этого уже вместе с сообщением commit prepared передается значение максимального CSN на все шарды.
Для обеспечения изоляции уровня repeatable read или snapshot, кроме выбора максимального CSN и двухфазного коммита, необходимы еще два условия:
Если с шарды пытаются читать данные, у которых владелец — транзакция в статусе Indoubt, то читатель будет ждать, пока статус транзакции не разрешится.
Если время начала транзакции на координаторе было в будущем относительно шарда участника, на который она пришла, то требуется подождать, пока на этом шарде время догонит время начала транзакции.
В нераспределенной СУБД возможные аномалии определяются уровнем изоляции транзакций. В распределенных системах возможны дополнительные аномалии. Поэтому для них вводится понятие уровня распределенной консистентности. Аномалии в распределенной системе связаны с отсутствием единого хода часов на узлах и с естественной упорядоченностью операций относительно реального времени.
Простой пример: клиент подключился к узлу 2, у которого часы идут впереди относительно других узлов, зафиксировал на нем транзакцию, а потом успел подключиться к другому узлу с отстающими часами и не смог прочитать свои же данные (т.к. snapshot CSN транзакции получился меньше CSN, соответствующего xmin транзакции, добавившей новые записи). Формально уровень изоляции не нарушен (т.к. такая аномалия не рассматривается для традиционных СУБД), но снимок не включает самые свежие данные! Эта аномалия называется stale reads и возможна в распределенных системах.
Чтобы учитывать расхождение часов на узлах, в Shardman есть параметр csn_commit_delay. Это искусственно вводимая задержка на известный промежуток времени (в наносекундах). В примере ниже мы ждём, что шарды 1 и 2 дойдут до момента времени пять, чтобы транзакция, запущенная в момент 3 и закончившаяся в 4 на втором шарде, для всех других гарантированно оказалась в прошлом. Конечно, разбег по времени лучше минимизировать и следить за синхронизацией. Если его величина будет меньше времени посылки сообщения по сети между шардами, то ожидание не требуется вовсе.
В реальной OLTP-нагрузке распределённых транзакций, меняющих данные сразу на нескольких узлах кластера, скорее лучше избегать. В случае, когда двухфазный коммит не является необходимым, он не будет использован, что позволяет повысить скорость выполнения транзакций.
Одним из дополнительных преимуществ использования CSN в качестве снимков является возможность создания согласованной резервной копии всего кластера в неблокирующем режиме. Для этого в Shardman реализована функция создания так называемой точки синхронизации (syncpoint), которая выводит текущий CSN и позицию LSN внутри каждого шарда, которая соответствует этому CSN. А дальше уже дело техники снять бэкап по полученным LSN или восстановиться на заданную точку синхронизации.
Монитор
Для фиксации распределенных транзакций используется протокол двухфазной фиксации. Во время любой из промежуточных стадий двухфазного коммита может возникнуть сбой по множеству случайных внешних причин (например, сетевые проблемы или отключение питания узла кластера). Специальный процесс СУБД — монитор — работает на каждом шарде и периодически проверяет статус подготовленных на нем транзакций. Может произойти авария на координаторе транзакции, либо одном из участников — во всех случаях монитор доведет дело до конца, как только аварийный узел восстановится.
Кроме разрешения статуса распределенных транзакций, монитор еще занимается обнаружением распределенных дедлоков и синхронизацией самого старого CSN в кластере, на который какая‑либо транзакция может запросить данные. Последнее необходимо для работы очистки отношений БД (vacuum).
Начальная настройка и управление кластером Shardman
Установка кластера происходит следующим образом. В начале настраивается кластер etcd. Рекомендуется это делать на отдельных серверах, поскольку etcd довольно чувствителен к задержкам. После этого осуществляется установка пакетов на узлы кластера Shardman.
Инициализация кластера выполняется посредством утилиты shardmanctl:
shardmanctl config generate — создает шаблон стандартного конфигурационного файла;
shardmanctl init — инициализирует конфигурацию кластера в etcd с использованием указанного конфига;
shardmanctl nodes add — добавляет узлы в кластер Shardman.
По завершению настройки можно использовать команду shardmanctl getconnstr для получения строки подключения к кластеру, которую дальше можно использовать, например, в psql.
Здесь можно ознакомиться с полной документацией для утилиты shardmanctl.
Немного примеров
В Shardman предусмотрено два вида распределенных таблиц: шардированные и глобальные.
Рассмотрим пример создания БД. Эта БД описывает компанию с филиалами в разных городах. Для начала создадим глобальную таблицу со списком городов.
Глобальные таблицы размещаются на всех шардах и содержат одни и те же данные. Соответственно, изменение данных в глобальной таблице на одном шарде приводит к обновлению данных на остальных шардах. Новые города у нас появляются редко, список их конечен и не сильно большой, так что это хороший пример данных для глобальной таблицы.
Механизм репликации глобальных таблиц использует первичный ключ, поэтому необходимо его указать при создании таблицы. Для создания глобальной таблицы используйте конструкцию «WITH (global)».
CREATE TABLE cities (
id int primary key,
...
)
WITH (global);
Теперь переходим к созданию шардированной таблицы. В ней мы будем хранить список организаций.
Шардированные таблицы разбиты на секции по хэшу ключа шардирования, который требуется указать при создании таблицы.
CREATE TABLE orgs (
id int primary key,
town_id int references cities(id)
)
WITH (distributed_by = 'id', num_parts = 3);
В этом примере тоже есть первичный ключ, хотя здесь он не строго обязателен, но главная особенность в том, что мы говорим о связи с глобальной таблицей по полю cities(id). Параметр distibuted_by задаёт ключ шардирования.
При создании шардированной таблицы важное значение играет колокация. Коллокация двух шардированных таблиц позволяет указать, что данные с тем же ключем шардирования должны располагаться на тех же шардах. Это сильно повышает производительность в случае операций соединения этих таблиц с условием равенства ключей шардирования, поскольку операция может полностью выполниться в рамках шарда.
Покажем, как задать колокацию на примере таблицы со списком сотрудников:
CREATE TABLE employees (
id int not null,
orgs_id int references orgs(id),
...
primary key (orgs_id, id)
)
WITH (distributed_by = 'orgs_id', colocate_with = 'orgs', num_parts = 3);
Здесь секции с информацией о сотрудниках располагаются на тех же шардах, что и информация о компании, за которой они числятся. Соответственно, здесь мы указываем, что ключ шардирования у нас составной и секции таблицы должны быть размещены (ключевое слово colocate_with) рядом с соответствующими секциями таблицы orgs.
Замечание: кроме таблиц, еще табличные пространства, последовательности и роли имеют специфическую логику в распределенном случае и требуют указания специальных параметров при создании. Остальные объекты создаются путем распространения одинакового DDL по всему кластеру. Это делается автоматически при установленном гуке shardman.broadcast_ddl либо с помощью вспомогательной функции, которая позволяет любую команду распространить по кластеру.
Если посмотреть на список таблиц в psql, то их будет видно как стандартные секционированные таблицы:
ppostgres=# \d
List of relations
Schema | Name | Type | Owner
--------+-------------------------------+-------------------+--------
public | cities | table | dmitry
public | employees | partitioned table | dmitry
public | employees_0 | table | dmitry
public | employees_1_fdw | foreign table | dmitry
public | employees_2_fdw | foreign table | dmitry
public | orgs | partitioned table | dmitry
public | orgs_0 | table | dmitry
public | orgs_1_fdw | foreign table | dmitry
public | orgs_2_fdw | foreign table | dmitry
public | pgpro_stats_archiver | view | dmitry
public | pgpro_stats_info | view | dmitry
public | pgpro_stats_inval_status | view | dmitry
public | pgpro_stats_metrics | view | dmitry
public | pgpro_stats_sdm_statements | view | dmitry
public | pgpro_stats_sdm_stats_updated | view | dmitry
public | pgpro_stats_statements | view | dmitry
public | pgpro_stats_totals | view | dmitry
public | pgpro_stats_vacuum_database | view | dmitry
public | pgpro_stats_vacuum_indexes | view | dmitry
public | pgpro_stats_vacuum_tables | view | dmitry
(20 rows)
При создании шардированных таблиц был явно указан параметр num_parts = 3, таким образом таблицы будут содержать три секции, наших шардов будет так же три и каждая секция будет располагаться на своем шарде
Выполним следующий запрос, чтобы наглядно посмотреть как выполняется распределенный запрос:
postgres=# explain verbose select count() from employees join orgs on employees.orgs_id=orgs.id join cities on orgs.town_id=cities.id where cities.name = 'Moscow';
------------------------------------------------------------------------------------------------------------------------------------------------
------------------------------------------------------------------------------
Finalize Aggregate (cost=71.66..71.67 rows=1 width=8)
Output: count()
-> Append (cost=69.64..71.66 rows=3 width=8)
-> Partial Aggregate (cost=69.64..69.65 rows=1 width=8)
Output: PARTIAL count()
-> Nested Loop (cost=26.11..69.61 rows=11 width=0)
-> Hash Join (cost=25.95..64.50 rows=11 width=4)
Output: orgs.id
Inner Unique: true
Hash Cond: (orgs.town_id = cities.id)
-> Seq Scan on public.orgs_0 orgs (cost=0.00..32.60 rows=2260 width=8)
Output: orgs.id, orgs.town_id
-> Hash (cost=25.88..25.88 rows=6 width=4)
Output: cities.id
-> Seq Scan on public.cities (cost=0.00..25.88 rows=6 width=4)
Output: cities.id
Filter: (cities.name = 'Moscow'::text)
-> Index Only Scan using employees_0_pkey on public.employees_0 employees (cost=0.15..0.35 rows=11 width=4)
Output: employees.orgs_id
Index Cond: (employees.orgs_id = orgs.id)
-> Async Foreign Scan (cost=0.99..1.00 rows=1 width=8)
Output: (PARTIAL count())
Relations: Aggregate on (((public.orgs_1_fdw orgs_1) INNER JOIN (public.cities)) INNER JOIN (public.employees_1_fdw employees_1))
Remote SQL: SELECT count() FROM ((public.orgs_1 r10 INNER JOIN public.cities r4 ON (((r10.town_id = r4.id)) AND ((r4.name = 'Mos
cow'::text)))) INNER JOIN public.employees_1 r7 ON (((r10.id = r7.orgs_id))))
-> Async Foreign Scan (cost=0.99..1.00 rows=1 width=8)
Output: (PARTIAL count())
Relations: Aggregate on (((public.orgs_2_fdw orgs_2) INNER JOIN (public.cities)) INNER JOIN (public.employees_2_fdw employees_2))
Remote SQL: SELECT count() FROM ((public.orgs_2 r11 INNER JOIN public.cities r4 ON (((r11.town_id = r4.id)) AND ((r4.name = 'Mos
cow'::text)))) INNER JOIN public.employees_2 r8 ON (((r11.id = r8.orgs_id))))
Query Identifier: -4889949602910852807
(29 rows)
Здесь видно, как работает агрегация и соединение по ключу шардирования двух шардированных таблиц и одной глобальной. Запрос распадается на аналогичные асинхронные подзапросы для каждой секции, в том числе удаленные, с финальной агрегацией на координаторе. В плане выполнения запроса в качестве листьев дерева плана мы видим локальное сканирование таблиц orgs_0, cities и employees_0 (для последней выполняется сканирование только индекса (Index Only Scan)). Локальные таблицы соединяются посредством хэш-соединения (Hash Join), а затем с использованием вложенных циклов (Nested Loop). Над соединением выполняется операция частичного вычисления агрегатов ( в данном случае - count()). На что стоит обратить внимание - соединение секций таблицы, расположенных на других серверах, а также частичная агрегация результатов, выполняется удаленно. Мы это видим по полю Relations узла Async Foreign Scan. Также в VERBOSE режиме EXPLAIN мы видим запрос, выполняющийся на удаленном узле. Стоящий над соединениями узел Append собирает результаты соединений секций таблиц. При этом узлы Foreign Scan помечены как асинхронные (Async). Это означает, что узел Append выполняет нижестоящие операции не последовательно, а переключается между ними по мере получения результатов какой-либо из нижестоящих операций. В корне дерева плана мы видим операцию окончательного вычисления результатов агрегации на базе результатов выполнения частичных операций агрегации.
Дальнейшие планы
Одной из наших ближайших технических целей является переход на PostgreSQL 17, а также использование нашего решения для встроенной отказоустойчивости BiHA. Несмотря на то, что мы научились работать со Stolon, полностью и незаметно интегрировали его в наши утилиты, он все равно остается внешней зависимостью в такой критической части как отказоустойчивость системы. Переход на собственные инструменты позволит повысить экспертизу и уровень интеграции в Shardman.
Несомненно мы будем продолжать работать над эффективностью и функциональностью системы, обсуждать идеи с сообществом и предлагать патчи, заниматься развитием шардинга в PostgreSQL.
Комментарии (7)
CrushBy
16.05.2024 15:30А что в такой схеме будет со временными таблицами ? Вот создалось подключение, в нем CREATE TEMPORARY TABLE, на каком сервере она создастся ? Что будет с последующими запросами к ней в этом же подключении ?
fdmitry Автор
16.05.2024 15:30+1Временная таблица будет создана на сервере, к которому установлено подключение, последующие запросы к ней будут работать стандартным образом в рамках сессии. Исключением являются транзакции, которые одновременно меняют и распределенные и временные таблицы. Такая транзакция требует двухфазного коммита, который сейчас не работает с временными таблицы. По той же причине не поддерживаются распределенные временные таблицы.
NitroJunkie
16.05.2024 15:30Ну при хоть сколько сложной транзакции на запись без временных таблиц очень сложно обойтись.
Впрочем масштабирование транзакций записи (то есть master-master) вообще штука такая себе, по двум причинам. а) Теорему САР никто не отменял. б) Тяжело представить систему, во всяком случае со сложными транзакциями (аля ERP), где с современными серверами можно уткнуться в проблему вертикального масштабирования на мастере (предполагая что там только транзакции записи будут, остальные на слейвах). Естественно предполагая что запросы эффективны (то есть все подперто индексами и т.п.).
ЗЫ: А хотя туплю, речь в данном случае же про шардинг, а не зеркалирование. Тут если честно не помню, как теорема САР работает.
breninsul
16.05.2024 15:30не могу представить ситуации когда без временных таблиц не обойтись.
CTE, функции
NitroJunkie
16.05.2024 15:30Ну как сказать, временные таблицы это своего рода локальные переменные. А вы предлагаете их заменить на использование других функций. Мало того что это не производительно, так это ещё тупо неудобно.
vlad4kr7
Как у вас с эффективностью параллельного выполнение запросов?
Если сделать ванильные RO async реплики и FDW partition, то можно SQL аналитические запросы гонять к большим данным за приемлемое время?
Бенчмарки никто не делал вертикальный VS горизонтальный scale?
fdmitry Автор
Для планирования параллельного выполнения используются расширенные техники partitionwise join/aggregate. Можем выполнять параллельно соединения по ключу разбиения шардированных таблиц, произвольное соединение шардированных и глобальных таблиц, частичные сортировки-лимиты, агрегации. Но шаффл не поддерживается (например, параллельное соединение не по ключу шардированных таблиц), что может являться ограничением для аналитических запросов.
Пока поддерживаются только синхронные реплики, в основном для целей отказоустойчивости. На async репликах надо будет решить вопросы распределенной видимости, но идея понятная, да.
У себя, конечно, делали много разных бенчмарков, постараемся в отдельной статье показать результаты.