Привет, Habr!

Я работаю инженером по машинному обучению в МегаФоне. Занимаюсь аналитикой данных и являюсь частью команды разработки MLOps платформы. Задача нашей команды состоит в том, чтобы выстраивать и оптимизировать процессы разработки и продуктивизации моделей машинного обучения, предоставлять функционал для основных этапов (сбор данных, MQ/DQ, продуктивизация).

Сегодня поговорим о том, какие сложности могут возникать при работе с кластером Hadoop и какие есть методы для их решения (конфигурация, мониторинг, склеивание мелких файлов, память кластера)

Видеозапись по мотивам статьи здесь. Полезные ссылки здесь. Код здесь.

Эта статья будет интересна аналитикам и инженерам, которые работают с BigData и регулярно сталкиваются с необходимостью продуктивизировать модели на Hadoop.

Затронем следующие темы:

Вспомним, что Apache Hadoop («Хадуп») — это набор инструментов для построения системы работы с большими данными. Он предназначен для сбора, хранения и распределённой обработки непрерывного потока сотен терабайт информации.

Архитектура Hadoop

  1. Hadoop Common.
    Набор инфраструктурных программных библиотек и утилит.

  2. HDFS (Hadoop Distributed File System).
    Распределённая файловая система для хранения данных на различных узлах (встроено дублирование данных).

  3. YARN (Yet Another Resource Negotiator).
    Система планирования заданий и управления кластером.

  4. Hadoop MapReduce.
    Платформа, которая отвечает за MapReduce-вычисления с использованием nodes кластера.

Также в системе Hadoop есть многочисленные дополнительные компоненты, например, Hive - хранилище, которое позволяет запрашивать из HDFS большие наборы данных и создавать сложные задания MapReduce.

Настройка конфигурации спарка

Начнём с такой проблемы: несколько пользователей на кластере могут использовать все ресурсы, что может помешать работе остальных пользователей. Несмотря на то, что каждый пользователь ограничен в ресурсах, это ограничение установлено достаточно высоким для сложных задач.

Чтобы не прибегать к снижению лимитов, пользователи должны использовать настройки параметров pyspark (наиболее важные). Если неизвестно сколько нужно ресурсов на задачу, то можно воспользоваться базовой конфигурацией, постепенно увеличивая (или уменьшая) параметры.

conf = SparkConf()
conf.set('spark.dynamicAllocation.maxExecutors', '20')
conf.set('spark.executor.cores', '5')
conf.set('spark.executor.memory', '15g')
conf.set('spark.executor.memoryOverhead', '2g')
conf.set('spark.driver.memoryOverhead', '1g')
conf.set('spark.scheduler.mode', 'FAIR')
conf.set('spark.yarn.queue', f'root.users.{short_user_name}')
описание параметров

dynamicAllocation.maxExecutors - максимальное количество executor’ов при динамическом выделении ресурсов,
executor.cores – количество ядер для одного executor’а,
executor.memory – объём памяти для одного executor’а,
spark.executor.memoryOverhead - количество памяти вне очереди, которое может быть выделено для execitor’a (если установить слишком большое значение, вы можете опустить машину в своп, потому что эта память не управляется Ярном. Рекомендуемое значение: 1-2 Gb),
spark.driver.memoryOverhead – объем памяти, используемый каждым процессом драйвера Spark в кластерном режиме,
spark.scheduler.mode – метод планирования заданий,
spark.yarn.queue – имя очереди spark.

Для того чтобы не прописывать каждый раз указанную конфигурацию, можно завернуть ее во внутреннюю библиотеку.

При этом необходимо иметь настроенную конфигурацию динамической аллокации ресурсов:

  • spark.dynamicAllocation.enabled=true (динамическое выделение ресурсов),

  • spark.shuffle.service.enabled=true (сохранение данных на стадии shuffle),

  • spark.dynamicAllocation.executorIdleTimeout=120s (время ожидания executor до того, как забрать ресурсы),

  • spark.dynamicAllocation.cachedExecutorIdleTimeout=600s (время ожидания executor с закешированными данными),

  • spark.port.maxRetries=50 (число портов, которые будет проверять Spark перед стартом).

А также нужно следить в Running Applications за тем, сколько ресурсов кластера использует ваша задача, используя формулы ниже.

В таком случае, ваше задание будет ограничено и не будет отбирать ресурсы у коллег, они будут отрегулированы.

задание потребит чуть больше чем 
spark.dynamicAllocation.maxExecutors X spark.executor.cores процессоров
и не более чем spark.dynamicAllocation.maxExecutors X spark.executor.memory памяти в самом плохом случае.

Мониторинг нагрузки на кластер пользователями.

Имея доступ на Running applications, можно получить отчет с интересующими параметрами spark application.

Это возможно реализовать с использованием YARN и python библиотек. Подробнее здесь.

import subprocess
import requests
import re
import pandas as pd
from bs4 import BeautifulSoup

При помощи команды выводим список всех работающих приложений,

cmd = yarn application –list
applications = subprocess.check_output(cmd, shell = True).splitlines()

далее используем команду получения статуса приложения

cmd = yarn application –status <app_id>
status  = subprocess.check_output(cmd, shell = True).splitlines()

По статусу можно узнать в том числе и дату старта приложения.
Для приложений типа SPARK запросом request обращаемся к YARN Resource manager UI и собираем информацию о задачах.

site_res = requests.get("http://msk-hdp----------- /proxy/"+app_id+"/environment/", auth=(user_name, password)).text
site_jobs = requests.get("http://msk-hdp-----------/proxy/"+app_id+"/", auth=(user_name, password)).text
pandas dataframe с выводом мониторинга
pandas dataframe с выводом мониторинга (скрыты id)

Некоторые приложения могут работать, не выполняя никаких задач. По дате последней исполненной задачи приложения можно определять время последней активности приложения от текущего дня.

jobs_info = BeautifulSoup(site_jobs, 'lxml').find_all('table', attrs={'id': 'completedJob-table'}) 

Могут быть приложения, запущенные длительное время назад, но не выполняющие никаких действий. Их необходимо останавливать. Например, отправлять нотификацию на остановку при превышении продолжительности 7 дней или использовать yarn application –kill <app_id>.

Таким образом, при помощи обращения к UI можно настроить правила email нотификации, остановки и другие действия по триггерам.

Эти обращения можно завернуть в docker и поставить на расписание в airflow.

Проблема мелких файлов на Hadoop

При работе с кластером очень важно следить за тем, как сохраняются файлы на дисковом пространстве при сохранении данных.

В HDFS каждый файл хранится в блоке распределённой файловой системы размером 64 или 128 Мб, а метаданные блока хранятся в памяти узла имён в виде объекта.

Файлы и блоки являются объектами имен в HDFS и занимают место в пространстве имен.

hdfs
hdfs

Это означает, что емкость пространства имен системы ограничена физической памятью узла имен. Поэтому HDFS плохо работает с большим количеством мелких файлов.

Примерный размер для хранения каждого такого объекта равен 150 байт. Поэтому небольшие файлы в HDFS хранить неэффективно: много мелких файлов будут занимать много места на NameNode, больше, чем требуется для хранения их содержимого.

Что это значит? Например, файл с размером 1 Мб хранится с размером блока 128 Мб, два файла размером по 1 Мб будут храниться уже в двух блоках и занимать 300 байт на NameNode, хотя могли бы поместиться в 150.

В результате, могут быть следующие последствия:

  • Неэффективное заполнение дискового пространства

  • замедление скорости выполнения заданий,

  • рост рабочей нагрузки,

  • падение производительности.

Когда может возникнуть большое число файлов? При записи в таблицу Hive с динамическим разделением каждый раздел обрабатывается параллельно и открывает новый файл на каждый новый ключ раздела.

Если для конфигурации spark.sql.shuffle.partitions установлено значение 200, а выражение Partition By используется для загрузки, в 50 целевых разделов, будет 200 задач загрузки, каждая из которых потенциально может загружаться в 50 разделов.
Каждая загрузка создаст около 200*50=10000 файлов.

spark.sql.shuffle.partitions

spark.sql.shuffle.partitions настраивает количество партиций в момент перетасовки данных для (объединений) join или агрегаций.

Вот как это будет выглядеть у вас, каждый файл будет занимать блок и место на NameNode:

много мелких файлов
много мелких файлов

Справиться с большим числом файлов при сохранении помогут методы партиционирования.

партиционирование
партиционирование

Партицирование (partitioning) — это процесс разбиения данных на части, в том числе, на основе уникальных значений столбцов. Когда создается фрейм данных из чтения файла или на основе другого фрейма, Spark создает определенное число партиций в оперативной памяти.

Это одно из преимуществ Spark над Pandas. Преобразования на партициях будут выполняться быстрее за счет параллельных вычислений.

Apache spark поддерживает два вида партиций (в оперативной памяти и на диске):

  1. Партиция в памяти выполняется с помощью вызовов repartition или coalesce.
    repartition используется для увеличения или уменьшения партиций при помощи перетасовки всех данных, а coalesce — только для уменьшения числа партиций наиболее эффективным образом на основе существующих.

  2. Партиция на диске выполняется с помощью вызова partitionBy (это аналогично партициям в Hive).   

DataFrame.repartition(numPartitions, cols)возвращает новый DataFrame, разделенный по хешу. Целочисленный параметр numPartitions указывает целевое количество разделов. Строковый параметр cols задает столбец, по которому нужно разделить данные.

Для корректного сохранения финальных таблиц нужно использовать следующий код. Здесь сначала идет repartition на партиционирование данных в памяти и потом уже partition by для партиционирования на диске.

Таким образом, можно контролировать сохраняемое число файлов.

sdf.repartition(num_files_partitions, partition_columns) 
    .write 
    .partitionBy(partition_columns) 
    .mode(mode) 
    .format(wr_format) 
    .saveAsTable(table_name)

Можно заменить на repartition(num_files_partitions) или coalesce(num_files_partitions).

Перед сохранением нужно проверять число уникальных значений в колонке репартиционирования. В случае если репартиционирование будет произведено по уникальному ключу с миллионом значений может произойти падение NameNode (падение кластера и потеря данных).

Простой способ проверить уникальность по колонке – это distinct().count().

 

репартиционирование по уникальному ключу
репартиционирование по уникальному ключу

Для контроля мелких файлов на hdfs можно также настроить нотификацию на конкретные папки файловой системы, например, при помощи bash команд.
Такая команда выводит список объектов в директории /user/ с занимаемой памятью и числом файлов в порядке убывания числа файлов, список отсортирован по второму аргументу:

import sys
import subprocess

cmd = """
hdfs dfs -count /user/ | egrep -w -v "yarn|hive|spark" | sort -k2 -nr |
awk '{ if (2) "\t\t" int(3/(10241024$2)) " Mb \t \t" $4}'
"""
shell_cmd = True if isinstance(cmd, str) else False
s_output, s_err = subprocess.Popen(
  cmd, 
  stdout=subprocess.PIPE, 
  stderr=subprocess.PIPE,
  shell = shell_cmd,
  universal_newlines=True
).communicate()

sys.stdout.write(s_output)


Вывод из командной строки можно преобразовать в pandas dataframe и отправить в html письме, а также поставить на расписание регулярную нотификацию при помощи airflow с сообщением о числе файлов в директории.

Склеиватель (Spark gluer) файлов на hdfs.

spark gluer
spark gluer

Если нужно поправить уже существующую таблицу, то можно использовать скрытые методы spark для реализации склеивателя файлов на hdfs (spark gluer).

Кажется, что это простая задача, но на самом деле это не так.

Использование скрытых методов spark не является распространнённой практикой.
При помощи обращения к файловой системе можно подсчитывать, удалять и перемещать файлы.

Полный код здесь.

Для обращения к hdfs используйте код:

FileSystem = spark._jvm.org.apache.hadoop.fs.FileSystem
Path = spark._jvm.org.apache.hadoop.fs.Path
FileUtil = spark._jvm.org.apache.hadoop.fs.FileUtil
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
fs = FileSystem.get(hadoop_conf)
string_path = "hdfs://service…/user/…/hive/megapowers_apply_click_monthly/month=2022-12"
first_path = Path(string_path)

# Суммарный размер директории в байтах.
fs.getContentSummary(first_path).getLength()

# Суммарное число файлов в директории.
fs.getContentSummary(first_path).getFileCount()

Для «склеивания» файлов можно пересохранение в tmp папку и выполнить перемещение файлов из временной директории в первоначальную.

Для перемещения файлов нужно использовать метод copy.

it = fs.listFiles(first_path, False)

    while it.hasNext():
        part = it.next()
        to_delete = Path(part.getPath().toString())
        fs.delete(to_delete)
        
    it = fs.listFiles(tmp_path, False)

    while it.hasNext():
        part = it.next()
        to_move = Path(part.getPath().toString())
        FileUtil.copy(fs, to_move, fs, first_path, True, hadoop_conf)
spark gluer
spark gluer

Память кластера (alter table, hdfs cleaner)

Итак, мы затронули методы репартиционирования для сохранения стабильности работы кластера. Важно также помнить о контроле количества занимаемой памяти при регулярном сохранении таблиц для скорингов. 

При сохранении таблицы можно добавить партицию на дату, проверять ее глубину и очищать партицию, которая превышает глубину хранения таблицы при помощи spark.sql запроса:

drop_retro_script = f""" ALTER TABLE {table_name}
          DROP IF EXISTS PARTITION({date_partition_name}='{partition}')
                                    """
            
spark.sql(drop_retro_script)

Если таблицы уже сохраняются без ограничения глубины хранения, то можно настроить автоматизированную очистку при помощи airflow и bash скрипта.

now=$(date +%s);
days_to_keep=365;
hdfs dfs -ls /…/…/…/…/ | while read f; do
file_date_l1=echo $f | awk '{print $6}';
file_name_l1=echo $f | awk '{print $8}';
difference=$(( ($now - $(date -d "$file_date_l1" +%s)) / (24 * 60 * 60) ));
    if [ $difference -gt $days_to_keep ]; then
      echo "Found $file_name_l1 it is older than $days_to_keep and is dated $file_date_l1.";
      hdfs dfs -rm -r $file_name_l1;

Этот скрипт выводит все объекты в папке, определяет время создания объекта и очищает при превышении глубины хранения.

Подведём итоги:

  1. Для регулирования нагрузки на кластер необходимо настраивать конфигурацию спарка.

  2. Можно реализовать мониторинг определённых триггеров на spark application методами python.

  3. Проблема мелких файлов hadoop решается методами repartition, coalesce, partition by.

  4. Возможность обращения к скрытым методом spark позволяет редактировать уже существующие данные и реализовать склеиватель на hdfs.

  5. При помощи hdfs dfs можно контролировать использование памяти на кластере.

Спасибо за внимание.
Ниже публикуется список полезных источников для изучения.

Список источников и полезных ссылок:

Конфигурирование YARN на Spark 
гитхаб с кодом (spark gluer)
Архитектура Hadoop
fair-jobs-scheduling-apache-spark
Мониторинг spark apllication
Проблема мелких файлов на Hadoop.
PartitionBy
coalesce-vs-repartition-in-spark
repartition-vs-coalesce2
SparkMethodsFileSystem      
SparkMethodsFileUtil
apache-spark-best-practices-and-tuning

Поделитесь в комментариях, с какими проблемами вы сталкиваетесь при работе с кластером и как их решаете.

Комментарии (0)