¹ …просто потому, что другого варианта добиться необходимого результата тупо не существует.
² и да, довольно-таки глубоко.
³ нет, серьёзно!



Давайте рассмотрим следующий бизнесовый кейс.


Дано: реально большие данные. Очень много датасетов по много терабайтов каждый, — в сумме объём тянет на петабайты. Лежат в облаке, но это не важно. Важно, что мы эти данные покупаем в «сыром» виде, каким-то образом «готовим», а потом перепродаём конечному потребителю.


Требуется: при подготовке каждого из датасетов разделить его согласно значениям одного или нескольких полей, составляющих его записи, на несколько. И это одна из особенно часто встречающихся в нашем процессе операций.


Довольно-таки сложный, продвинутый ETL у нас. Поясню на типичном примере.


К нам приходит исходный датасет с полями широта, долгота, хеш ID пользователя, дата/время, точность, и ещё десятком, которые сейчас неважны.


Координаты могут покрывать всю страну целиком (например, UK или Францию), а сам датасет аккумулирован за месяц, то есть, диапазон дат в нём довольно-таки немаленький, как минимум, 28 дней.


А наши клиенты хотят получить себе всё то же самое, но, помимо кое-какой подготовки (санитизации, фильтрации, конверсии формата) порезанное, во-первых, по отдельным муниципалитетам, и, во-вторых, сгруппированное по дням недели. А иногда ещё и по рабочим/нерабочим часам.


Понять клиентов можно: деловая активность и паттерны перемещения популяции в реальном мире довольно сильно зависят от того, утро вторника сегодня, или субботний вечер. Потому что примерно половина человеков работает по графику 5/2 с утра до вечера, и именно они наиболее интересны для анализа (по большей части, уже силами наших клиентов). А те, кто работает по иным графикам, типа 12-часовой смены 2 дня через 2, как правило, заняты на низкоквалифицированной и малооплачиваемой работе. Денег они приносят куда меньше, и маркетингово исследовать их особого смысла нет.


Вот такая предметная область. Но наша конкретная задача — это первоначальная подготовка.


Значит, нам, как промежуточному звену, в процессе надо порезать датасет как минимум по широте+долготе (наложив некий фенсинг для каждого из целевых муниципалитетов), и потом ещё по дням недели (либо по каждому по отдельности, либо по рабочим/выходным) + почасовые интервалы. «Как минимум», потому что некоторые клиенты хотят ещё сильнее раздробить данные, вплоть до первой буквы хеша ID пользователя (который представляет из себя 16-ричную строку), или наложить ещё какие-нибудь особенные способы разбиения.


И вы знаете что?


В стандартном для бигдаты инструменте Apache Spark попросту нет штатного способа, чтобы проделать подобную операцию дёшево и сердито. Причём не только в самом его ядре (сиречь, RDD API), но и во всех более высоких обвязках. Не говоря уже обо всяких надстройках и производных продуктах.


Почему нет?


Я не знаю. Видимо, всем тем, кто разрабатывает данные инструменты, задача разбиения одного датасета на несколько не является приоритетной.


Что делать? Ну, только писать какую-то свою имплементацию.


Прошу заметить, что ключевое слово тут именно «сделать дёшево». Сделать-то «как-нибудь» и «в принципе» ещё как можно, конечно же. Вот только производительность этих «стандартных» способов как минимум сомнительная, потому что отбор по значению полей в абсолютно большинстве случаев потребует полного перебора датасета множество раз.


Стандартный путь, доступный через API, таков: фильтрануть по каждому из значений с начала до конца. Столько раз, сколько этих самых значений.

Давайте посмотрим способ сделать тупо и в лоб (на кастомные типы — врапперы записей и RDD — можно не обращать внимания, важен сам алгоритм):


public ListOrderedMap<String, DataStream> execute() {
    ListOrderedMap<String, DataStream> outputs = new ListOrderedMap<>();

    DataStream input = inputStreams.getValue(0);
    input.cache();

    final List<String> _splitColumnNames = splitColumnNames;

    JavaPairRDD<Object, DataRecord<?>> distinctSplits = input.rdd()
            .mapPartitionsToPair(it -> {
                Set<Tuple2<Object, DataRecord<?>>> ret = new HashSet<>();

                while (it.hasNext()) {
                    DataRecord<?> v = it.next()._2;

                    Columnar r = new Columnar(_splitColumnNames);
                    for (String col : _splitColumnNames) {
                        r.put(col, v.asIs(col));
                    }

                    ret.add(new Tuple2<>(r.hashCode(), r));
                }

                return ret.iterator();
            })
            .distinct();

    Map<Object, DataRecord<?>> uniques = distinctSplits
            .collectAsMap();

    for (Map.Entry<Object, DataRecord<?>> u : uniques.entrySet()) {
        Columnar uR = (Columnar) u.getValue();
        String splitName = outputNameTemplate;
        for (String col : _splitColumnNames) {
            splitName = splitName.replace("{" + col + "}", uR.asString(col));
        }

        int hash = (Integer) u.getKey();
        JavaPairRDD<Object, DataRecord<?>> split = input.rdd().mapPartitionsToPair(it -> {
            List<Tuple2<Object, DataRecord<?>>> ret = new ArrayList<>();

            while (it.hasNext()) {
                Tuple2<Object, DataRecord<?>> v = it.next();

                Columnar r = new Columnar(_splitColumnNames);
                for (String col : _splitColumnNames) {
                    r.put(col, v._2.asIs(col));
                }

                if (r.hashCode() == hash) {
                    ret.add(v);
                }
            }

            return ret.iterator();
        });

        outputs.put(splitName, new DataStreamBuilder(splitName, input.attributes())
                .filtered(meta.verb, input)
                .build(split)
        );
    }

    return outputs;
}

Что тут происходит?


Тут происходит чёртова уйма последовательных проходов по всему датасету целиком.


Причём, по-разному. В самом первом проходе выбираются уникальные сочетания полей записей, по которым производится разбиение.


А во всех последующих проходах производится отбор тех записей, которые соответствуют каждому из собранных уникальных сочетаний.


И это крайне нерациональный способ разбиения датасета. Допустим, если нам требуется разбить его по дням месяца (31 значение) и по полю с первым символом хеша ID пользователя (16 значений), то у нас в максимальном кейсе будет 1 + 31 * 16 = 497 (!) полных проходов по исходному месячному датасету от начала и до конца.


Прикиньте, сколько бесполезной работы сделает кластер, отбирая всего 1/496 от всех записей за каждый один проход!


И если у нас терабайт данных на входе, разбитый на 2000 партиций (чтоб уж точно влезали в екзекуторы облачного кластера), то все выходные датасеты будут иметь уже 992 000 партиций в сумме. Тот же терабайт, размазанный почти на миллион выходных файлов, каждый из которых почти в 500 раз меньше исходного!


Полное безобразие. Помимо миллиона мелких спарковских тасок, — на котором оверхед на запуск экземпляров байткода JVM для каждого .mapPartitionsToPair() в цикле становится более чем заметен, — так ещё и дисковое / сетевое IO на копирование результата, раскиданного в мелконькие файлы, с кластера в конечное хранилище влетает в копеечку, мягко говоря.


У нас впустую жрётся время, а в облаке время — это деньги.


И ведь самое поганое, что с точки зрения идеологии Apache Spark это единственно «правильный» способ — мы бесконечно итерируемся по датасету, собирая нужные нам кусочки данных по одному за проход, и копируя их в новый. RDD ведь штука иммутабельная… и в конце любой операции может быть только единственный результат. Ну вот так и надо, чтобы всё по книжке.


Впрочем, до какого-то момента нас такое «идеологически правильное» безобразие устраивало. Но когда объёмы выросли, стало уже совсем некомфортно по себестоимости.


Что ж. Если бегать вдоль слона от кончика хобота до кончика хвоста слишком дорого, то надо менять парадигму. Обозначим этот способ «подход №1», назовём его легаси, и поищем какой-нибудь иной метод.


Например, попробовать поиграться с партициями.


Допустим, а что, если вынести значения полей разбиения из самой записи в её ключ? После чего сделать .groupByKey() с заранее заданным алгоритмом партиционирования, так, чтобы каждое из уникальных сочетаний попадало в строго определённый диапазон партиций? После чего обращаться уже к конкретным партициям за каждой порцией данных, и сохранять их в свой отдельный датасет.


Для описанного ранее примера это будет обозначать, что записи со значениями в поле дата = 1 и цифра ID хеша = 0 должны попасть в партиции с номерами от 0 до 3, дата = 2 + цифра ID хеша = 1 в партиции от 4 до 7, и так далее вплоть до записей с дата = 31 + цифра ID хеша = F в партиции от 1980 до 1983.


1984 = 496 * 4, и это количество наиболее близко к изначально выбранному количеству в 2000 партиций (которые у нас влазят в екзекуторы). Тут важно, что алгоритм вычисления номера партиции должен быть стабильным, и выдавать известный наперёд int для каждого уникального сочетания значений полей.


Стоимость этого «подхода №2» будет складываться из одного глобального шаффла и последующих итераций по датасету. Но, в отличие от прохождения по ВСЕМ записям, последующие итерации должны обращаться только к конкретным 4 партициям для каждого сочетания, и брать записи напрямую из 1/496 объёма за раз.


Даже с учётом шафла, количество спарковских тасок будет уже не под миллион, а всего лишь 1984 + 1984 = 3968. Выигрыш в 50 (!) раз.


Вот только теперь возникает вопрос, а как в Spark можно штатно дёрнуть только какую-то конкретную партицию.


А вот фиг там.


Штатно — опять никак нельзя.


Штатно доступна только полная итерация. Можно, конечно, попробовать сжульничать в каком-то таком духе:


Broadcast<int[]> parts = sparkContext.broadcast(partitions);
rdd.mapPartitionsWithIndex((idx, it) -> {
    if (Arrays.binarySearch(parts.getValue(), idx) >= 0) {
      return it;
    }
    return Collections.emptyIterator();
}, true);

Тут мы выбираем указанные партиции по номерам, нооо… количество тасок у нас так и остаётся под миллион. Просто 95% из них будут пустышками. Но оверхед на их вызов никуда не девается, и 95% из миллиона результирующих файлов хоть и будут весить 0 байтов, время на их обработку никуда не денется.


Может, есть какой-то другой вариант?


Спросив совета у коллективного разума, выясняем, что можно попробовать заставить Spark использовать кастомный коалескер.


То есть, класс-наследник Coalescer, который на уровне RDD API позволяет поменять количество партиций у датасета.


Покурив немножко исходники самого Спарка, приходим к такому решению:


public class RetainCoalescer implements PartitionCoalescer {
    private final int[] partsToRetain;

    public RetainCoalescer(int[] partsToRetain) {
        this.partsToRetain = partsToRetain;
    }

    @Override
    public PartitionGroup[] coalesce(int ignore, RDD<?> parent) {
        PartitionGroup[] pg = new PartitionGroup[partsToRetain.length];
        for (int i = 0; i < partsToRetain.length; i++) {
            int partIndex = partsToRetain[i];

            Seq<TaskLocation> preferredLocs = parent.context().getPreferredLocs(parent, partIndex);
            pg[i] = new PartitionGroup(Option.apply(preferredLocs.head().host()));
            pg[i].partitions().$plus$eq(parent.partitions()[partIndex]);
        }

        return pg;
    }
}

Тут мы берём исходную RDD, и насильно выдираем из неё только те парты, которые нам надо, игнорируя все остальные. Получаем новую RDD сразу из нужных нам партов.


Теоретически.


Потому что на практике такой кастомный коалескер не работает. Да и вообще никакой кастомный коалескер в Spark не отработает, потому что в единственном месте, где он вызывается, воткнута проверка, которая всегда падает.


Причём, падать она будет, даже если через рефлексию прописать в соответствующем поле true.
«Что за дичь?!» спросите вы.



Внутри require()false. Но почему?


А вот. В кишках Спарка частенько происходит какая-то невменяемая чертовщина, которую фиг отладишь :(


Обычно либо из-за race condition, либо из-за double initialization. Но конкретно здесь скорее всего попросту multi-threaded access в отсутствие memory barrier. Что ни делай, при вызове под отладчиком у нас видно true, но как только проваливаешься внутрь метода, туда всегда прилетает false, и момент, когда происходит эта подмена значения, мне за два дня поочередной постановки всех тредов по отдельности на паузу в отладчике отловить так и не получилось.


И ещё интереснее становится, когда понимаешь, что require этот бессмысленный, так как DefaultPartitinСoalescer не имплементирует Serializable, и вообще никуда с драйвера не передаётся. То есть, тут реально написана какая-то ненужная хрень, и воткнута лишняя проверка.


Вот оно, то единственное место во всём Spark, где используется кастомный коалескер:


override def getPartitions: Array[Partition] = {
    val pc = partitionCoalescer.getOrElse(new DefaultPartitionCoalescer())

    pc.coalesce(maxPartitions, prev).zipWithIndex.map {
      case (pg, i) =>
        val ids = pg.partitions.map(_.index).toArray
        CoalescedRDDPartition(i, prev, ids, pg.prefLoc)
    }
  }

А больше нигде и никак. То есть, по факту он вовсе не нужен, достаточно было бы этот метод переопределить прямо в CoalescedRDD, и усё.


И вот сидишь, такой, и думаешь: «багу, что ли, в спарк коре завести?»


Но кто-нить заводил там баги? А из России они их щас принимают вообще?


Поколебавшись, таки заходим в баг-трекер, и видим совершенно удручающую картину: подобных багов там на самом деле немало, и некоторые даже сразу с патчами для фикса, но вот только все они автоматически закрыты ботом как stale ровно через полгода после заведения. Их попросту никто и никогда не ревьюит.


Такой вот он, этот ваш хвалёный опенсорс.


Если что-то не нужно корпоративному спонсору проекта, то хоть зазаводи ишьёв, и хоть завали их патчами. Никто и никогда из ментейнеров в них не посмотрит.


Тьфу ты, пропасть.


Ну, коли у нас уже пошла пьянка с глубоким курением исходников, то не остаётся иного варианта, кроме как попробовать обойти этот косяк, реализовывав свою собственную CoalescedRDD, в которой напрямую прописывать выбор партов:


public class RetainerRDD<T> extends CoalescedRDD {
    private final RDD prev;
    private final int[] partsToRetain;

    public RetainerRDD(RDD<?> oneParent, int[] partsToRetain, ClassTag ct) {
        super(oneParent, partsToRetain.length, Option.empty(), ct);

        this.prev = oneParent;
        this.partsToRetain = partsToRetain;
    }

    @Override
    public Partition[] getPartitions() {
        Partition[] ret = new Partition[partsToRetain.length];
        for (int i = 0; i < ret.length; i++) {
            Seq<TaskLocation> preferredLocs = prev.context().getPreferredLocs(prev, partsToRetain[i]);
            ret[i] = new CoalescedRDDPartition(i, prev, new int[]{partsToRetain[i]}, Option.apply(preferredLocs.isEmpty() ? null :  preferredLocs.head().host()));
        }
        return ret;
    }

    public static JavaPairRDD<Object, DataRecord<?>> retain(JavaPairRDD<Object, DataRecord<?>> rdd, int[] partsToRetain) {
        if (partsToRetain == null) {
            return rdd;
        }

        return new RetainerRDD<Tuple2>(rdd.rdd(), partsToRetain, ClassManifestFactory$.MODULE$.fromClass(Tuple2.class))
                .toJavaRDD()
                .mapToPair(t -> (Tuple2) t);
    }
}

Тут мы всю логику запихиваем прямо в .getPartitions(), а для того, чтобы было удобно вызывать из своего собственного кода, делаем статический метод .retain(), в который и прокидываем исходную RDD и массив с номерами нужных нам партов.


Даже специфический стиль спарковских исходников тут у нас выдержан, хотя, конечно, код в стиле Scala на Java выглядит отвратительно, не говоря уже о вызове собственно Scala-специфичных методов. Лично меня немного корёжит от подобного, но это хотя бы работает!


Другое дело, что всё, что нам нужно, помечено в спарковских исходниках аннотацией @DeveloperApi, и может сломаться в следующей минорной версии — и нет никакой гарантии, что код RetainerRDD не придётся переписывать каждый раз, мигрируя на новый Spark.


Но тут уж либо шашечки, либо ехать.


Если с точки зрения core-разработчиков Spark наш бизнесовый кейс ниже уровня радара, то решение частной проблемы будет именно таким: с залезанием глубоко в кишки, и наведением там своего порядка при помощи некрасивых костылей и какой-то там матери.


С точки зрения чистоты кода, довольно омерзительное решение.


Но если смотреть с точки зрения стоимости запуска, то уже при тестировании датасета на 1 ТБ, — с разбиением всего лишь на 24 почасовых куска, — запуск процесса вместо 1 часа 40 минут стал занимать 1 час ровно. То есть, на 40 минут меньше. А ведь тут вместо 25 линейных проходов стал 1 с общим шафлом, и 24 частичных, по 1/24 от исходного — и сразу такой заметный выигрыш. Дальнейшее внедрение на процессах с сотнями сочетаний значений полей для разбиения показало соответствующий результат, срезав стоимость некоторых процессов уже не на 40%, а в разы.


Более того, можно с уверенностью сказать: никакой другой инструмент для ETL-ивания больших данных не поддерживает дешёвое разбиение датасетов. Только наш. Ведь у нас это один из наиболее важных этапов, а всем остальным такой кейс пофиг.


Для того, чтобы конечные пользователи нашего инструмента, — то есть, аналитики данных, — смогли использовать этот новый механизм в удобоваримом виде, тоже пришлось постараться. Ведь у нас не просто какой-то там очередной ETL-движок, а целый интерпретатор SQL с императивными элементами.


Например, он поддерживает функции (кучу разных; можно и свои определить). Поэтому, для аналитиков разбиение теперь выглядит как-то так:


$p_p_l = 4; -- parts per letter
$p_p_d = 16 * $p_p_l; -- parts per day

-- we pass $year and $month via command line
CREATE FUNCTION daysPerMonth(@year, @month) AS BEGIN
   IF $month IN [4,6,9,11] THEN RETURN 30; END;
   IF $month == 2 THEN
      IF ($year % 400 == 0) || ($year % 4 == 0) && ($year % 100 <> 0) THEN RETURN 29;
      ELSE RETURN 28; END;
   END;
   RETURN 31;
END;

$parts = daysPerMonth($year, $month) * $p_p_d;

-- set record keys according to “day” and “userid” values, then shuffle DS into $parts
ALTER "processed" SET KEY (INT "day" - 1) * $p_p_d + INT ('0x' || STR_SLICE("userid", 0, 1)) * $p_p_l + RANDOM $p_p_l PARTITION $parts;

LET $days = RANGE[0, 30]; -- days are from 1 to 31, but part # base is 0
LET $hex_digits = STR_SPLIT('0123456789abcdef', '', 16);
LOOP $day IN $days BEGIN
    LOOP $digit IN $hex_digits BEGIN
        -- for each day and letter select only needed partitions: 0..3, 4..7, …, 1980..1983
        LET $range = ARR_RANGE($day * $p_p_d + INT ('0x' || $digit) * $p_p_l, $day * $p_p_d +  INT ('0x' || $digit) * $p_p_l + $p_p_l - 1);
        SELECT *
            FROM "processed" PARTITION $range
            INTO "processed/{$day + 1}/{$digit}";
    END LOOP;
END LOOP;

Но вот что касается кастомных функций в SQL, такой как daysPerMonth() из этого примера, то это уже материал для отдельной статьи в соответсвующей серии.


Be healthy and stay tuned!

Комментарии (9)


  1. Ninil
    27.05.2025 19:48

    Наконец-то нормальная статья для хабра соотв. уровня, а не очередной хлоу_ворлд или реклама канала. Плюсую!


    1. GidraVydra
      27.05.2025 19:48

      *Написанная для отчетности по гранту


      1. PastorGL Автор
        27.05.2025 19:48

        одно другому не мешает. или вы думаете, что все гранты должны тупо проедаться без реального выхлопа?

        я вот вполне себе (то есть нам) нормальный продукт написал.


        1. GidraVydra
          27.05.2025 19:48

          Да нет, просто забавно видеть это в тэгах. Ну и ещë очень интересно, почему и с какой целью коммерческой компании, которая занимается вот этим:

          эти данные покупаем в «сыром» виде, каким-то образом «готовим», а потом перепродаём конечному потребителю

          вообще выдают гранты. Но это вопрос не к вам, а к тем, кто их выдает.


          1. PastorGL Автор
            27.05.2025 19:48

            я что-то не догоняю причин вашего негодования.

            грант был выдан с конкретной темой: «разработка инструмента для ETL». цель достигнута, инструмент успешно разработан. более того, выложен в открытый доступ: https://github.com/PastorGL/datacooker-etl — кто угодно может брать и пользоваться.

            а бизнес-модель конторы, в которой он внедрён, это дело только самой конторы. вы вообще в курсе, как и на что выдаются гранты?


            1. GidraVydra
              27.05.2025 19:48

              вы вообще в курсе, как и на что выдаются гранты?

              Я-то как раз в курсе, потому что всю жизнь работаю в науке и R&D, и сам руковожу и руководил НИР и НИОКР по грантам в некоммерческих научных учреждениях. Поэтому мне и непонятно, зачем и почему выдают гранты коммерческой компании на еë хоздеятельность, направленную на получение прибыли. Говоря простым языком, это примерно то же самое, что выдавать Пятерочке гранты на продажу круассанов.


              1. PastorGL Автор
                27.05.2025 19:48

                тогда должны знать, что гранты выдают очень разные фонды (коммерческие, некоммерческие, государственные, частные, и т.д.) на совершенно разных условиях (например: разработка продукта с нуля, коммерциализация существующего продукта, поддержка инновации, внедрение технологии, и т.п.).

                вообще, никому не советую делать далеко идущих conjectures на основе неполной информации. вы ведь понятия не имеете, какие именно вводные были у нас (а раскрывать их я, конечно же, не буду — не имею права). но покритиковать очень хочется, да?


  1. chotizaperets
    27.05.2025 19:48

    Спасибо за столь интересную статью! Такую проблему я решал с использованием dataframe api df.repartition(keys).write.partitionBy(keys). Чем этот подход уступает подходу, описанному в статье?


    1. PastorGL Автор
      27.05.2025 19:48

      а вы почитайте исходники dataframe api, сразу станет понятно.

      если кратко, то это «подход №1» со всем его оверхедом.