Автор статьи: Дмитрий Володин

Analytics Engineer в Trafficstars

Всем привет. Меня зовут Дмитрий Володин, я Analytics Engineer в Trafficstars. Когда‑то я, как и все, был начинающим аналитиком данных на R и передо мной на этом этапе остро встал вопрос запуска моего кода по расписанию. Ещё желательно было иметь возможность запускать откуда угодно. А уж запускать «кнопочкой play» вообще казалось мечтой.

Эта статья нацелена на начинающих аналитиков данных (не обязательно на R). Также она может быть полезна и опытным аналитикам. Особенно если плохо с коммуникацией с разработчиками и инфраструктурщиками (или их просто нет у вас в компании). Я пересказываю свой опыт, но кажется, он будет весьма полезен читателям.

Простые способы

Когда говорят про выполнение команд по расписанию, то сразу вспоминают cron. Инструмент с историей и огромным влиянием. Но всё‑таки не такой гибкий, как некоторые другие способы.

Cron — утилита linux и macos, Windows и его Task Scheduler я не буду рассматривать просто по тому, что в целом у них с кроном примерно те же болячки и недобства (и у меня нет с ним опыта). В R можно использовать пакет taskscheduleR, там и add‑in для RStudio есть.

Чтобы запустить скрипт по расписанию, вам необходимо отредактировать файл (написав в терминале команду crontab -e), где хранится расписание команд у вас в системе и написать там довольно простую строчку (часто есть простая подсказка в самом файле, но зависит от системы):

m h  dom mon dow   command

Так как cron‑нотация используется почти везде, где надо выполнять команды по расписанию, разберём подробнее, что здесь есть что:

  • m — на какой минуте (от 0 до 59)

  • h — в который час (от 0 до 23)

  • dom — в какой по номеру (1–31) день месяца

  • mon — в какой месяц (1–12)

  • dow — в какой день недели (0–6, где 0 — это воскресенье).

  • будет выполняться ваша command

В качестве значений можно использовать не только числа, но и специальные символы:

  • * — означает любое значение (в каждую минуту или месяц);

  • n‑m — означает диапазон значений от n до m (включительно);

  • /k — означает шаг (каждый k‑ый день месяца);

  • список допустимых значений можно передавать через запятую.

Также если явно указаны значения для dom и dow, то команда будет выполняться, когда выпадает подходящее значения для хотя бы одного из них. То есть запись 30 11 1 * 0 перед командой будет означать «выполняй следующую команду в 11:30 каждый месяц первого числа или каждое воскресенье». Выходит вместе они работают как или, а не как и (но есть нюансы). Вообще вот этот сайт в помощь, крайне полезный.

Для работы с cron в R есть пакет cronR, с помощью которого можно как ставить задачи в расписание, так и работать с уже имеющимися. У него есть и add‑in для RStudio

Развитие cron

Cron — прекрасный инструмент, но лишён многих полезных вещей. Например, мониторинг и алёртинг не так легко сюда прикрутить. Логи централизованно хранить и удобно читать тоже возможно только с использованием дополнительных инструментов (например ELK стек). Также выполнение привязано к конкретной машине. И если машина не работает, то и ваше расписание тоже. И ещё на много весёлого можно наткнуться (права на выполнение, переменные окружения, относительные пути).

Развитие cron получил в виде различных утилит, сервисов и их компонентов. Например: scheduled pipelines в Gitlab, cronjob в Kubernetes, Airflow. Последний является самой распространённой технологией для ETL и вообще оркестрации аналитических (и дата инженерных) задач. Его я далее и буду рассматривать. Хотя искушённый читатель может и удивиться, ведь если речь о языках программирвоания, то в Airflow можно запускать только Python код, но я этой статьёй это заблуждение развенчаю.

Кроме знания R вам для реализации подхода понадобятся минимальные знания Docker. Мы будем как запускать контейнеры из готовых образов, так и создавать образы самостоятельно. Почитать документацию всегда полезно (и постоянно к ней возвращаться совершенно не стыдно). Также я крайне рекомендую (потому что требовать не могу) использовать систему контроля версий (например git).

Перед дальнейшим чтением желательно установить Docker desktop. И будет ещё лучше иметь виртуальную машину на линукс. Благо с распространением облачных сервисов это не сложно. А можно и попросить у своих коллег из соответсвующего департамента. Потом ваши наработки помогут вам вместе поставить всё на прочные рельсы.

Ссылка на скачивание Docker desktop.
Документация по утилите командной строки docker
Документация по Dockerfile
Документация по docker‑compose файлу

Контейнеризация

Заглавная мысль статьи — запуск кода где и когда угодно. И если разработчики в этот момент скажут, ну понятно же всё, заворачиваем код в Docker образ, то для начинающих аналитиков это может быть не так очевидно.

Коротко про контейнеры. Рассматривайте их как изолированные от окружения хоста виртуальные машины. Это не совсем правильное описание, зато наиболее понятное. Запуская на своём ноутбуке контейнер, собранный из вот такого образа (rocker/tidyverse), вы получаете отдельную систему на линукс с установленным R и пакетом tidyverse. В простом запуске нет особого смысла (если это не готовый сервис, который работает сам по себе), потому можно передать команду, которая выполнится в контейнере прямо в момент запуска.

docker run rocker/tidyverse R -e print\('"Hello, world!"'\)

Такая команда создаст контейнер из образа rocker/tidyverse, запустит и его и в нём выполнит команду R -e print('"Hello, world!"'). Думаю, теперь понятно, куда я клоню: нам надо подготовить образ, в котором можно выполнить нужную команду.

Если образ — это подготовленный к непосредственному запуску стартовый шаблонный контейнер, то Dockerfile — инструкция по созданию образа. Его‑то и надо написать в нашей папке с кодом. Я приведу пример очень простого Dockerfile, без какой‑либо конкретики.

# Стартовый образ из которого мы будем делать свой.
# Я использую именно этот, потому что там есть почти всё необходимое,
# особенно это касается linux библиотек
FROM rocker/tidyverse

# Тут можно объявить переменную, значение которой будет определяться
# на стадии сборки образа. Полезно для передачи логинов и паролей
ARG VERY_SECRET_VARIABLE

# Тут мы значение переменной присвоим переменной окружения, которая
# будет доступна в любом контейнере, созданном на основе этого образа
ENV VERY_SECRET_VARIABLE=$VERY_SECRET_VARIABLE

# Тут можно выполнить любые команды, доступные в исходной для образа версии линукс
# Здесь я, например, устанавливаю пакеты
RUN R -e "install.packages(с('lgr', 'gt'))"

# Если необходимо выполнить несколько команд, то для каждой надо писать отдельно RUN
# Здесь я создаю папку для своего кода в образе
RUN mkdir /home/my_cool_r_project

# Специальная команда для копирования файлов с хоста в образ
# Тут я копирую код из папки src, которая находится в той же папке, что и
# Dockerfile, и помещаю её в образ по указанному адресу
COPY src /home/my_cool_r_project

Я не буду здесь касаться сложных материй вроде ENTRYPOINT и CMD. Наша задача крайне проста — создать Docker образ с нашим кодом. Чтобы потом этот код запускать из любого утюга.

Собираем образ простой командой docker build. Флаг ‑t отвечает за название образа (my_cool_r_project). Точка в конце — относительный путь к папке с докерфайлом (в данном случае буквально: он здесь и лежит, где команда выполняется). Файл должен называться Dockerfile.

docker build -t my_cool_r_project .

Итак, код готов и запакован в образ. Осталось его только поставить в расписание? Нет, для начала его надо отправить в реестр контейнеров. Но сначала надо в нём зарегистрироваться. Для тестовых целей можно воспользоваться docker hub. Для этого надо будет сначала там зарегистрироваться, а потом залогиниться в docker desktop (а после этого надо будет это сделать и в терминале).

docker login
docker push my_cool_r_project

В вашем личном реестре в докер хабе после этого появится только что созданный образ. И оттуда его можно забирать с помощью DockerOperator в Airflow.

Airflow

На самом деле запустить Airflow не так просто, как кажется. По туториалу на сайте можно запустить только урезанную версию и DockerOperator там использовать совершенно невозможно. Потому я буду пользоваться готовым docker compose файлом с некоторыми доработками для использования DockerOperator (да, это docker‑in‑docker).

Я рассматриваю следующую конфигурацию Airflow как тестовую, но имеющую ряд недостатков. Особенно в части безопасности. Но в учебных целях нам это подходит как раз.

Сама папка, откуда всё будем запускать, имеет следующую структуру:

Папку dags советую сосздать сразу. Остальные будут созданы автоматически. Сейчас нас интересует Dockerfile и requirements.txt. В последнем пишутся все python библиотеки, которые могут понадобится в нашей работе. В данном примере там только apache‑airflow‑providers‑docker.

Текст докерфайла:

FROM apache/airflow:2.3.3

# Install Docker
RUN pip install Docker

USER root
ARG DOCKER_GROUP_ID

# Add permissions for running docker.sock
RUN groupadd -g $DOCKER_GROUP_ID docker && gpasswd -a airflow docker

USER airflow
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

Дополнив базовый образ airflow некоторыми расширениями прав и библиотеками, мы сможем пользоваться докер демоном на хосте из контейнера эйрфлоу воркера в контейнере.

version: '3'
# Описание общих настроек для всех airflow-контейнеров
x-airflow-common:
  &airflow-common
  # Здесь собираем образ по инструкции из локального Dockerfile
  build:
    context: .
    args:
      DOCKER_GROUP_ID: 999
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CORE__ENABLE_XCOM_PICKLING: True
    # For backward compatibility, with Airflow <2.3
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth'
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
  # Биндим папки на хосте с папками в airflow контейнерах
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
    # Обязательное условие, иначе докер демоном хоста невозможно будет воспользоваться
    - /var/run/docker.sock:/var/run/docker.sock
  user: 0:0
  depends_on:
    &airflow-common-depends-on
    redis:
      condition: service_healthy
    postgres:
      condition: service_healthy

# Не эйрфлоу сервисы довольно стандартные
services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 5s
      retries: 5
    restart: always

  redis:
    image: redis:latest
    expose:
      - 6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 30s
      retries: 50
    restart: always

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - 8080:8080
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"']
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-worker:
    <<: *airflow-common
    command: celery worker
    healthcheck:
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 10s
      timeout: 10s
      retries: 5
    environment:
      <<: *airflow-common-env
      # Required to handle warm shutdown of the celery workers properly
      # See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation
      DUMB_INIT_SETSID: "0"
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-triggerer:
    <<: *airflow-common
    command: triggerer
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-init:
    <<: *airflow-common
    entrypoint: /bin/bash
    # yamllint disable rule:line-length
    command:
      - -c
      - |
        function ver() {
          printf "%04d%04d%04d%04d" $${1//./ }
        }
        airflow_version=$$(AIRFLOW__LOGGING__LOGGING_LEVEL=INFO && gosu airflow airflow version)
        airflow_version_comparable=$$(ver $${airflow_version})
        min_airflow_version=2.2.0
        min_airflow_version_comparable=$$(ver $${min_airflow_version})
        if (( airflow_version_comparable < min_airflow_version_comparable )); then
          echo
          echo -e "\033[1;31mERROR!!!: Too old Airflow version $${airflow_version}!\e[0m"
          echo "The minimum Airflow version supported: $${min_airflow_version}. Only use this or higher!"
          echo
          exit 1
        fi
        if [[ -z "${AIRFLOW_UID}" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
          echo "If you are on Linux, you SHOULD follow the instructions below to set "
          echo "AIRFLOW_UID environment variable, otherwise files will be owned by root."
          echo "For other operating systems you can get rid of the warning with manually created .env file:"
          echo "    See: https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#setting-the-right-airflow-user"
          echo
        fi
        one_meg=1048576
        mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
        cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
        disk_available=$$(df / | tail -1 | awk '{print $$4}')
        warning_resources="false"
        if (( mem_available < 4000 )) ; then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
          echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
          echo
          warning_resources="true"
        fi
        if (( cpus_available < 2 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
          echo "At least 2 CPUs recommended. You have $${cpus_available}"
          echo
          warning_resources="true"
        fi
        if (( disk_available < one_meg * 10 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
          echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
          echo
          warning_resources="true"
        fi
        if [[ $${warning_resources} == "true" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
          echo "Please follow the instructions to increase amount of resources available:"
          echo "   https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#before-you-begin"
          echo
        fi
        mkdir -p /sources/logs /sources/dags /sources/plugins
        chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
        exec /entrypoint airflow version
    # yamllint enable rule:line-length
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_UPGRADE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
      _PIP_ADDITIONAL_REQUIREMENTS: ''
    user: 0:0
    volumes:
      - .:/sources

  airflow-cli:
    <<: *airflow-common
    profiles:
      - debug
    environment:
      <<: *airflow-common-env
      CONNECTION_CHECK_MAX_COUNT: "0"
    # Workaround for entrypoint issue. See: https://github.com/apache/airflow/issues/16252
    command:
      - bash
      - -c
      - airflow

  flower:
    <<: *airflow-common
    command: celery flower
    profiles:
      - flower
    ports:
      - 5555:5555
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

volumes:
  postgres-db-volume:

Далее нам нужен наш DAG (далее я буду писать ДАГ), то есть пайплайн задач airflow, который будет выполнять код на R. Напишем простейший, который будет брать из общедоступного докер хаба базовый образ R и выполнять в нём какую‑нибудь команду.

dags/my_cool_r_dag.py

from airflow.models import DAG
from airflow.providers.docker.operators.docker import DockerOperator
from datetime import datetime


with DAG(
	# Идентификатор ДАГа
    dag_id='cool_r_dag',
    # Дата первого прогона ДАГа
    start_date=datetime(2023, 2, 21),
    # Расписание, по которому будет выполняться ДАГ
    # В 10 минут раз в 2 часа каждый вторник
    schedule_interval='10 */2 * * 2'
) as dag:

    task = DockerOperator(
	    # Путь к докер демону
        docker_url='unix://var/run/docker.sock',
        api_version='auto',
        # Команда, которая будет выполняться в контейнере
        command='''R -e print\('"Hello, world!"'\)''',
        # Образ, из которого будет собираться контейнер
        image='arm64v8/r-base',
        # Идентификатор задачи (ДАГи состоят из задач)
        task_id='hello',
        # Автоудаление контейнера после выполнения
        auto_remove=True,
        # В каком даге выполняется эта задача
        dag=dag,
        # Запрет на связывание временных папок в контейнере и на хосте
        mount_tmp_dir=False
    )

После этого остаётся запустить сервис и выполнение задач.

docker compose build
docker compose up -d

По адресу localhost:8080 доступен web UI airflow (логин/пароль: airflow/airflow). После логина там будет такой список:

Активируем ДАГ тумблером слева и нажимаем кнопку play справа (там выбираем Trigger DAG). После нажимаем на название ДАГа, выбираем раздел Graph, там нажимаем на бокс с задачей (hello) и выбираем сверху раздел Log. Там мы увидим примерно что‑то такое:

Идеально, мы запустили R код в airflow. Теперь не обязательно переписывать все свои манипуляция с данными на питон.

Заключение

Это прямо обзорная статья, не подробное руководство к действию. В ней я хотел показать примитивный пример того, как лучше всего организовать подход к запуску кода по расписанию. Правда немного не хватает этапа автоматизации сборки образа (потому что решил не усложнять туториал ещё одним звеном в инфраструктуре), его я попробую описать словами и ссылками. Итак, этапы:

  1. Написание R кода

  2. Упаковка его в docker‑образ

    1. По автоматизации советую почитать gitlab CI/CD. Многие как раз хранят код в гитлабе, там можно настроить сборку простейшим раннером. Сама сборка буквально состоит из docker build → docker push. Хотя стоит ещё тесты накинуть.

    2. Также можно использовать гитлабовский же container registry. Для того, чтобы использовать собранный образ в airflow, надо будет в connections создать подключение к этому container registry и в коде задачи (DockerOperator) ДАГа указать идентификатор этого подкючения.

  3. Запуск по расписанию конейтенера из этого образа с любой выполнимой командой.

Я специально ничего не сказал про airflow в последнем пункте, так как запакованный в docker образ R код можно выполнять по расписанию где угодно.

Ещэ не много про автоматизацию. У меня сейчас как раз настроен процесс на GitLab CI/CD (self‑hosted) и Airflow. В итоге чтобы внести изменения в бизнес‑логику отчёта или пофиксить баг мне буквально нужно сделать три действия:

  1. Внести изменения в код в отдельной ветке.

  2. Отправить ветку с изменениями в gitlab.

  3. Дождаться ревью (если необходимо) и смёрджить ветку с изменениями в основную.

После этого в реестре контейнеров появится обновлённый с таким же тегом образом, а Airflow в следующий запуск ДАГа по расписанию будет использовать уже его, то есть подхватит все исправления.

Итого мы имеем не просто запущенный по расписанию код, а много следующих преимуществ:

  1. Код можно запустить на любой машине, где есть докер демон (но стоит обратить внимание на архитектуру процессора).

  2. Код можно запустить «кнопочкой».

  3. Если код не выполнился, это легко увидеть в веб интерфейсе (а можно ещё и алёрты настроить).

  4. Логи легко доступны по каждому выполнению, на каждом этапе.

  5. Если добавить CI/CD, то вносить изменения в код становится крайне легко и не надо думать, как это правильно доставить до клиента.

  6. Ну и вы не привязаны к языку, пишите на любом, который подходит вам и решает поставленные задачи.

Статья направлена и на тех аналитиков, которые всё же думают, что так как почти все ETL/ELT инструменты так или иначе связаны с Python, то и писать придётся на нём, а R стоит забросить. Я решил показать, что это не так, вы можете пользоваться всеми удобствами обоих языков. И как раз 30 марта в OTUS стартует мой курс по Анализу данных на языке R. Там вы научитесь быстрым, эффективным и элегантным приёмам работы с данными.

Прямо сейчас хочу пригласить вас на бесплатный урок, где я расскажу как получать данные практически из любых источников в сети с помощью библиотеки httr2. А также отправлять данные и команды также по сети. Еще рассмотрим как собирать сложные HTTP запросы из простых блоков.

Комментарии (0)