Привет! Мы обучаем людей работе с большими данными. Невозможно себе представить образовательную программу по большим данным без своего кластера, на котором все участники совместно работают. По этой причине на нашей программе он всегда есть :) Мы занимаемся его настройкой, тюнингом и администрированием, а ребята непосредственно запускают там MapReduce-джобы и пользуются Spark'ом.


В этом посте мы расскажем, как мы решали проблему неравномерной загрузки кластера, написав свой автоскейлер, используя облако Mail.ru Cloud Solutions.


Проблема


Кластер у нас используется не совсем в типичном режиме. Утилизация сильно неравномерная. Например, есть практические занятия, когда все 30 человек и преподаватель заходят на кластер и начинают им пользоваться. Или опять же есть дни перед дедлайном, когда загрузка сильно возрастает. Во все остальное время кластер работает в режиме недозагрузки.


Решение №1 – это держать кластер, который будет выдерживать пиковые загрузки, но будет простаивать во все остальное время.


Решение №2 – это держать небольшой кластер, в который вручную добавлять ноды перед занятиями и во время пиковых нагрузок.


Решение №3 – это держать небольшой кластер и написать автоскейлер, который будет следить за текущей загрузкой кластера и сам, используя различные API, добавлять и удалять ноды из кластера.


В этом посте мы будем говорить о решении №3. Такой автоскейлер сильно зависит от внешних факторов, а не от внутренних, и провайдеры его часто не предоставляют. Мы пользуемся облачной инфраструктурой Mail.ru Cloud Solutions и написали автоскейлер, используя API MCS. А так как мы обучаем работе с данными, решили показать, как вы можете написать подобный автоскейлер для своих целей и использовать со своим облаком


Prerequisites


Во-первых, у вас должен быть Hadoop-кластер. Мы, например, пользуемся дистрибутивом HDP.


Чтобы у вас ноды могли быстро добавляться и удаляться, у вас должно быть определенное распределение ролей по нодам.


  1. Мастер-нода. Ну тут пояснять ничего особенно не надо: главная нода кластера, на которой запускается, например, драйвер Spark'а, если вы пользуетесь интерактивным режимом.
  2. Дата-нода. Это нода, на которой у вас хранятся данные на HDFS и на ней же происходят вычисления.
  3. Вычислительная нода. Это нода, на которой у вас ничего не хранится на HDFS, но на ней происходят вычисления.

Важный момент. Автоскейлинг будет происходить за счет нод третьего типа. Если вы начнете забирать и добавлять ноды второго типа, то скорость реагирования будет сильно низкой – декомишен и рекомишен будет занимать часы на вашем кластере. Это, конечно, не то, что ожидаешь от автоскейлинга. То есть ноды первого и второго типа мы не трогаем. Они будут представлять собой минимально жизнеспособный кластер, который будет существовать на протяжении всего действия программы.


Итак, наш автоскейлер написан на Python 3, использует Ambari API для управления сервисами кластера, использует API от Mail.ru Cloud Solutions (MCS) для запуска и остановки машин.


Архитектура решения


  1. Модуль autoscaler.py. В нем прописаны три класса: 1) функции для работы с Ambari, 2) функции для работы с MCS, 3) функции, связанные непосредственно с логикой работы автоскейлера.
  2. Скрипт observer.py. По сути состоит из разных правил: когда и в какие моменты вызывать функции автоскейлера.
  3. Файл с конфигурационными параметрами config.py. Там содержится, например, список нод, разрешенных для автоскейлинга и другие параметры, влияющие, например, на то, сколько времени подождать с того момента, когда была добавлена новая нода. Там же находятся еще и таймстемпы начала занятий, чтобы перед занятием была запущена максимальная разрешенная конфигурация кластера.

Давайте теперь посмотрим на куски кода, находящиеся внутри первых двух файлов.


1. Модуль autoscaler.py


Класс Ambari


Так выглядит кусочек кода, содержащий класс Ambari:


class Ambari:
    def __init__(self, ambari_url, cluster_name, headers, auth):
        self.ambari_url = ambari_url
        self.cluster_name = cluster_name
        self.headers = headers
        self.auth = auth

    def stop_all_services(self, hostname):
        url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
        url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
        req0 = requests.get(url2, headers=self.headers, auth=self.auth)
        services = req0.json()['host_components']
        services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
        data = {
            "RequestInfo": {
                "context":"Stop All Host Components",
                "operation_level": {
                    "level":"HOST",
                    "cluster_name": self.cluster_name,
                    "host_names": hostname
                },
                "query":"HostRoles/component_name.in({0})".format(",".join(services_list))
            },
            "Body": {
                "HostRoles": {
                    "state":"INSTALLED"
                }
            }
        }
        req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
        if req.status_code in [200, 201, 202]:
            message = 'Request accepted'
        else:
            message = req.status_code
        return message

Выше для примера можно посмотреть на реализацию функции stop_all_services, которая останавливает все сервисы на нужной ноде кластера.


На вход классу Ambari вы передаете:


  • ambari_url, например, вида 'http://localhost:8080/api/v1/clusters/',
  • cluster_name – название вашего кластера в Ambari,
  • headers = {'X-Requested-By': 'ambari'}
  • и внутри auth лежит ваш логин и пароль от Ambari: auth = ('login', 'password').

Сама функция представляет из себя не более чем парочку обращений через REST API к Ambari. С точки зрения логики мы вначале получаем список запущенных сервисов на ноде, а затем просим на данном кластере, на данной ноде перевести сервисы из списка в состояние INSTALLED. Функции по запуску все сервисов, по переводу нод в состояние Maintenance и др. выглядят похожим образом – это просто несколько запросов через API.


Класс Mcs


Так выглядит кусочек кода, содержащий класс Mcs:


class Mcs:
    def __init__(self, id1, id2, password):
        self.id1 = id1
        self.id2 = id2
        self.password = password
        self.mcs_host = 'https://infra.mail.ru:8774/v2.1'

    def vm_turn_on(self, hostname):
        self.token = self.get_mcs_token()
        host = self.hostname_to_vmname(hostname)
        vm_id = self.get_vm_id(host)
        mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
        headers = {
            'X-Auth-Token': '{0}'.format(self.token),
            'Content-Type': 'application/json'
        }
        data = {'os-start' : 'null'}
        mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
        return mcs.status_code

На вход классу Mcs мы передаем id проекта внутри облака и id пользователя, а также его пароль. В функции vm_turn_on мы хотим включить одну из машин. Логика здесь чуть сложнее. В начале кода идет вызов трех других функций: 1) нам нужно получить токен, 2) нам нужно конвертировать hostname в название машины в MCS, 3) получить id этой машины. Далее мы делаем просто post-запрос и запускаем эту машину.


Так выглядит сама функция по получению токена:


def get_mcs_token(self):
        url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
        headers = {'Content-Type': 'application/json'}
        data = {
            'auth': {
                'identity': {
                    'methods': ['password'],
                    'password': {
                        'user': {
                            'id': self.id1,
                            'password': self.password
                        }
                    }
                },
                'scope': {
                    'project': {
                        'id': self.id2
                    }
                }
            }
        }
        params = (('nocatalog', ''),)
        req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
        self.token = req.headers['X-Subject-Token']
        return self.token

Класс Autoscaler


В этом классе содержатся функции, относящиеся к самой логике работы.


Так выглядит кусок кода этого класса:


class Autoscaler:
    def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
        self.scaling_hosts = scaling_hosts
        self.ambari = ambari
        self.mcs = mcs
        self.q_ram = deque()
        self.q_cpu = deque()
        self.num = 0
        self.yarn_ram_per_node = yarn_ram_per_node
        self.yarn_cpu_per_node = yarn_cpu_per_node

    def scale_down(self, hostname):
        flag1 = flag2 = flag3 = flag4 = flag5 = False
        if hostname in self.scaling_hosts:
            while True:
                time.sleep(5)
                status1 = self.ambari.decommission_nodemanager(hostname)
                if status1 == 'Request accepted' or status1 == 500:
                    flag1 = True
                    logging.info('Decomission request accepted: {0}'.format(flag1))
                    break
            while True:
                time.sleep(5)
                status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
                if status3 == 'INSTALLED':
                    flag3 = True
                    logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
                    break
            while True:
                time.sleep(5)
                status2 = self.ambari.maintenance_on(hostname)
                if status2 == 'Request accepted' or status2 == 500:
                    flag2 = True
                    logging.info('Maintenance request accepted: {0}'.format(flag2))
                    break
            while True:
                time.sleep(5)
                status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
                if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
                    flag4 = True
                    self.ambari.stop_all_services(hostname)
                    logging.info('Maintenance is on: {0}'.format(flag4))
                    logging.info('Stopping services')
                    break
            time.sleep(90)
            status5 = self.mcs.vm_turn_off(hostname)
            while True:
                time.sleep(5)
                status5 = self.mcs.get_vm_info(hostname)['server']['status']
                if status5 == 'SHUTOFF':
                    flag5 = True
                    logging.info('VM is turned off: {0}'.format(flag5))
                    break
            if flag1 and flag2 and flag3 and flag4 and flag5:
                message = 'Success'
                logging.info('Scale-down finished')
                logging.info('Cooldown period has started. Wait for several minutes')
        return message

На вход мы принимаем классы Ambari и Mcs, список нод, которые разрешены для скейлинга, а также параметры конфигурации нод: память и cpu, выделенные на ноду в YARN. Также есть 2 внутренних параметра q_ram, q_cpu, являющихся очередями. При помощи них мы храним значения текущей нагрузки кластера. Если мы видим, что в течение последних 5 минут стабильно была повышенная нагрузка, то мы принимаем решение о том, что нужно добавить +1 ноду в кластер. То же самое справедливо и для состояния недозагрузки кластера.


В коде выше приведен пример функции, которая удаляет машину из кластера и останавливает ее в облаке. Вначале происходит декомишен YARN Nodemanager, дальше включается режим Maintenance, дальше мы останавливаем все сервисы на машине и выключаем виртуальную машину в облаке.


2. Скрипт observer.py


Пример кода оттуда:


if scaler.assert_up(config.scale_up_thresholds) == True:
        hostname = cloud.get_vm_to_up(config.scaling_hosts)
        if hostname != None:
            status1 = scaler.scale_up(hostname)
            if status1 == 'Success':
                text = {"text": "{0} has been successfully scaled-up".format(hostname)}
                post = {"text": "{0}".format(text)}
                json_data = json.dumps(post)
                req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
                time.sleep(config.cooldown_period*60)

В нем мы проверяем, сложились ли условия для увеличения мощностей кластера и есть ли вообще в резерве машины, получаем хостнейм одной из них, добавляем ее в кластер и публикуем об этом сообщение в Slack нашей команды. После чего запускается cooldown_period, когда мы ничего не добавляем и не убираем из кластера, а просто мониторим загрузку. Если она стабилизировалась и находится внутри коридора оптимальных значений загрузки, то мы просто продолжаем мониторинг. Если же одной ноды не хватило, то добавляем еще одну.


Для случаев когда у нас впереди занятие, мы уже знаем наверняка, что одной ноды не хватит, поэтому мы сразу стартуем все свободные ноды и держим их активными до конца занятия. Это происходит при помощи списка таймстемпов занятий.


Заключение


Автоскейлер – это хорошее и удобное решение для тех случаев, когда у вас наблюдается неравномерная загрузка кластера. Вы одновременно добиваетесь нужной конфигурации кластера под пиковые нагрузки и при этом не держите этот кластер во время недозагрузки, экономя средства. Ну и плюс это все происходит автоматизированно без вашего участия. Сам автоскейлер – это не более, чем набор запросов к API кластер-менеджера и API облачного провайдера, прописанных по определенной логике. О чем точно нужно помнить – это о разделении нод на 3 типа, как мы писали ранее. И будет вам счастье.

Комментарии (3)


  1. sshikov
    27.12.2019 12:27

    Честно говоря, прочитал и не понял самое главное — ну хорошо, вычислительных нод добавили динамически. Но откуда прирост производительности возьмется в общем случае? Данных на этих нодах нет, соответственно, данные для обработки туда сначала нужно притащить — а это нагрузка на сеть. Какая вообще нагрузка, и каким образом, будет туда распределяться, если данные остались там же где и были?


    1. a-pichugin Автор
      27.12.2019 13:04

      Да, это правда важный момент. Нагрузка на сеть увеличивается, потому что данные под конкретную джобу копируются на соответствующие ноды. Поделиться данными по загруке не могу, так как кластер уже погасили.


  1. lev
    31.12.2019 00:10

    Почему не AWS со spot-ами? Там это есть «из коробки».