Билайн давно борется с мошенниками, как и все крупные операторы сотовой связи. Однако формирование множества отчётов для регулятора неожиданно стал для меня вызовом.

Привет, Хабр! Меня зовут Александр Маркачев и я — Data Engineer команды Голосовой Антифрод в билайн. Расскажу, как борьба с мошенниками может обернуться личным вызовом.

Просто, но не просто

В начале была задача. И задача была защитить абонентов от звонков мошенников. билайн противодействует мошенническим звонкам и в определённый момент нам понадобилось передавать данные о фродовых вызовах регулятору. Однако процесс усложнялся тем, что за нами находились другие операторы, от которых тоже требовалась передача информации. Более того, за каждым из них могли находиться их собственные пользователи, что превращало нас в агрегатора такой информации — для формирования последующих отчётов.

Каждая передача данных должна была осуществляться из индивидуальных источников клиентов в их собственные учётные записи на сервисе регулятора. Дополнительно наши клиенты могли динамически присоединяться или покидать систему, что требовало разработки и поддержки отдельного приложения для каждого из них — процесс трудозатратный и требующий много ресурсов.

Кроме того, возникла необходимость учитывать возможность самостоятельного добавления или удаления клиентов без уведомления и контроля разработчика, при этом все данные должны были направляться на сервер регулятора. Данные поступали к нам через SFTP и Kafka, что требовало универсального и гибкого подхода к их обработке.

Цель состояла в автоматизации обработки данных, обеспечении масштабируемости системы при добавлении новых клиентов и снижении нагрузки при их уходе — желательно, без постоянного вмешательства разработчиков. В процессе создания решения мы изучили методы эффективного репартицирования данных, оптимизации операций объединения (joins) и устранения их избыточности, а также повышения производительности оконных функций. В конце расскажу о нашем итоговом решении и его преимуществах.

Репартицирование (repartitioning)

Репартицирование — это ключевой процесс в Spark, который помогает эффективно обрабатывать большие объёмы данных. Представьте, что у вас есть гигантский файл с данными. Чтобы быстрее и проще с ним работать, мы разбиваем его на множество более мелких частей. Это как разбивать большой груз на удобные для переноски коробки. Spark может автоматически выполнять репартицирование, но иногда мы хотим настраивать этот процесс вручную, чтобы лучше контролировать обработку данных на разных этапах.

Когда можно регулировать этот процесс:

  • На этапе чтения. Можем задать параметр maxPartitionBytes, который позволяет разбить слишком большие блоки и прочитать их последовательно. Это снижает необходимость перемещения данных между узлами системы, что называется шаффлингом. Меньшие файлы легче обрабатывать, и процесс становится быстрее.

  • После загрузки данных. Увеличить или уменьшить число блоков, перераспределить между экзекьюторами с помощью repartition и coalesce.

Если у нас много маленьких файлов, их объединение с помощью команды coalesce поможет сократить количество операций шаффлинга. Это ускоряет обработку, так как система не тратит время на управление множеством мелких блоков — вместо загрузки корабля сотнями рабочих вручную, они грузят коробки в контейнеры, а потом большие контейнеры отправляются на погрузку.

С другой стороны, если блоки данных становятся слишком большими, мы можем разделить их с помощью команды repartition. Это помогает распределить нагрузку равномерно между всеми узлами и повысить общую производительность — мы разгружаем наш корабль обратно.

  • Во время действий – агрегации, джойны. Параметр spark.sql.shuffle.partitions отвечает за количество блоков при шаффлинге данных. Найти оптимальное значение этого параметра — задача не из лёгких. Слишком много блоков приведёт к избыточному шаффлингу и увеличению времени обработки, а слишком мало — к неэффективному использованию ресурсов: если дать рабочему носить по одной вещи в грузовик, то это неэффективно, а если загрузить его десятью большими коробками сразу, он никогда не доедет.

Выделим тут две проблемы:

  1. Как определить подходящие параметры.

  2. Как подбирать параметры «гибко», чтобы не хардкодить их единожды, а перерасчитывать регулярно и, желательно, без участия разработчика.

Например, если возьмём значение по умолчанию, сможем прочитать таблицу за 5-7 секунд. Задав слишком маленький блок и дополнительно репартицировав наш DataFrame, получим увеличение времени обработки. Разбив DataFrame на десять частей, мы можем как улучшить результат, так и ухудшить его за счёт того, что будем обрабатывать их в меньшее число потоков. В итоге, расчёт может упасть — экзекьютор не выдержит нагрузки.

Выбор оптимальных параметров требует анализа и понимания данных.

В Spark существуют инструменты, которые помогают понять, с какими файлами работает DataFrame. Команда dataframe.inputFiles покажет список всех файлов, а функция input_file_name — конкретные файлы в текущей выборке. Важно оставить только уникальные значения, чтобы избежать дублирования.

val arr: Array[String] =
            df.select(input_file_name())
              .distinct()
              .collect
              .map(row => row.getString(0))

val conf: Configuration = spark.sparkContext.hadoopConfiguration

Input_file_name особенно полезен, если в DataFrame происходит много фильтраций. Чтобы не высчитывать лишнее, можно сразу определить только используемые файлы.

Определив список файлов, его нужно привести к единому формату, например, перенести в Set. После этого создать конфигурацию для подключения к файловой системе.

При наличии списка файлов и подключения к файловой системе можем воспользоваться специализированной библиотекой для конкретного типа файлов. Если у вас Orc, можно использовать соответствующую библиотеку, при помощи прямого контекста с вашего языка программирования подключиться к файловой системе и напрямую из файла получать все метаданные. Например, размер файла или данных в несжатом виде, максимальное и минимальное количество записей в файле, число строк и многое другое.

Для решения нашей задачи можем использовать либо размер файла в сжатом виде, либо размер файла в несжатом виде, если есть жёсткие ограничения по памяти.

// Получение списка файлов и определение конфигурации
val tableSize = arr.par.map(file => {
     val path = new org.apache.hadoop.fs.Path(file)
     val confReader = OrcFile.readerOptions(conf)
     val content = OrcFile.createReader(path, confReader)
     content.getContentLength

}).reduce(_ + _)

Получив размер каждого файла, мы их все конкатенируем и получаем некое единое число. Это будет объём данных, который задействуется в конкретном DataFrame на момент конкретного расчёта.

// Получение списка файлов и определение конфигурации
val tableSize = arr.par.map(file => {
     val path = new org.apache.hadoop.fs.Path(file)
     val confReader = OrcFile.readerOptions(conf)
     val content = OrcFile.createReader(path, confReader)
    // Если мало ОЗУ и много ЦПУ
     content.getRawDataSize
}).reduce(_ + _)

Имея все эти данные, можем понять, насколько действительно необходимо репартицировать наш DataFrame для получения идеального результата.

// Получение списка файлов и определение конфигурации
// Определение объема данных
val perfectBlockSize = 256 * 1024 * 1024
val repartitionCount = tableSize / perfectBlockSize

Посмотрим на результаты.

Начальное число осталось прежним. Если мы начинаем искусственно дополнительно репартицировать, время может вырасти. Но при этом за счёт более равномерного распределения в некоторых случаях это может дать выигрыш времени или ресурсов.

Если нет необходимости делать дополнительное репартицирование, мы правильно прочитали файл, разбив его на блоки нужного размера, для агрегации сразу задали правильные параметр и размер, то можем сократить время даже при гибком определении без предрасчёта до 2,5 секунд.

В случае, если размер данных у нас никогда не будет меняться (или меняется незначительно), можем положиться на предрасчёт и сократить время до двух секунд. Особенно удобно это для стриминга: в начале запуска всё рассчитывается, а дальше работает итеративно без пересчётов. При перезапуске он будет снова обращаться к вашим файлам и в дальнейшем так и обрабатывать.

Выводы:

  1. Нужно ручное управление блоками чтения и обработки. Это сильно помогает, когда мы понимаем, как правильно его использовать.

  2. Итеративно выбираем лучший набор из размера блока чтения и блока обработки. Таким образом для конкретного расчёта можно определить идеальные параметры.

  3. Ставим на автоопределение при перезапуске, если у вас стриминг. После этого при каждом перезапуске пересчитываем. Всё будет отлично и быстро работать.

Так или иначе, репартицирование затрагивает Join.

Join

Join — это операция, которая объединяет два набора данных по общему ключу. Неважно, одному или нескольким.

Проблемы тут те же, что и у стандартного репартицирования:

  • Большой шаффл замедляет обработку.

  • Маленький шаффл замедляет обработку.

  • Сложно гибко определять размеры для обработки.

У нас к этому моменту уже была написана функция для определения правильного размера репартицирования. Она отлично подходит не только для одного DataFrame, но и для двух и более. В случае, если перед считыванием DataFrame мы задаём размер блока чтения, он будет применяться только к конкретному DataFrame. То есть перед каждым DataFrame мы можем задать этот параметр и в дальнейшем с ним работать.

Так, если мы будем использовать малые объёмы, можем получить двукратную выгоду с 12 до 6,5 секунд.

А когда мы начинаем работать с чуть большим объёмом, выгода становится уже не столь очевидна, потому что при автоопределении придётся потратить достаточно времени на то, чтобы вычитать все файлы с DataFrame, если мы используем input_file_name. Сделать на них distinct, потом параллельно обработать, посчитать — это всё требует много времени.

С предрасчётом мы уже выиграем 20−30%, что достаточно хороший результат.

Если мы ещё больше углубимся, выиграть можно больше, до 50% даже на средних данных. Но даже пройдясь верхнеуровнево, можно получить 20−30% без лишних затрат, что классно.

На больших данных этот эффект становится ещё заметнее.

Input_file_name начинает сильно проигрывать (блок «построчно») по скорости обработки. Когда DataFrame на миллиарды строк, сначала нужно получить имена файлов, из которых он был создан, потом — их уникальность, затем высчитать все файлы и обработать их - это занимает много времени. Но если после фильтраций остаётся мало данных из этого объёма, то конечно это стоит использовать. Если же отфильтровывается не слишком много данных - выгоднее использовать именно inputFile. Я его специально вначале вынес отдельно, он не заставляет нас дополнительно обрабатывать DataFrame. При чтении мы сразу получаем весь список имён файлов. В этом случае даже при больших данных мы начинаем выигрывать уже даже десятки секунд на таких объёмах. Если же мы можем предрасчитать, выигрыш становится больше.

Альтернативы использованию join'ов

Мы рассмотрели, как правильное репартицирование может улучшить время обработки, но давайте посмотрим на альтернативы join'ам.

Стандартный подход заключается в использовании небольшой таблицы, которую мы соединяем с большей для выполнения соединения. Однако в некоторых случаях можно обойтись без join'ов.

Что будет, если мы возьмём большую таблицу и соединим её с малой, или же что случится, если мы сначала преобразуем небольшую таблицу и применим её к большой. Какой подход использовать? Если мы используем небольшую таблицу для фильтрации, то выигрыш будут незначительными — например, с 16 секунд до 15.5 секунд. 

Если преобразовать малую таблицу в Set, Map или Dictionary, сможем получить ощутимый прирост производительности. Фильтрация через Set в некоторых случаях работает быстрее, чем Broadcast Join — особенно при больших объёмах.

List приведён как плохой пример, потому что чем больше объём данных для малой таблицы, тем медленнее он будет работать, в то время как Set будет показывать стабильный результат независимо от объёма. При этом Set начинает выигрывать гораздо больше с увеличением объёма малой таблицы. Чем больше малая таблица, тем больше выигрыш даже по сравнению с Broadcast Join.

Но особенно выигрывает подход с Set, когда нужно объединять DataFrame по условию OR (или).

Я думаю, многие слышали и сталкивались с проблемой Nested Loops Join, который множество раз прогоняет малую таблицу, чтобы всё соединить. Если мы используем Set, то можем применить UDF, в которой пропишем условие для соединения, и в дальнейшем получим то же, что и при стандартном Join.

В примере на изображении выше — фильтрация по условию ИЛИ, но несмотря на это мы можем использовать его и для Join непосредственно для обогащения нашей таблицы.

Выигрыш будет не столь заметен, потому что если соединение не один к одному, нужно это учитывать. После соединения Set с нашей таблицей мы должны сделать explode, чтобы разбить получившийся массив на отдельные строки. Но даже в этом случае мы выигрываем у Nested Loop. Тут точно такой же результат, как и в случае с увеличением Set. Чем больше Set, тем больше будет этот выигрыш, потому что Nested Loop всё-таки очень дорогая операция, которая много раз заставляет проходить по всему DataFrame.

Оконные функции

В этот момент мы смогли избавиться от половины Join в нашем коде, но оставалось что-то тревожащее, некие периодические «тормоза». Так мы вышли на оконные функции.

Оконные функции — мощный аналитический инструмент, который часто необходим, но не всегда оправдан. В Spark у него есть дополнительная уязвимость.

По умолчанию значение для оконных функций — 4096 строк. Когда значение ключа становится больше, чем 4к строк, мы начинаем сваливаться в spill, то есть, в утечку памяти. Каждые 4к строк будут сбрасываться на диск, сильно замедляя работу. Если посмотреть на данный пример, то несмотря на малый объем DataFrame в 1,5 MB, мы получили Spill 42.0 MB, то есть почти в 30 раз больше, чем сам DataFrame. Это связано с многократным сбросом и чтением с диска.

Для этого примера мы специально сделали DataFrame всего в 201к строк, из которых 200 тысяч — это один ключ. К счастью, в Spark есть замечательные опции, позволяющие задать эти параметры такими, которые нам удобны:

spark.sql.windowExec.buffer.spill.threshold = ???

spark.sql.windowExec.buffer.in.memory.threshold = ???

Допустим, мы знаем, что у нас DataFrame на 201 тысячу строк, мы задаём этот параметр на 201 тысячу строк.

spark.sql.windowExec.buffer.spill.threshold = 201000

spark.sql.windowExec.buffer.in.memory.threshold = 201000

Весь spill уходит.

В параметрах мы видим один ключ на 200 тысяч, но его не сбрасываем, и всё работает.

Опираясь на знания, которые мы получили на Join-ах, подумаем, действительно ли нам нужна эта оконная функция или можно без неё обойтись. При наличии сложных агрегатных оконных функций без неё мы, конечно, не справимся. А в случае с такими функциями, как row_number, lag, lead придётся потратить много усилий, чтобы их переписать и они заработали.

Но если мы начинаем работать с такими функциями, как расчёт среднего, расчёт числа записей, суммирование, минимальное, максимальное и другими, то можем отказаться от них в пользу предрасчёта наших данных с помощью простого агрегата и потом с помощью Set присоединить к таблице. Это будет гораздо эффективнее, потому что при агрегате у нас Join один к одному. А значит нет необходимости делать explode.

Если сделать расчёт среднего с помощью оконной функции, мы потратим на это 400 секунд, а для подсчёта числа записей— около 200 секунд. А когда мы делаем агрегатную функцию, а потом соединяем, то тратим всего 14 секунд. Это огромный прирост, который позволит нам перейти к следующему шагу и начать реализовывать потребности.

Выводы

  • Настройка spark.sql.windowExec.buffer.in.memory.threshold нужна для большего хранения в памяти.

  • Настройка spark.sql.windowExec.buffer.spill.threshold — для своевременного сброса на диск.

  • Не использовать оконные функции.

По поводу использования двух этих опций должен сразу предупредить, что есть некоторые риски. Если мы задаём этот параметр жёстко, то сброса на диск не будет. Если внезапно придёт большое количество ключей, а вы задали большое число, то расчёт может просто упасть по памяти, потому что вы не позволяете сбрасывать на диск лишние данные этой опцией. Выбирайте что-то одно.

Таким образом, хорошо продуманные подходы к использованию join'ов и оконных функций могут значительно повысить производительность обработки данных в Apache Spark. 

Параллелим Spark

Есть много аргументов против и много аргументов за. Расскажу про нашу точку зрения.

Мы начали с систематизации обработки данных в Apache Spark и избавились от лишних операций, что помогло ускорить выполнение задач. У нас много клиентов, и создание отдельных приложений для каждой компании было бы сложно масштабируемым и требующим постоянного контроля. Поэтому мы начали рассматривать другие варианты, при этом важно, чтобы данные передавались вовремя, не терялись и всегда были актуальными. На основе этого можем установить определенные критерии успеха для нашего приложения, чтобы оно функционировало эффективно:

Вот эти критерии:

  • Приложение работает стабильно.

  • Данные обрабатываются за ожидаемый диапазон времени. Каждые 15 минут мы должны отправлять данные по каждому оператору, без задержек и потерь.

  • Потребление ресурсов меньше, чем при обычном подходе, или хотя бы такое же.

  • Можно завершать и стартовать новые расчёты без изменения кода, то есть без дополнительного участия разработчика и падения всего расчёта, чтобы не было необходимости перезапускать.

Также мы видели некоторые проблемы:

  • Ежедневно могут добавляться или исчезать новые стримы без нашего участия. То есть к нам кто-то подключился или от нас кто-то ушёл. Это было необходимо делать автоматически.

  • Расчётов могут быть сотни или даже тысячи. Мы начинали со ста двадцати расчётов, сейчас их уже больше трёхсот. Пока что всё работает стабильно, без проблем.

  • В Spark нельзя создать несколько сессий с разными конфигурациями в рамках одного приложения.

Зная обо всех этих рисках и о том, что нам необходимо, можем приступить к реализации нашего приложения.

Для начала нужно вывести общие переменные, таблицы и функции. С функциями всё достаточно просто, они заводятся, и те, которые подходят для разных расчётов, используются в каждом. Они не меняются, всё замечательно.

Соединения с базами немного сложнее. Необходимо, чтобы соединения с базами переиспользовали все расчёты и не создавали множество коннектов к базам.

Самое сложное — это таблицы. В том случае, если какое-то приложение работает длительное время, таблица или переменная, на которой это строится, могут стать не актуальны. Тогда весь расчёт теряет смысл.

Давайте постараемся решить проблему с таблицами. Для этого напишем метод для обновления.

def func(): Type = {
  val tmpValue = spark.read.table(table).count()
  if (tmpValue =!= value && tmpValue.isValidLong)
    value = tmpValue
}
var value = empty

В принципе, он достаточно простой. Просто берём функцию, в которой читаем таблицу, рассчитываем число строк, сравниваем. Если оно не совпадает, то обновляем переменную, если совпадает, то не обновляем.

После этого нам необходимо каким-то образом запускать обновление данных таблиц, потому что сама по себе функция ничего не даст. Запускать обновление при каждом старте процесса тоже очень дорого, это дополнительная нагрузка на базы. В языках программирования есть инструменты для создания отдельных пулов внутри приложения. Например, в случае Scala это newScheduledThreadPool, который создаёт пул потоков с возможностью планирования задач и запуска их по определённому расписанию.

// Подготовка функций и переменных
val updExec = Executors.newScheduledThreadPool(num_value)

В данном случае число потоков будет равно числу обновляемых нами переменных. Так, если необходимо обновлять три таблицы, число будет три. Это нужно, чтобы избежать простоев в случае, если одновременно две таблицы будут обновляться.

Теперь мы можем сделать метод для запуска отдельного потока, когда необходимо. Для этого есть Runnable, которая представляет собой задачу, подлежащую выполнению в отдельном потоке. В него мы передаём нашу функцию, и после этого можем задать расписание, с которым он будет обновляться.

// Подготовка функций и переменных
// Определение числа потоков

val updMethod = new Runnable {
  def run(): Unit = func()
}

Мы запускаем Runnable с задержкой в 120 минут и повторяем обновление каждые два часа.

// Подготовка функций и переменных
// Определение числа потоков
// Подготовка метода для обновления

updExec.scheduleWithFixedDelay(updMethod, 120, 120, TimeUnit.MINUTES)

Мы подготовили некий метод для обновления имеющихся у нас данных — но зачем мы это сделали?

Пишем метод запуска и перезапуска

Вместо блокировок вычислений используем так называемые Future — они позволяют выполнять вычисления, не блокируя текущий поток. 

Сейчас сделаем Future, где у нас будет запускаться стрим, который каждые 10 секунд будет читать таблицу. В данном случае просто читать, но туда можно закинуть всё, что угодно.

def starter(path: String)(ec: ???): Future[Unit] = Future {
   spark.readStream
      .format("rate")
      .option("rowsPerSecond", "1")
      .load()
      .writeStream
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .foreachBatch {(batchDF: DataFrame, batch: Long) => {
         spark.read.orc(path)
}
}(ec)

Мы каждые 10 секунд читаем таблицу, но в том случае, если у нас этот расчёт упадёт, он за собой потащит все остальные расчёты, и приложение вообще рухнет. Это нежелательное поведение, и нам нужно его предусмотреть.

Есть параметр recoverWith. Он позволяет в случае, если упал конкретный Future по какой-либо ошибке, сказать, как это обрабатывать. Например, мы можем заблокировать данный поток на 10 минут и через 10 минут запустить его снова. Мы рассчитываем, что за это время всё восстановится и будет работать.

// Метода запуска расчёта

def retry(path: String)(ec: ???, fjP: ???) = Future {
  starter(path)(ec).recoverWith {
    case ex => Future {
      blocking {Thread.sleep(600000)}
              }(ecTopic)
.flatMap(_ => retry(path)(ec, fjP))(ec)}

Можно поставить какой-нибудь счётчик, что через n запусков он перестанет перезапускаться.

Здесь есть два неизвестных параметра (ec и fgp) — о них расскажу дальше.

Запускаем-перезапускаем

Для управления расчётами пишем функцию, которая сравнивает текущие и новые задачи. То, что у нас есть в новом списке, необходимо запустить. То, чего там нет, нужно остановить.

// Метод запуска расчёта
// Метод перезапуска расчёта
def startALL(oldList: TrieMap[Int, Int], newList: TrieMap[Int, Int]) = {
  val diff: TrieMap[Int, Int] =
     newList.filter { case (k, v) => !oldList.get(k).contains(v) }
}

Получив данные списки, мы можем начать их обрабатывать.

// Метод запуска расчёта
// Метод перезапуска расчёта
def startALL(oldList: TrieMap[Int, Int], newList: TrieMap[Int, Int]) = {
  val diff: TrieMap[Int, Int] =
     newList.filter { case (k, v) => !oldList.get(k).contains(v) }
  val future: Future[Unit] =
    diff.map { value =>
      val fjP = new java.util.concurrent.ForkJoinPool(1)
    }
}

У нас появляется FJP (ForkJoinPool) — инструмент для реализации параллельных асинхронных вычислений. Он не блокирует другие потоки, при этом позволяет остановить конкретный Future. То есть, если у нас какой-то Future падает или нам необходимо его остановить, мы можем сказать, что конкретно этот FJP необходимо остановить — для этого есть метод shutdown.

// Метод запуска расчёта
// Метод перезапуска расчёта
def startALL(oldList: TrieMap[Int, Int], newList: TrieMap[Int, Int]) = {
  val diff: TrieMap[Int, Int] =
     newList.filter { case (k, v) => !oldList.get(k).contains(v) }
  val future: Future[Unit] =
    diff.map { value =>
      val fjP = new java.util.concurrent.ForkJoinPool(1)
      val ecTopic: ExecutionContextExecutor =
        ExecutionContext.fromExecutor(fjP)
    }
}

У нас появляется ExecutionContext — механизм для управления и распределения потоков выполнения, в который мы передаём ForkJoinPool.

Таким образом, передавая ForkJoinPool в ExecutionContext, в расчёте будет известно, что когда ForkJoinPool останавливается, нужно остановить ExecutionContext, и таким образом остановить расчёт конкретного Future.

С ExecutionContext мы получаем управление: если расчёт падает — это можно отследить и перезапустить, даже если он остановился планово. Для этого есть метод onComplete, где мы можем задать поведение, которые считаем правильным.

// Метод запуска расчёта
// Метод перезапуска расчёта
def startALL(oldList: TrieMap[Int, Int], newList: TrieMap[Int, Int]) = {
  val diff: TrieMap[Int, Int] =
     newList.filter { case (k, v) => !oldList.get(k).contains(v) }
  val future: Future[Unit] =
    diff.map { value =>
      val fjP = new java.util.concurrent.ForkJoinPool(1)
      val ecTopic: ExecutionContextExecutor =
        ExecutionContext.fromExecutor(fjP)
      val future = retry(topicName)(ecTopic, fjP)
    }
}

Можем задать даже перезапуск этого же расчёта, потому что плановый никогда не остановится.

Наконец, мы делаем такой же newScheduledThreadPool и запускаем его каждые x минут, чтобы проверять, а были ли у нас запущены новые потоки, остановились ли старые по ошибке, сравнивать между собой и совершать все необходимые действия.

// Метод запуска расчёта
// Метод перезапуска расчёта
// Метода запуска всех расчётов

val updCalc = Executors.newScheduledThreadPool(1)

updCalc.scheduleWithFixedDelay(startALL, 140, 120, TimeUnit.MINUTES)

Допустим, каждые два часа будем проверять, действительно ли у нас ничего не остановилось, действительно ли нам нужно запустить что-то новое.

Что даёт данный подход

  • Сильно сокращает время простоя. Если взять стандартный подход, то между перезапусками тасок будет достаточно длительное время простоя на переключение ресурсов. ForkJoinPool позволяет этого избежать благодаря тому, что контекст переключается сразу, практически мгновенно. Как только отбежал один поток, сразу же запускается следующий.

  • Значительно сократили расходы ресурсов. Мы снизили использование ресурсов с 600+ ядер и 600+ ГБ ОЗУ до 100 ядер и 200 ГБ.

Если взять стандартный подход, когда мы запускали на каждого оператора по отдельному приложению, нам потребовалось бы минимум 1GB на драйвер и 1GB на экзекьютор. На больших операторов требовалось бы значительно больше. На малых этого было бы много, и у нас был бы расход лишних ресурсов. Тут же мы смогли обойтись всего лишь 100 ядрами и 200GB, потому что нет простоев. Более того, у нас ещё остаётся запас. Мы должны отправлять отчёт каждые 15 минут, а время обработки 300 операторов составляет около 6−7 минут.

  • Ускорили отдельные расчёты. Также за счёт того, что у нас правильно определён repartition, у нас не случается проблем, что кто-то прислал много файлов, и из-за этого все остальные расчёты зависли в ожидании, когда обработаются эти десять тысяч тасок. На этапе чтения мы объединяем мелкие файлы, чтобы исключить задержки в других расчётах и избежать перегрузки экзекьюторов.

  • Уменьшили нагрузку на смежные системы. У нас используется один коннект с экзекьютора, и потом переиспользуется для каждого потока. Также есть общие переменные, которые никак не перерасчитываются лишний раз. Благодаря этому мы не ходим в базу каждые пять минут или пять секунд для каждого расчёта. Мы используем всё это один раз и удерживаем.

С чем ещё придётся столкнуться

  • Избыточное количество партиций. При избыточном количестве партиций вы действительно столкнётесь с огромными проблемами при расчёте большого количества потоков, потому что Spark распределяет задачи неравномерно, особенно при FIFO-планировщике. В этом случае Fair даёт более сбалансированную загрузку. Мы используем FIFO, потому что гарантированно должны отдавать данные каждые x времени. При использовании FIFO равномерного распределения не добиться. Если какой-то поток с большим количеством партиций решает отобрать все ресурсы, мы начинаем сильно тормозить. Благодаря этому на начальном этапе возник расчёт правильного репартицирования на основании данных, которые к нам пришли.

  • Необходимость настройки для каждого приложения. Настраивать параметры для каждого приложения можно с помощью с помощью spark.scheduler.pool. Таким образом в каждом потоке могут находиться разные конфигурации до определённой степени.

  • Требуется глубокое понимание бизнес-логики приложения и точности входящих данных. Это нужно не только для того, чтобы всё это правильно написать, но и чтобы понимать, когда вообще это стоит использовать, а когда нет, потому что вкидывать это куда угодно тоже не стоит.

Для того чтобы у вас была некая заготовка, я подготовил код, как это перерасчитывать и определять автоматически. Там код под мои потребности, но я постарался сделать его универсальным. В случае необходимости можете доработать.

Итоги

  • Интеграция spark с языком даёт преимущества.

  • Разработка ядра требует хорошего понимания Spark и выбранного языка программирования.

  • Отлично подходит для типовых расчётов.

23 и 24 июня в Санкт-Петербурге пройдёт профессиональная конференция разработчиков высоконагруженных систем Saint HighLoad++ 2025. Планируйте свое участие, расписание и полный список тем с тезисами уже на сайте.

Комментарии (0)