В современных ЦП очень много ядер. Годами приложения посылали запросы в базы данных параллельно. Если это отчетный запрос ко множеству строк в таблице, он выполняется быстрее, когда задействует несколько ЦП, и в PostgreSQL это возможно, начиная с версии 9.6.


Понадобилось 3 года, чтобы реализовать функцию параллельных запросов — пришлось переписать код на разных этапах выполнения запросов. В PostgreSQL 9.6 появилась инфраструктура для дальнейшего улучшения кода. В последующих версиях и другие типы запросов выполняются параллельно.


Ограничения


  • Не включайте параллельное выполнение, если все ядра уже заняты, иначе другие запросы будут тормозить.
  • Самое главное, параллельная обработка с высокими значениями WORK_MEM задействует много памяти — каждое хэш-соединение или сортировка занимают память в объеме work_mem.
  • Запросы OLTP с низкой задержкой невозможно ускорить параллельным выполнением. А если запрос возвращает одну строку, параллельная обработка его только замедлит.
  • Разработчики любят использовать бенчмарк TPC-H. Может, у вас есть похожие запросы для идеального параллельного выполнения.
  • Только запросы SELECT без предикатной блокировки выполняются параллельно.
  • Иногда правильная индексация лучше последовательного сканирования таблицы в параллельном режиме.
  • Приостановка запросов и курсоры не поддерживаются.
  • Оконные функции и агрегатные функции упорядоченных наборов не параллельны.
  • Вы ничего не выигрываете в рабочей нагрузке ввода-вывода.
  • Параллельных алгоритмов сортировки не бывает. Но запросы с сортировками могут выполняться параллельно в некоторых аспектах.
  • Замените CTE (WITH …) на вложенный SELECT, чтобы включить параллельную обработку.
  • Обертки сторонних данных пока не поддерживают параллельную обработку (а могли бы!)
  • FULL OUTER JOIN не поддерживается.
  • max_rows отключает параллельную обработку.
  • Если в запросе есть функция, не помеченная как PARALLEL SAFE, он будет однопоточным.
  • Уровень изоляции транзакции SERIALIZABLE отключает параллельную обработку.

Тестовая среда


Разработчики PostgreSQL попытались урезать время отклика запросов бенчмарка TPC-H. Загрузите бенчмарк и адаптируйте его к PostgreSQL. Это неофициальное использование бенчмарка TPC-H — не для сравнения баз данных или оборудования.


  1. Загрузите TPC-H_Tools_v2.17.3.zip (или версию поновее) с офсайта TPC.
  2. Переименуйте makefile.suite в Makefile и измените, как описано здесь: https://github.com/tvondra/pg_tpch . Скомпилируйте код командой make.
  3. Сгенерируйте данные: ./dbgen -s 10 создает базу данных на 23 ГБ. Этого хватит, чтобы увидеть разницу в производительности параллельных и непараллельных запросов.
  4. Конвертируйте файлы tbl в csv с for и sed.
  5. Клонируйте репозиторий pg_tpch и скопируйте файлы csv в pg_tpch/dss/data.
  6. Создайте запросы командой qgen.
  7. Загрузите данные в базу командой ./tpch.sh.

Параллельное последовательное сканирование


Оно может быть быстрее не из-за параллельного чтения, а потому что данные разбросаны по многим ядрам ЦП. В современных ОС файлы данных PostgreSQL хорошо кэшируются. С упреждающим чтением можно получить из хранилища блок больше, чем запрашивает демон PG. Поэтому производительность запроса не ограничена вводом-выводом диска. Он потребляет циклы ЦП, чтобы:


  • читать строки по одной со страниц таблицы;
  • сравнивать значения строк и условия WHERE.

Выполним простой запрос select:


tpch=# explain analyze select l_quantity as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------
Seq Scan on lineitem (cost=0.00..1964772.00 rows=58856235 width=5) (actual time=0.014..16951.669 rows=58839715 loops=1)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 1146337
Planning Time: 0.203 ms
Execution Time: 19035.100 ms

Последовательный скан дает слишком много строк без агрегации, так что запрос выполняется одним ядром ЦП.


Если добавить SUM(), видно, что два рабочих процесса помогут ускорить запрос:


explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1)
-> Gather (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Partial Aggregate (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3)
-> Parallel Seq Scan on lineitem (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 382112
Planning Time: 0.241 ms
Execution Time: 8555.131 ms

Параллельная агрегация


Нода "Parallel Seq Scan" производит строки для частичной агрегации. Нода "Partial Aggregate" урезает эти строки с помощью SUM(). В конце счетчик SUM из каждого рабочего процесса собирается нодой "Gather".


Итоговый результат рассчитывается нодой "Finalize Aggregate". Если у вас свои функции агрегации, не забудьте пометить их как "parallel safe".


Количество рабочих процессов


Количество рабочих процессов можно увеличить без перезапуска сервера:


alter system set max_parallel_workers_per_gather=4;
select * from pg_reload_conf();

Теперь мы видим 4 воркера в выводе explain:


tpch=# explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (cost=1440213.58..1440213.59 rows=1 width=32) (actual time=5152.072..5152.072 rows=1 loops=1)
-> Gather (cost=1440213.15..1440213.56 rows=4 width=32) (actual time=5151.807..5153.900 rows=5 loops=1)
Workers Planned: 4
Workers Launched: 4
-> Partial Aggregate (cost=1439213.15..1439213.16 rows=1 width=32) (actual time=5147.238..5147.239 rows=1 loops=5)
-> Parallel Seq Scan on lineitem (cost=0.00..1402428.00 rows=14714059 width=5) (actual time=0.037..3601.882 rows=11767943 loops=5)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 229267
Planning Time: 0.218 ms
Execution Time: 5153.967 ms

Что здесь происходит? Рабочих процессов стало в 2 раза больше, а запрос стал всего в 1,6599 раз быстрее. Расчеты интересные. У нас было 2 рабочих процесса и 1 лидер. После изменения стало 4+1.


Наше максимальное ускорение от параллельной обработки: 5/3 = 1,66(6) раз.


Как это работает?


Процессы


Выполнение запроса всегда начинается с лидирующего процесса. Лидер делает все непараллельное и часть параллельной обработки. Другие процессы, выполняющие те же запросы, называются рабочими процессами. Параллельная обработка использует инфраструктуру динамических фоновых рабочих процессов (с версии 9.4). Раз другие части PostgreSQL используют процессы, а не потоки, запрос с 3 рабочими процессами мог быть в 4 раза быстрее традиционной обработки.


Взаимодействие


Рабочие процессы общаются с лидером через очередь сообщений (на основе общей памяти). У каждого процесса 2 очереди: для ошибок и для кортежей.


Сколько нужно рабочих процессов?


Минимальное ограничение задает параметр max_parallel_workers_per_gather. Потом исполнитель запросов берет рабочие процессы из пула, ограниченного параметром max_parallel_workers size. Последнее ограничение — это max_worker_processes, то есть общее число фоновых процессов.


Если не удалось выделить рабочий процесс, обработка будет однопроцессной.


Планировщик запросов может сократить рабочие процессы в зависимости от размера таблицы или индекса. Для этого есть параметры min_parallel_table_scan_size и min_parallel_index_scan_size.


set min_parallel_table_scan_size='8MB'
8MB table => 1 worker
24MB table => 2 workers
72MB table => 3 workers
x => log(x / min_parallel_table_scan_size) / log(3) + 1 worker

Каждый раз, когда таблица в 3 раза больше, чем min_parallel_(index|table)_scan_size, Postgres добавляет рабочий процесс. Количество рабочих процессов не основано на затратах. Круговая зависимость затрудняет сложные реализации. Вместо этого планировщик использует простые правила.


На практике эти правила не всегда годятся для продакшена, так что можно изменить количество рабочих процессов для конкретной таблицы: ALTER TABLE … SET (parallel_workers = N).


Почему параллельная обработка не используется?


Кроме длинного списка ограничений есть еще проверки затрат:


parallel_setup_cost — чтобы обойтись без параллельной обработки коротких запросов. Этот параметр прикидывает время на подготовку памяти, запуск процесса и начальный обмен данными.


parallel_tuple_cost: общение лидера с рабочими может затягиваться пропорционально количеству кортежей от рабочих процессов. Этот параметр считает затраты на обмен данными.


Соединения вложенных циклов — Nested Loop Join


PostgreSQL 9.6+ может выполнять вложенные циклы параллельно — это простая операция.

explain (costs off) select c_custkey, count(o_orderkey)
                from    customer left outer join orders on
                                c_custkey = o_custkey and o_comment not like '%special%deposits%'
                group by c_custkey;
                                      QUERY PLAN
--------------------------------------------------------------------------------------
 Finalize GroupAggregate
   Group Key: customer.c_custkey
   ->  Gather Merge
         Workers Planned: 4
         ->  Partial GroupAggregate
               Group Key: customer.c_custkey
               ->  Nested Loop Left Join
                     ->  Parallel Index Only Scan using customer_pkey on customer
                     ->  Index Scan using idx_orders_custkey on orders
                           Index Cond: (customer.c_custkey = o_custkey)
                           Filter: ((o_comment)::text !~~ '%special%deposits%'::text)

Сбор происходит на последнем этапе, так что Nested Loop Left Join — это параллельная операция. Parallel Index Only Scan появился только в версии 10. Он работает аналогично параллельному последовательному сканированию. Условие c_custkey = o_custkey считывает один порядок для каждой клиентской строки. Так что оно не параллельно.


Хэш-соединение — Hash Join


Каждый рабочий процесс создает свою хэш-таблицу до PostgreSQL 11. И если этих процессов больше четырех, производительность не повысится. В новой версии хэш-таблица общая. Каждый рабочий процесс может использовать WORK_MEM, чтобы создать хэш-таблицу.


select
        l_shipmode,
        sum(case
                when o_orderpriority = '1-URGENT'
                        or o_orderpriority = '2-HIGH'
                        then 1
                else 0
        end) as high_line_count,
        sum(case
                when o_orderpriority <> '1-URGENT'
                        and o_orderpriority <> '2-HIGH'
                        then 1
                else 0
        end) as low_line_count
from
        orders,
        lineitem
where
        o_orderkey = l_orderkey
        and l_shipmode in ('MAIL', 'AIR')
        and l_commitdate < l_receiptdate
        and l_shipdate < l_commitdate
        and l_receiptdate >= date '1996-01-01'
        and l_receiptdate < date '1996-01-01' + interval '1' year
group by
        l_shipmode
order by
        l_shipmode
LIMIT 1;
                                                                                                                                    QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=1964755.66..1964961.44 rows=1 width=27) (actual time=7579.592..7922.997 rows=1 loops=1)
   ->  Finalize GroupAggregate  (cost=1964755.66..1966196.11 rows=7 width=27) (actual time=7579.590..7579.591 rows=1 loops=1)
         Group Key: lineitem.l_shipmode
         ->  Gather Merge  (cost=1964755.66..1966195.83 rows=28 width=27) (actual time=7559.593..7922.319 rows=6 loops=1)
               Workers Planned: 4
               Workers Launched: 4
               ->  Partial GroupAggregate  (cost=1963755.61..1965192.44 rows=7 width=27) (actual time=7548.103..7564.592 rows=2 loops=5)
                     Group Key: lineitem.l_shipmode
                     ->  Sort  (cost=1963755.61..1963935.20 rows=71838 width=27) (actual time=7530.280..7539.688 rows=62519 loops=5)
                           Sort Key: lineitem.l_shipmode
                           Sort Method: external merge  Disk: 2304kB
                           Worker 0:  Sort Method: external merge  Disk: 2064kB
                           Worker 1:  Sort Method: external merge  Disk: 2384kB
                           Worker 2:  Sort Method: external merge  Disk: 2264kB
                           Worker 3:  Sort Method: external merge  Disk: 2336kB
                           ->  Parallel Hash Join  (cost=382571.01..1957960.99 rows=71838 width=27) (actual time=7036.917..7499.692 rows=62519 loops=5)
                                 Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
                                 ->  Parallel Seq Scan on lineitem  (cost=0.00..1552386.40 rows=71838 width=19) (actual time=0.583..4901.063 rows=62519 loops=5)
                                       Filter: ((l_shipmode = ANY ('{MAIL,AIR}'::bpchar[])) AND (l_commitdate < l_receiptdate) AND (l_shipdate < l_commitdate) AND (l_receiptdate >= '1996-01-01'::date) AND (l_receiptdate < '1997-01-01 00:00:00'::timestamp without time zone))
                                       Rows Removed by Filter: 11934691
                                 ->  Parallel Hash  (cost=313722.45..313722.45 rows=3750045 width=20) (actual time=2011.518..2011.518 rows=3000000 loops=5)
                                       Buckets: 65536  Batches: 256  Memory Usage: 3840kB
                                       ->  Parallel Seq Scan on orders  (cost=0.00..313722.45 rows=3750045 width=20) (actual time=0.029..995.948 rows=3000000 loops=5)
 Planning Time: 0.977 ms
 Execution Time: 7923.770 ms

Запрос 12 из TPC-H наглядно показывает параллельное хэш-соединение. Каждый рабочий процесс участвует в создании общей хэш-таблицы.


Соединение слиянием — Merge Join


Соединение слиянием непараллельно по своей природе. Не переживайте, если это последний этап запроса, — он все равно может выполняться параллельно.


-- Query 2 from TPC-H
explain (costs off) select s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment
from    part, supplier, partsupp, nation, region
where
        p_partkey = ps_partkey
        and s_suppkey = ps_suppkey
        and p_size = 36
        and p_type like '%BRASS'
        and s_nationkey = n_nationkey
        and n_regionkey = r_regionkey
        and r_name = 'AMERICA'
        and ps_supplycost = (
                select
                        min(ps_supplycost)
                from    partsupp, supplier, nation, region
                where
                        p_partkey = ps_partkey
                        and s_suppkey = ps_suppkey
                        and s_nationkey = n_nationkey
                        and n_regionkey = r_regionkey
                        and r_name = 'AMERICA'
        )
order by s_acctbal desc, n_name, s_name, p_partkey
LIMIT 100;
                                                QUERY PLAN
----------------------------------------------------------------------------------------------------------
 Limit
   ->  Sort
         Sort Key: supplier.s_acctbal DESC, nation.n_name, supplier.s_name, part.p_partkey
         ->  Merge Join
               Merge Cond: (part.p_partkey = partsupp.ps_partkey)
               Join Filter: (partsupp.ps_supplycost = (SubPlan 1))
               ->  Gather Merge
                     Workers Planned: 4
                     ->  Parallel Index Scan using <strong>part_pkey</strong> on part
                           Filter: (((p_type)::text ~~ '%BRASS'::text) AND (p_size = 36))
               ->  Materialize
                     ->  Sort
                           Sort Key: partsupp.ps_partkey
                           ->  Nested Loop
                                 ->  Nested Loop
                                       Join Filter: (nation.n_regionkey = region.r_regionkey)
                                       ->  Seq Scan on region
                                             Filter: (r_name = 'AMERICA'::bpchar)
                                       ->  Hash Join
                                             Hash Cond: (supplier.s_nationkey = nation.n_nationkey)
                                             ->  Seq Scan on supplier
                                             ->  Hash
                                                   ->  Seq Scan on nation
                                 ->  Index Scan using idx_partsupp_suppkey on partsupp
                                       Index Cond: (ps_suppkey = supplier.s_suppkey)
               SubPlan 1
                 ->  Aggregate
                       ->  Nested Loop
                             Join Filter: (nation_1.n_regionkey = region_1.r_regionkey)
                             ->  Seq Scan on region region_1
                                   Filter: (r_name = 'AMERICA'::bpchar)
                             ->  Nested Loop
                                   ->  Nested Loop
                                         ->  Index Scan using idx_partsupp_partkey on partsupp partsupp_1
                                               Index Cond: (part.p_partkey = ps_partkey)
                                         ->  Index Scan using supplier_pkey on supplier supplier_1
                                               Index Cond: (s_suppkey = partsupp_1.ps_suppkey)
                                   ->  Index Scan using nation_pkey on nation nation_1
                                         Index Cond: (n_nationkey = supplier_1.s_nationkey)

Нода "Merge Join" находится над "Gather Merge". Так что слияние не использует параллельную обработку. Но нода "Parallel Index Scan" все равно помогает с сегментом part_pkey.


Соединение по секциям


В PostgreSQL 11 соединение по секциям отключено по умолчанию: у него очень затратное планирование. Таблицы со схожим секционированием можно соединять секция за секцией. Так Postgres будет использовать хэш-таблицы поменьше. Каждое соединение секций может быть параллельным.


tpch=# set enable_partitionwise_join=t;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                    QUERY PLAN
---------------------------------------------------
 Append
   ->  Hash Join
         Hash Cond: (t2.b = t1.a)
         ->  Seq Scan on prt2_p1 t2
               Filter: ((b >= 0) AND (b <= 10000))
         ->  Hash
               ->  Seq Scan on prt1_p1 t1
                     Filter: (b = 0)
   ->  Hash Join
         Hash Cond: (t2_1.b = t1_1.a)
         ->  Seq Scan on prt2_p2 t2_1
               Filter: ((b >= 0) AND (b <= 10000))
         ->  Hash
               ->  Seq Scan on prt1_p2 t1_1
                     Filter: (b = 0)
tpch=# set parallel_setup_cost = 1;
tpch=# set parallel_tuple_cost = 0.01;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                        QUERY PLAN
-----------------------------------------------------------
 Gather
   Workers Planned: 4
   ->  Parallel Append
         ->  Parallel Hash Join
               Hash Cond: (t2_1.b = t1_1.a)
               ->  Parallel Seq Scan on prt2_p2 t2_1
                     Filter: ((b >= 0) AND (b <= 10000))
               ->  Parallel Hash
                     ->  Parallel Seq Scan on prt1_p2 t1_1
                           Filter: (b = 0)
         ->  Parallel Hash Join
               Hash Cond: (t2.b = t1.a)
               ->  Parallel Seq Scan on prt2_p1 t2
                     Filter: ((b >= 0) AND (b <= 10000))
               ->  Parallel Hash
                     ->  Parallel Seq Scan on prt1_p1 t1
                           Filter: (b = 0)

Главное, соединение по секциям бывает параллельным, только если эти секции достаточно большие.


Параллельное дополнение — Parallel Append


Parallel Append может использоваться вместо разных блоков в разных рабочих процессах. Обычно это бывает с запросами UNION ALL. Недостаток — меньше параллелизма, ведь каждый рабочий процесс обрабатывает только 1 запрос.


Здесь запущено 2 рабочих процесса, хотя включено 4.


tpch=# explain (costs off) select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day union all select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '2000-12-01' - interval '105' day;
                                           QUERY PLAN
------------------------------------------------------------------------------------------------
 Gather
   Workers Planned: 2
   ->  Parallel Append
         ->  Aggregate
               ->  Seq Scan on lineitem
                     Filter: (l_shipdate <= '2000-08-18 00:00:00'::timestamp without time zone)
         ->  Aggregate
               ->  Seq Scan on lineitem lineitem_1
                     Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)

Самые важные переменные


  • WORK_MEM ограничивает объем памяти для каждого процесса, не только для запросов: work_mem процессы соединения = очень много памяти.
  • max_parallel_workers_per_gather — сколько рабочих процессов исполняющая программа будет использовать для параллельной обработки из плана.
  • max_worker_processes — подстраивает общее число рабочих процессов под число ядер ЦП на сервере.
  • max_parallel_workers — то же, но для параллельных рабочих процессов.

Итоги


Начиная с версии 9.6 параллельная обработка может серьезно улучшить производительность сложных запросов, которые сканируют много строк или индексов. В PostgreSQL 10 параллельная обработка включена по умолчанию. Не забывайте отключать ее на серверах с большой рабочей нагрузкой OLTP. Последовательные сканы или сканы индексов потребляют очень много ресурсов. Если вы не выполняете отчет по всему набору данных, запросы можно сделать производительнее, просто добавив недостающие индексы или используя правильное секционирование.


Ссылки


Комментарии (1)


  1. mspain
    05.04.2019 09:51

    Как активный пользователь монги, недавно обнаружил, что в некоторых вещах слон технически лучше. Создание индексов в монге однопоточная операция. Причём они уже 9 лет соответствующий тикет в жире маринуют.

    И в целом, bulk insert с помощью copy может обогнать монговский insertMany (и там и там счёт на миллионы строчек в сек).

    Нормальную репликацию слону бы.
    Глядишь, через 10 лет можно будет закопать стюардессу монгу обратно.

    А, ещё ORM на базе JSONB был бы бомбой. Жалко, что рынок так не считает.