Качаем в локальную папку docker-compose файл. curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.5.0/docker-compose.yaml'.

Устанавливаем докер для вашей ос.

Создаем в корне проекта . env файл со следующим содержимым:
export AIRFLOW_UID=50000

Запускаем команду docker-compose up. У вас будут подняты все необходимые сервисы (включая бд Postgres, redis, celery воркеры и тд). Так же у вас будут созданы необходимые директории для создания дагов и плагинов, а так-же директория с логами.

Вывод docker ps после запуска сервиса.
Вывод docker ps после запуска сервиса.

Теперь когда у вас установлен Airflow, то вы можете в нем авторизоваться. Логин и пароль по умолчанию - airflow.

Более подробные инструкции по локальной установке airflow при помощи Docker вы можете найти в документации.

В Airflow из коробки есть множество готовых примеров реализации дагов. Возьмем в качестве заготовки один из них и напишем свой простой даг с синтетическим кодом для загрузки изображений и сохранению их в условное хранилище.

В extract будем ходить в апи за json со ссылками на фотографии.

[{"id":"chb","url":"https://cdn2.thecatapi.com/images/chb.jpg","width":480,"height":360}]

В transform будем преобразовывать данные в массив с названиями фотографий фото.

["chb.jpg", "chb2.jpg"]

В load будем выгружать данные в условный Storage(s3, ftp и тд).

Вот так выглядит наш даг состоящий всего из 3 методов.

Давайте его запустим:

Наш даг работает и полностью подходит нам в качестве заготовки для наших будущих проектов. Как вы заметили мы никуда не сохраняем скачанные фотографии, а просто вычитываем их из респонса. Давайте попробуем поработать с хранилищем и пусть это будет S3. Так-как у нас под рукой нет бакета, то мы его сэмулируем при помощи MinIO.

Для этого давайте добавим в наш docker-compose файл еще один сервис:

s3
  hostname: airflow-s3
  image: minio/minio:latest
  ports:
    - 9000:9000
    - 9001:9001
  env_file:
    - .env
  healthcheck:
  test: [ 'CMD', 'curl', 'http://airflow-s3:9000/minio/health/cluster/read' ]
  start_period: 10s
  timeout: 5s
  interval: 3s
  retries: 100
  command: server --console-address :9001 /data:

Добавьте в .env файл наш будущий secret_key и access_key.

export MINIO_ROOT_USER=airflow
export MINIO_ROOT_PASSWORD=airflowpass

Создадим с3 бакет. Перезапускаем наши сервисы через docker-compose restart и заходим в UI minIO. Авторизоваться нужно при помощи MINIO_ROOT_USER/PASSWORD указанных в env файле.

Теперь нужно создать новое соединение для s3 бакета. Сделать это мы можем при помощи интерфейса или командной строки. Воспользуемся UI для простоты.

В поле extra необходимо дописать: {"region_name": "us-east-1", "bucket_name": "my-s3bucket", "endpoint_url": "http://airflow-s3:9000"}

Нам необходимо добавить к нашему дагу работу с соединением и получение bucket_name из поля Extra в airflow connections.

hook = S3Hook(aws_conn_id="S3_ETL_CONN")
extra = hook.get_connection(hook.aws_conn_id).get_extra()
hook.load_bytes(r.content, key=url.split('/')[-1], bucket_name=json.loads(extra)['bucket_name'])

Теперь переходим в даг и запускаем его.

Даг успешно отработал т.к в нашем minIO S3 появились фото котиков.

Источники:

Комментарии (2)


  1. Paranoich
    28.12.2022 19:57
    +2

    О чем этот фильм? Да ни о чём.


  1. alexxz
    28.12.2022 20:24
    +1

    Из статьи может получиться неплохой туториал для начинающих. В настоящий момент она довольно сумбурно написана и потому бесполезна, на мой взгляд. Можно часть про ETL заменить на что-то другое... Всё равно суть ETL не раскрыта...