Предисловие

За последние 2 недели я увидел как минимум четыре российских бигтеха, которые выставляют на большие хакатоны кейсы, связанные с постепенной заменой data-engineer`а. Видимо, бизнес потихоньку осознает, что SQL - это не очень сложно, а написать ETL для перекладки из точки А в точку GreenPlum может даже бездушная машина. Это все очень грустно и печально, и у нас с вами есть несколько путей развития:

  1. Луддизм. Выбрасываем телефоны, выключаем компьютеры из розетки и начинаем выбивать селектики на глиняных табличках. Вариант спорный, как вписаться в стартап - может выгорит, может нет.

  2. Начинаем все резко развиваться, учим новые технологии, дорабатываем наконец GP7. Мы должны обогнать и превзойти машину. Вариант посложнее, может случится, что все станут слишком умными, концепт умного человека потеряется в море умников и машина снова победит.

  3. И вариант №3. Сидим читаем на хабре статью про алгоритмы дистрибуции в Greenplum и смотрим, что будет дальше. Вариант беспроигрышный, как по мне.

Ни на чем не настаиваю, господа и дамы. Выбор всегда за вами.

Введение

Исходя из названия, в данном материале я немного погружусь в замечательный и не всеми любимый Greenplum. Постараюсь докопаться до глубин, а именно, в вопрос "какие алгоритмы использует GreenPlum, чтобы делать свою дистрибуцию и какие есть альтернативы".

Что мы знаем о GreenPlum

Greenplum - это MPP система на базе PostgreSQL, которая нужна, чтобы работать с большими объемами данных и делать OLAP. За счет чего GP хорошо готовит аналитику? Хороший вопрос. За счет MPP архитектуры (и shared-nothing, но об этом сами почитайте). Под капотом у данной системы лежит несколько сегментов (инстансов) реляционной базы данных PostgreSQL, на которые распределяются данные. На каждом сегменте обрабатывается свой кусочек данных, и за счет параллельной обработки небольшого объема на множестве сегментов получается быстрая аналитика. Отсюда мы подходим к понятию дистрибуция.

Что такое дистрибуция?

Дистрибуция(от англ. distribution [распределение]) -- это процесс распределения данных на сегменты.
Допустим, на нашем кластере Greenplum есть 10 сегментов (каждый из них - это отдельный маленький Postgresql), когда мы делаем операцию вставки, например, 1000 записей, GP должен распределить данные и, желательно, сделать это равномерно. То есть на каждом сегменте в идеальном варианте должно оказаться 100 записей.
Что будет если распределить неравномерно? Например, на одном будет 910 записей, а на остальных по 10. В таком случае наш GP потеряет часть своей производительности. Когда мы начнем что-то считать, 9 сегментов обработают по 10 записей и начнут ожидать пока последний обработает 910.

Какие есть политики дистрибуции в GP

  1. Distribution by key(hash distribution)

  2. Random distribution

  3. Replicated distribution

Поговорим про каждую отдельно, а именно о том, какие алгоритмы лежат внутри дистрибуции.

Hash Distribution

Как ясно из названия, это дистрибуция с использованием хэш-функции, она же, дистрибуция по ключу.

CREATE TABLE test_distr_key (a INT, b TEXT)
DISTRIBUTED BY (a); 

Создадим таблицу test_distr_key с дистрибуцией по атрибуту A. Теперь, когда мы вставляем записи в эту таблицу, к каждому значению А применяется хэш-функция, и в зависимости от результата, данные отправляются на один из сегментов. Как обсуждалось ранее, крайне желательно, чтобы записи распределились равномерно.

Критически важно, чтобы при добавлении записей с одинаковыми значениями, они попали на один сегмент.

Одно из ключевых свойства хэш-функции - детерминированность. Для одинаковых входных данных (ключа) хэш-функция всегда должна выдавать одно и то же значение.

То есть:

Сегодня мы сделали:
1.INSERT into test_distr_key(1,'foo')
2.INSERT into test_distr_key(2,'bar')
3.INSERT into test_distr_key(3,'baz')
4.INSERT into test_distr_key(1,'azz')
-------------------------------
Запись номер 1 отправилась на 1 сегмент 
Запись номер 2 отправилась на 2 сегмент 
Запись номер 3 отправилась на 3 сегмент 
Запись номер 4 также отправилась на 1 сегмент 

Завтра мы сделаем 
1.INSERT into test_distr_key(2,'zzz')
2.INSERT into test_distr_key(3,'zzv')
--------------------------------
Мы уверенны, что:   
Запись номер 1  снова отправилась на 2 сегмент 
Запись номер 2 снова отправилась на 3 сегмент 

Если мы сделаем 
SELECT * FROM test_distr_key WHERE a = 2
Система выполнив хэш-функцию к a = 2  должна быть уверенна,
что на 2м сегменте найдет все a = 2 

Для этих условий нам нужен:

a) Ключ или набор ключей с уникальными или почти уникальными значениями для равномерного распределения;

б) Алгоритм, который распределит записи.

Выбор верного ключа на совести разработчика, а вот с алгоритмом давайте разбираться. В Greenplum такой алгоритм называется jump_consistent_hash .
Кто-то спросит почему бы просто не использовать остаток от деления для определения сегмента?

hash(key) mod num_segments

Потому что при изменении num_segments почти все ключи изменят свое местоположение, а перемещать миллионы/миллиарды записей по системе - дело долгое, ресурсоемкое, а самое главное неблагодарное.
Для примера:
>Вариант 1: 500 записей, 10 сегментов

Записи распределились равномерно на 10 сегментов. На первом сегменте у нас записи 10,20 и т.д. Теперь наша компания расширилась, мы добавили 1 сегмент, и вот нам нужно пересчитать и, главное, переместить все записи с учетом, что сегментов уже 11.

Вариант 1: Seg0[10,     20,     30,     40,     50,
    		60,     70,     80,     90,     100 ... ]
    		
Добавили сегмент: Seg0[11,     22,     33,     44,     55,
   			66,     77,     88,     99,     110 ... ]

Такой вариант не очень подходит для системы, одно из преимуществ которой, масштабируемость. Для решения проблемы в 2014 году два умных господина из Гугла предложили очаровательный в своей простоте алгоритм.

jump_consistent_hash(uint64_t key, int32_t num_buckets)
{
	int64_t b = -1, j = 0; 
	while (j < num_buckets)
	{
		b = j;
		key = key * 2862933555777941757ULL + 1;
		j = (b + 1) * ((double)(1LL << 31) / (double)((key >> 33) + 1));
	}
	return b;
}

Вот так его реализация выглядит в коде Greenplum. Как он работает на простом языке:

Представьте, что есть число key (например, хеш строки). Мы хотим назначить ему "корзину" из диапазона [0, N-1].
1. Алгоритм "перепрыгивает" через последовательность потенциальных корзин.
2. Для каждого шага использует псевдослучайное число, вычисленное из ключа.
3. В итоге возвращает номер "корзины", в которой ключ стабильно останется при большинстве изменений числа корзин.

По-моему, выглядит красиво.Преимущества такого подхода:

  • O(log N) по времени(не константа конечно, но тоже довольно быстро);

  • Не требует дополнительной памяти;

  • Минимальные миграции: при увеличении num_buckets с N до N+1 только ~1/(N+1) ключей перемещаются;

  • Равномерность: распределение близко к идеальному;

  • Простота: всего несколько строк кода, не нужны большие таблицы или кольца (как в классическом consistent hashing).

Но несмотря на всю красоту, есть очевидные минусы:

  • Для такого алгоритма все сегменты равны, то есть нельзя задать вес и на слабый сегмент класть меньше записей. Благо, внутренние процессы GP создают сегменты равными;

  • При изменениях (добавлении/удалении узлов) всё же перемещает данные, и больше, чем некоторые конкуренты (например, AnchorHash или Rendezvous), что может вызвать доребалансировку и дополнительную нагрузку.

Какие могли быть альтернативы если не jump_consistent_hash?

  1. Consistent Hashing (Классическое консистентное хеширование)

    хэш-кольцо
    хэш-кольцо

    Представляет ключи и узлы на кольце хеш-пространства. Ключи назначаются ближайшему по часовой стрелке узлу.

    • Позволяет минимально перераспределять ключи при добавлении/удалении узлов, но может страдать от нерегулярного распределения и требует виртуальных узлов для равномерности.

  2. Rendezvous Hashing (Алгоритм с наибольшим случайным весом, HRW)

    • Для каждого ключа считается хеш с каждым узлом, выбирается узел с максимальным значением хеша.

    • Обеспечивает очень равномерное распределение и прост в реализации.

    • Подходит для динамических систем с частыми изменениями.

  3. Maglev Hashing

    • Используется в крупных инфраструктурах (например, Google), эффективно обеспечивает равномерное распределение и высокую скорость вычисления.

    • Работает на основе прореженного списка с минимальными перемещениями данных при изменениях.

И еще, как минимум, пара/тройка вариантов. Вот тут можно почитать дипломную работу какого-то человека с тестами и сравнением. А вот обновленный мною код другого человека , если хочется сравнить алгоритмы самостоятельно.

Если хочется потестировать
#!/usr/bin/env python3
from collections import Counter
from datetime import datetime
import jump
import math
import bisect
import hashlib
import mmh3
from dataclasses import dataclass
import numpy as np
from tabulate import tabulate


def compare_shards(
    title: str, a_items: list[tuple[int, int]], b_items: list[tuple[int, int]]
):
    same_cnt = 0
    for a_i, b_i in zip(a_items, b_items):
        assert a_i[0] == b_i[0]
        same_cnt += 1 if a_i[1] == b_i[1] else 0

    shards_cnt_before = Counter([s for _, s in a_items])
    shards_cnt = Counter([s for _, s in b_items])
    shards = np.array(list(shards_cnt.values()))
    # print(shards)

    overlap = same_cnt / len(a_items) * 100
    return {
        "Название": title,
        "Процент оставшихся на месте ключей": overlap,
        "Сегменты (до)": len(shards_cnt_before),
        "Сегменты (после)": len(shards_cnt),
        "Записей на сегмент": shards.mean(),
        "Записей на сегмент(стандартное отклонение)": shards.std(),
    }


def naive_modulo(data: list[int], shards: int) -> list[tuple[int, int]]:
    return [(i, i % shards) for i in data]


def jumping_hashing(data: list[int], shards: int) -> list[tuple[int, int]]:
    return [(i, jump.hash(i, shards)) for i in data]


def fibonacci_hash(i: int, shards: int):
    return (i * int(2654435769)) >> (32 - int(math.log2(shards)))


def fibonacci_hash_integer(x: int, M: int, w=32):
    A = (5**0.5 - 1) / 2
    K = int(A * (1 << w))  # Scale A to an integer constant
    hashed_value = (x * K) % (1 << w)
    return hashed_value % M


def fibonacci_hashing(data: list[int], shards: int) -> list[tuple[int, int]]:
    return [(i, fibonacci_hash_integer(i, shards)) for i in data]


class ConsistentHashRing:
    def __init__(self, nodes=None, replicas=100):
        self.replicas = replicas  # Number of virtual nodes per bucket
        self.ring = dict()
        self._sorted_keys = []
        if nodes:
            for node in nodes:
                self.add_bucket(node)

    def _hash(self, key):
        h = hashlib.md5(key.encode("utf-8")).hexdigest()
        return int(h, 16)

    def add_bucket(self, node):
        for i in range(self.replicas):
            virtual_node_key = f"{node}:{i}"
            hash_value = self._hash(virtual_node_key)
            self.ring[hash_value] = node
            bisect.insort(self._sorted_keys, hash_value)

    def remove_bucket(self, node):
        for i in range(self.replicas):
            virtual_node_key = f"{node}:{i}"
            hash_value = self._hash(virtual_node_key)
            del self.ring[hash_value]
            index = bisect.bisect_left(self._sorted_keys, hash_value)
            del self._sorted_keys[index]

    def get_bucket(self, key):
        if not self.ring:
            return None
        hash_value = self._hash(key)
        index = bisect.bisect_right(self._sorted_keys, hash_value)
        if index == len(self._sorted_keys):
            index = 0
        return self.ring[self._sorted_keys[index]]


def consistent_hashing(data: list[int], shards: int, replicas) -> list[tuple[int, int]]:
    ring = ConsistentHashRing(range(shards), replicas=replicas)
    return [(i, int(ring.get_bucket(f"item-{i}"))) for i in data]


def hash_to_unit_interval(s: str) -> float:
    """Hashes a string onto the unit interval (0, 1]"""
    return (mmh3.hash128(s) + 1) / 2**128


@dataclass
class Node:
    """Class representing a node that is assigned keys as part of a weighted rendezvous hash."""

    name: str
    weight: float

    def compute_weighted_score(self, key: str):
        score = hash_to_unit_interval(f"{self.name}: {key}")
        log_score = 1.0 / -math.log(score)
        return self.weight * log_score


def determine_responsible_node(nodes: list[Node], key: str):
    """Determines which node of a set of nodes of various weights is responsible for the provided key."""
    return max(nodes, key=lambda node: node.compute_weighted_score(key), default=None)


def rendezvous_hashing(data: list[int], shards: int) -> list[tuple[int, int]]:
    nodes_map = {f"node-{i}": (i, Node(f"node-{i}", 100)) for i in range(shards)}
    nodes = [n for _, n in nodes_map.values()]
    return [
        (i, nodes_map[determine_responsible_node(nodes, f"item-{i}").name][0])
        for i in data
    ]


class MaglevHash:
    def __init__(self, nodes):
        self.nodes = nodes
        self.node_count = len(nodes)
        self.permutation_table = []
        self.lookup_table = []
        self.generate_lookup_table()

    def generate_lookup_table(self):
        # Size of lookup table should be a prime number much larger than node count
        self.table_size = self.find_next_prime(self.node_count * 100)
        
        # Generate permutation table for each node
        self.permutation_table = []
        for node in self.nodes:
            offset = self.hash1(node) % self.table_size
            skip = self.hash2(node) % (self.table_size - 1) + 1
            self.permutation_table.append((offset, skip))
        
        # Generate lookup table
        self.lookup_table = [None] * self.table_size
        next_node = [0] * self.node_count
        filled = 0
        
        while filled < self.table_size:
            for node_idx in range(self.node_count):
                offset, skip = self.permutation_table[node_idx]
                pos = (offset + skip * next_node[node_idx]) % self.table_size
                
                while self.lookup_table[pos] is not None:
                    next_node[node_idx] += 1
                    pos = (offset + skip * next_node[node_idx]) % self.table_size
                
                self.lookup_table[pos] = node_idx
                next_node[node_idx] += 1
                filled += 1
                
                if filled == self.table_size:
                    break

    def hash1(self, key):
        return int(hashlib.md5(key.encode('utf-8')).hexdigest(), 16)

    def hash2(self, key):
        return int(hashlib.sha256(key.encode('utf-8')).hexdigest(), 16)

    def find_next_prime(self, n):
        def is_prime(num):
            if num < 2:
                return False
            for i in range(2, int(math.sqrt(num)) + 1):
                if num % i == 0:
                    return False
            return True
        
        candidate = n
        while True:
            if is_prime(candidate):
                return candidate
            candidate += 1

    def get_node(self, key):
        if not self.nodes:
            return None
        pos = self.hash1(key) % self.table_size
        return self.nodes[self.lookup_table[pos]]


def maglev_hashing(data: list[int], shards: int) -> list[tuple[int, int]]:
    nodes = [f"node-{i}" for i in range(shards)]
    maglev = MaglevHash(nodes)
    return [(i, int(maglev.get_node(f"item-{i}").split("-")[1])) for i in data]


def compare_with_time(title, func, shards_start=20, shards_end=17):
    start_time = datetime.now()
    before = func(shards_start)
    after = func(shards_end)
    stop_time = datetime.now()
    elapsed_time = (stop_time - start_time).total_seconds()
    result = {"Time (sec)": elapsed_time}
    result.update(compare_shards(title, before, after))
    return result


def main():
    data = [hash(f"item-{i}") for i in range(100000)]
    table = [
        compare_with_time("Naive modulo", lambda a: naive_modulo(data, a)),
        compare_with_time(
            "Jump consistent hashing", lambda a: jumping_hashing(data, a)
        ),
        compare_with_time("Fibonacci hashing", lambda a: fibonacci_hashing(data, a)),
        compare_with_time(
            "Consistent hashing (1 replica)", lambda a: consistent_hashing(data, a, 1)
        ),
        compare_with_time("Rendezvous hashing", lambda a: rendezvous_hashing(data, a)),
        compare_with_time("Maglev hashing", lambda a: maglev_hashing(data, a)),
    ]

    print(tabulate(table, headers="keys", tablefmt="github", floatfmt=".2f"))


if __name__ == "__main__":
    main()

Time (sec)

Название

Процент оставшихся на месте ключей

Сегменты (до)

Сегменты (после)

Записей на сегмент

Записей на сегмент(стандартное отклонение)

0.14

Naive modulo

4.92

20

17

5882.35

66.82

0.15

Jump consistent hashing

82.16

20

17

5882.35

57.99

0.54

Fibonacci hashing

4.94

20

17

5882.35

69.35

1.63

Consistent hashing (1 replica)

77.13

20

17

5882.35

4797.17

21.88

Rendezvous hashing

84.91

20

17

5882.35

79.63

1.32

Maglev hashing

4.96

20

17

5882.35

62.62

Также стоит отметить, что в GP5 использовались naive_module (или, как красочно, он обозначен в коде -- LAZYMOD) и распределение по битовой карте (но только, если число сегментов кратно 2). Эти реализации сохранились и в GP6, но судя по всему, только для обратной совместимости (все, что нашел по этому вопросу)

Подведем итоги по этому блоку. Jump_consistent_hash быстрый, простой алгоритм, который не требует ни памяти, ни ресурсов, еще и разработанный в google. Он точно лучше наивной реализации, потому что, в сравнении, почти нет миграции данных. И лучше Randezvous, потому что, если у меня 100 тысяч записей будут распределяться 22 секунды, я такой скандал учиню. Так что, хороший алгоритм, всем советую.

Random distribution

С random дистрибуцией все проще. Согласно документации используется round-robin алгоритм.

you may choose DISTRIBUTED RANDOMLY, which will send the data round-robin to the segment instances

Интересно, что broadcom использует название алгоритма в 5 и 6 версии, а в 7 написали просто randomly

you may choose DISTRIBUTED RANDOMLY, which will send the data randomly to the segment instances

Чтобы разобраться randomly или round_robin я пошел искать в коде. Замечательная функция дистрибуции была найдена

/*
 * Return a random segment number, for randomly distributed policy.
 */
unsigned int
cdbhashrandomseg(int numsegs)
{
	return random() % numsegs;
}

Пойдя глубже, были найдены еще 2 необходимые функции
RANDOM и pg_lrand48, которые нужны для генерации псевдослучайных чисел.

long
random()
{
	return pg_lrand48();
}
--------------------------------------------------------------------

long
pg_lrand48(void)
{
	_dorand48(_rand48_seed);
	return ((long) _rand48_seed[2] << 15) + ((long) _rand48_seed[1] >> 1);
}

Получается, что при рандом дистрибуции строка вызывает функцию cdbhashrandomseg(num_segments) , которая в свою очередь создает рандомное число и с ним делает key % num_segments. Получается, во-первых, используется случайное распределение, которое мы обсуждали в прошлой главе, и у которого есть описанные выше проблемы. Во-вторых, это не совсем round-robin. Обратимся к определению термина

Round-robin algorithm is a simple and widely used method for distributing tasks or requests evenly across a set of resources, such as servers. It cycles through the list of resources in order, assigning each incoming task to the next resource in the sequence. Once it reaches the last resource, it starts again from the first. This cycle repeats continuously, ensuring a roughly equal workload distribution.

Как это работает?

Пусть имеется N объектов, способных выполнить заданное действие, и M задач, которые должны быть выполнены этими объектами. Подразумевается, что объекты n равны по своим свойствам между собой, задачи m имеют равный приоритет. Тогда первая задача (m = 1) назначается для выполнения первому объекту (n = 1), вторая — второму и т. д., до достижения последнего объекта (m = N).

В общем-то, более рассказать и нечего, в GP судя по тому, что найдено, реализуется не round-robin и, соответственно, возможны выбросы, они не критичны и на большом объеме данных сводятся к равномерному распределению, но все же, на объемах меньше одного миллиона записей, это можно проследить.

Я потестировал несколько вариантов, которые показывают, что распределение рандомное. На тестовой машине 12 сегментов

CREATE TABLE test_table(id int4) 
DISTRIBUTED RANDOMLY ;

INSERT INTO test_table(id)
SELECT generate_series(1,120000)

SELECT gp_segment_id, count(*) FROM test_table GROUP BY gp_segment_id 
------------------------------------------------------------------------
segment_id count
0 10028
1 9757
2 9983
3 10039
4 9978
5 9915
6 10007
7 10098
8 10029
9 10180
10 9961
11 10025
------------------------------------------------------------------------
TRUNCATE test_table;
INSERT INTO test_table(id)
SELECT generate_series(1,1200000)

------------------------------------------------------------------------
0 999733
1 999577
2 1000606
3 999704
4 999022
5 1000015
6 1000456
7 999290
8 999568
9 1000301
10 1001390
11 1000338
------------------------------------------------------------------------
TRUNCATE test_table; 
INSERT INTO test_table(id)
SELECT generate_series(1,15)

------------------------------------------------------------------------
2 2
3 3
4 2
5 2
6 1
7 2
8 2
11 1

Replicated distribution

С этим элементом мозаики все еще проще. Никаких алгоритмов тут не используется, запись просто распределяется на каждый сегмент. Из интересного, при Update/delete записи реплицированной таблицы не создаются сабпланы, так как запись нужно изменить на каждом сегменте. Вот тут можно исследовать этот вопрос подробнее

{
	/*
	 * In the UPDATE/DELETE statement, if the result(target) relation
	 * is replicated table, we can not generate a sub-plan as
	 * SELECT statement, becuase we have to UPDATE/DELETE the tuple
	 * on all segments
	 *
	 * The JOIN operator is generated by 'UPDATE/DELETE ... FROM ...',
	 * so we can assume that the join type is inner-join:
	 *   a) if the outer table have 'wts', it can not add broadcast motion
	 *     directly.
	 *   b) We can sure that inner table have no 'wts'.
	 */
	if (root->upd_del_replicated_table > 0 &&
		bms_is_member(root->upd_del_replicated_table, inner.path->parent->relids) &&
		CdbPathLocus_NumSegments(inner.locus) >CdbPathLocus_NumSegments(outer.locus))
	{

Заключение

Надеюсь, сегодня все узнали что-то новое, а если и нет, то провели на 10 минут меньше в тик-токе. Возможно, кто-то из вас найдет практическое применение этой информации, кто-то расскажет ее на собеседовании. Так или иначе, спасибо за внимание.

Комментарии (0)