Сегодня мы с вами сделаем web-интерфейс для управления запуском ETL-процесса. В прошлой статье мы написали консольный скрипт, который разово запускает выгрузку. Но как это передать заказчику ?!

Что нам понадобится

  • виртуальное окружение с установленными пакетами django, redis, django_celery_beat, django-celery-results. Подробнее о требуемых зависимостях тут.

  • запущенный redis-server

Примечание: вместо redis-server можно использовать другой брокер сообщений - rabbitmq. В этом случае вам будет нужно указать другой URL брокера в настройках, указанных ниже.

Старый-добрый джанго

Итак, поскольку речь идёт о python и нам нужен web-интерфейс, мы поступим просто и инициализируем джанго проект с приложением в нём:

django-admin startproject config .  # проект создается в текущей папке, имя конфигурационной папки config
django-admin startapp etl_app  # приложение создаем для размещения в нем модуля с бизнес-логикой

Джанго мы выбрали из-за того, что в нём есть готовый административный интерфейс.

Согласно мануалам django_celery_beat, django-celery-results добавляем в settings.py нашего проекта новые приложения и некоторые настройки.

config/settings.py:


INSTALLED_APPS = [
    ...
    'etl_app',  # наше приложение
    'django_celery_beat',  # приложение из пакета django_celery_beat
    'django_celery_results',  # приложение из пакета django-celery-results
]"

В данном проекте "из песочницы" у нас одна очередь, однако можно настроить несколько очередей, например для распределения задач по приоритетам.

Настраиваем celery

Добавляем в папку настроек джанго проекта модуль celery.py и делаем доступным экземпляр приложения celery_app.

config/celery.py:

import os

from celery import Celery

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings")  # указываем Celery где найти Django-проект
app = Celery("etl_project")  # создаем экземпляр Celery
app.config_from_object("django.conf:settings", namespace="CELERY")  # настройки конфигурации для Celery в settings.py будут начинаться с префикса CELERY_
app.autodiscover_tasks()  # Celery будет искать задания в приложениях settings.INSTALLED_APPS

config/__init__.py:

""" чтобы экземпляр приложения Celery автоматически импортировался при запуске Django """
from .celery import app as celery_app

__all__ = ('celery_app',)

config/settings.py:

...
# CELERY
CELERY_BROKER_URL = 'redis://127.0.0.1:6379/0'  # для rabbitmq, поменяйте адрес брокера на amqp://guest:guest@127.0.0.1:5672
CELERY_TASK_TRACK_STARTED = True  # запускает трекинг задач Celery

# Планировщик задач
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'  # Celery настроен на использование планировщика из базы данных

CELERY_BROKER_TRANSPORT_OPTION = {'visibility_timeout': 3600}  # время ожидания видимости 1 час
CELERY_RESULT_BACKEND = 'django-db'  # указание для django_celery_results куда записывать результат выполнения задач
CELERY_ACCEPT_CONTENT = ['application/json']  # это тип содержимого, разрешенный к получению
CELERY_TASK_SERIALIZER = 'json'  # это строка, используемая для определения метода сериализации по умолчанию 
CELERY_RESULT_SERIALIZER = 'json'  # является типом формата сериализации результатов

CELERY_TASK_DEFAULT_QUEUE = 'default'  # celery будет использовать это имя очереди

Регистрируем нашу etl-функцию

В папке приложения etl_app создаем модуль tasks.py и импортируем в него код etl-процесса.

etl_app/tasks.py:

from celery import shared_task

from etl_app import etl


@shared_task(name="Задача ETL")  # регистрируем функцию в воркере
def etl_task(*args, **kwargs):
    unloads = etl.load()
    multiplication = etl.transform(unloads)
    etl.extract(multiplication)

    return "my result data"  # здесь может быть более полезная информация

Единственная задача декоратора @shared_task - зарегистрировать нашу функцию в воркере и сделать её доступной для запуска из очереди. Подробнее в документации.

Запускаем проект

Открываем несколько терминалов:

  1. (опционально) redis-server, если он у вас не запущен ранее

  2. celery -A config worker -l info # запускаем воркер для наших задач, все логи бизнес-кода будут здесь

  3. celery -A config beat -l info # запускаем службу beat в качестве отдельного процесса

  4. python manage.py runserver # запускаем сервер в тестовом режиме

В терминале воркера вы должны увидеть нашу задачу в перечне доступных:

- ** ---------- [config]
- ** ---------- .> app:         etl_project:0x7ff955e38490
- ** ---------- .> transport:   redis://127.0.0.1:6379/0
- ** ---------- .> results:     
- *** --- * --- .> concurrency: 2 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> default          exchange=default(direct) key=default


[tasks]
  . Задача ETL

В терминале beat'а должно быть сообщение о том, что управление расписанием задач доступно:

LocalTime -> 2023-01-22 05:02:23
Configuration ->
    . broker -> redis://127.0.0.1:6379/0
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> django_celery_beat.schedulers.DatabaseScheduler

    . logfile -> [stderr]@%INFO
    . maxinterval -> 5.00 seconds (5s)
[2023-01-22 05:02:23,184: INFO/MainProcess] beat: Starting...

Создаем периодическую задачу

Для начала создаем экземпляр модели Intervals. На скрине ниже мы создаем период "раз в 10 минут":

Приложение django_celery_beat предлагает большое множество настройки расписания, включая классический cron. Оставляю вам это на самостоятельное изучение по документации.

Затем создаем расписание для нашей задачи в модели Periodic tasks. Если до этого вы всё сделали правильно, то в выпадающем списке задач вы увидите нашу "Задача ETL".

А вот обещанная возможность запустить задачу вне плана (см. Action):

Смотрим результат выполнения

Задача запускается в воркере. Пример вывода:

[2023-01-23 11:57:10,919: INFO/MainProcess] Task Задача ETL[a1bd0114-c3a9-4519-b1a4-af06614022cf] received
[2023-01-23 11:57:11,042: INFO/ForkPoolWorker-2] Задача ETL[a1bd0114-c3a9-4519-b1a4-af06614022cf]: the square of an even number: 0
[2023-01-23 11:57:11,043: INFO/ForkPoolWorker-2] Задача ETL[a1bd0114-c3a9-4519-b1a4-af06614022cf]: 1
[2023-01-23 11:57:11,045: INFO/ForkPoolWorker-2] Задача ETL[a1bd0114-c3a9-4519-b1a4-af06614022cf]: the square of an even number: 4
[2023-01-23 11:57:11,045: INFO/ForkPoolWorker-2] Задача ETL[a1bd0114-c3a9-4519-b1a4-af06614022cf]: skip load stage
[2023-01-23 11:57:11,045: INFO/ForkPoolWorker-2] Задача ETL[a1bd0114-c3a9-4519-b1a4-af06614022cf]: the square of an even number: 16
[2023-01-23 11:57:11,060: INFO/ForkPoolWorker-2] Task Задача ETL[a1bd0114-c3a9-4519-b1a4-af06614022cf] succeeded in 0.13728031599998758s: 'my result data'

В части вывода в консоль код из предыдущей статьи претерпел изменения. print'ы заменены на логгер. Причем я использую логгер из пакета celery from celery.utils.log import get_task_logger, который выводит мне айдишник задачи. Хотя можно остановится и на джанговском логгере, у которого есть отличная документация.

В модели Task results есть возможность посмотреть статус выполнения таска. Обратите внимание на поля Task State и Result Data. Значение последнего поля берется из return нашего таска (функция, обернутая в @shared_task):

Заключение

Сегодня мы научились на примере etl-процесса создавать, запускать и контролировать выполнение регулярных задач. Использовали джанго по назначению - создали web-интерфейс в "обозначенные" сроки. Результат можно передавать пользователю. Помните о том, что вашему пользователю нужно назначить права в админке на работу с моделями из приложений в django_celery_beat и django_celery_results.

Благодарю за внимание. С удовольствием отвечу на ваши вопросы. Репозиторий кода доступен по ссылке.

Комментарии (9)


  1. dolfinus
    22.01.2023 12:52

    Зачем тащить джангу, если можно использовать Airflow? Его сейчас практически каждый DE знает


    1. SergeyKlimov_Creator Автор
      22.01.2023 14:19

      Airflow безусловно да. Джанго появился из-за того, что в разрабатываемых мною проектах etl нужно было прикрутить в рамках уже существующего проекта. Как дополнительную функциональность.


  1. Val_SA
    22.01.2023 14:19

    Стоп, а где, собственно, вывод, те самые числа, которые должны были выдавать функции из 1 части?


    1. SergeyKlimov_Creator Автор
      22.01.2023 14:19

      Логируется всё в воркер


  1. velipre_xella
    22.01.2023 14:22

    Обычно в проде есть какие-то промышленные ETL-инструменты (Talend, Pentaho DI, IBM Datastage и тд и тп). Зачем может понадобиться это поделие, которое как минимум одного питонщика требует на сопровождение? Помимо всего прочего.


    1. SergeyKlimov_Creator Автор
      22.01.2023 14:23

      Цель данной статьи - показать способ решения задачи на питоне. Что именно использовать: собственную разработку или готовое программное решение - на усмотрение постановщика задачи.


  1. Jurasch
    23.01.2023 17:18

    В чём преимущество этого варианта ETL процесса на Python в сравнение к примеру с IBM DataStage?


    1. SergeyKlimov_Creator Автор
      23.01.2023 18:42

      Я ранее не работал с IBM DataStage. Насколько я понял из описания системы, она предназначена для перекачки данных из БД в БД. Если я ошибаюсь, напишите пож-та. Описанный мною etl процесс я использую для перекачки данных из БД в API-сервисы и наоборот. Причем API сервисы разные и требуют (или отдают) данные как json-формате, так и в xml. Помимо кода выгрузки, загрузки в etl-процессе требуется разработка билдеров и парсеров


      1. velipre_xella
        23.01.2023 20:41

        Потыкался сейчас в Designer Client IBM DataStage, вроде как можно xml и принимать, и отдавать. С json скорее всего так же. Полагаю, это маст хэв из коробки для любого етл-инструмента. Но я с IBM DataStage тоже вроде как не работаю.

        ЗЫ Вчера на учебном курсе в airflow попробовал. На 1 взгляд, комрад@SergeyKlimov_Creator, он в большинстве случаев выгодней твоего решения.