В этой статье я расскажу, как можно запустить простой ETL-процесс на виртуальном сервере, используя связку Superset, Airflow и ClickHouse. В качестве платформы я взял готовую конфигурацию от Beget, включающую Superset и Airflow из коробки — это позволяет сосредоточиться на логике обработки данных, а не на настройке окружения.

В качестве примера мы подготовим процесс выгрузки и визуализации данных о товарах с сайта Wildberries.

Для извлечения данных мы будем использовать Python-библиотеки selenium и BeautifulSoup — они хорошо подходят для парсинга веб-страниц. Дополнительно применим re для обработки текстовой информации с помощью регулярных выражений.

Что такое ETL

ETL (Extract — Transform — Load) — это классический процесс обработки данных, включающий три ключевых этапа:

  • Извлечение (Extract): на этом этапе данные получают из внешних источников — будь то базы данных, API, веб-страницы или файлы.

  • Преобразование (Transform): данные очищаются, нормализуются и приводятся к нужной структуре.

  • Загрузка (Load): данные загружаются в целевое хранилище — в нашем случае это ClickHouse, где их можно анализировать, например, с помощью Superset.

Теперь, когда мы разобрались с основами ETL, перейдём к практической части — подготовке окружения и настройке Airflow для выполнения нашего процесса.

Дополнительная настройка окружения Airflow

Поскольку среда выполнения Python-скриптов — это Airflow, необходимо добавить нужные зависимости в кастомный Docker-образ. Мы не изменяем контейнер напрямую, а создаём собственную сборку на базе apache/airflow:2.8.2.

Краткая справка о структуре Airflow 2.8.2

Airflow разворачивается как набор контейнеров:

Airflow 2.8.2 (версия, актуальная на ммоент начала работы над статьей) обычно использует несколько контейнеров для выполнения различных функций в рамках своего распределенного выполнения задач и управления. Стандартная установка через Docker или Docker Compose включает несколько контейнеров, чтобы обеспечить гибкость и масштабируемость. Вот почему это происходит и что каждый из контейнеров делает:

  1. Webserver пользователям взаимодействовать с системой, просматривать DAG'и, их статусы, логи выполнения, а также управлять запуском задач и планировками. Описание: Контейнер на базе Flask, который предоставляет веб-интерфейс.

  2. Scheduler Задача: Отвечает за планирование задач, определение того, какие задачи должны быть запущены, и их размещение в очереди. Описание: Контейнер, который постоянно работает и проверяет состояние DAG'ов и триггеров задач.

  3. Worker(s) Задача: Рабочие контейнеры, которые выполняют задачи DAG'ов. В зависимости от конфигурации может быть один или несколько таких контейнеров. Описание: Выполняет задачи, отправленные планировщиком. Если используется Celery Executor, то обычно создается несколько worker-контейнеров для параллельного выполнения задач.

  4. Database (Postgres/MySQL) Задача: Содержит основную базу данных для хранения метаданных Airflow, включая конфигурации DAG'ов, результаты выполнения задач, логи и другую информацию. Описание: В Docker Compose обычно используется PostgreSQL, но можно использовать и другие базы данных.

  5. Redis (или RabbitMQ) Задача: Является брокером очередей задач для Celery Executor. Используется для передачи задач от планировщика к воркерам. Описание: Redis или RabbitMQ выступают в роли message broker, который управляет очередями задач.

  6. Triggerer (если используется) Задача: Обрабатывает асинхронные сенсоры (например, AsyncSensor), которые могут ожидать определенных событий без блокировки ресурсов. Описание: Отдельный контейнер для эффективной обработки долгосрочных ожиданий.

Сборка кастомного образа

cd /opt/beget/airflow

Создаём Dockerfile:

FROM apache/airflow:2.8.2
USER root
RUN apt-get update && \    apt-get install -y wget unzip && \    wget https://dl.google.com/linux/direct/google-chrome-stable_current_amd64.deb && \    apt install -y ./google-chrome-stable_current_amd64.deb && \    rm -f ./google-chrome-stable_current_amd64.deb && \    export LATEST_VERSION=$(curl -s https://chromedriver.storage.googleapis.com/LATEST_RELEASE) && \    wget -O /tmp/chromedriver.zip "https://chromedriver.storage.googleapis.com/$LATEST_VERSION/chromedriver_linux64.zip" && \    unzip /tmp/chromedriver.zip -d /usr/local/bin/ && \    rm /tmp/chromedriver.zip && \    rm -rf /var/lib/apt/lists/*
USER airflow
RUN pip3 install pandas numpy selenium webdriver-manager beautifulsoup4

Собираем образ и обновляем конфигурацию:

docker build -t local/airflow:2.8.2 .

Меняем image: в docker-compose.yml и перезапускаем:

docker-compose down && docker-compose up -d

Пояснения к RUN

Эти команды, написанные в Dockerfile в блоке с первым RUN, выполняются последовательно, чтобы установить Google Chrome и ChromeDriver на контейнер. Вот разбор по шагам:

  1. RUN apt-get update && \ Выполняется обновление списка пакетов в системе. Эта команда загружает обновленную информацию о доступных пакетах из репозиториев, чтобы обеспечить установку самых последних версий.

  2. apt-get install -y wget unzip && \ Устанавливаются необходимые утилиты: wget: Команда для загрузки файлов по HTTP, HTTPS и FTP. unzip: Команда для распаковки ZIP-архивов. Флаг -y указывает apt-get автоматически отвечать "да" на запросы подтверждения при установке.

  3. wget https://dl.google.com/linux/direct/google-chrome-stable_current_amd64.deb && \ Скачивается пакет .deb, содержащий последнюю стабильную версию Google Chrome для архитектуры amd64 (64-битная версия).

  4. apt install -y ./google-chrome-stable_current_amd64.deb && \ Устанавливается Google Chrome из загруженного пакета .deb. Флаг -y снова автоматически подтверждает установку.

  5. rm -f ./google-chrome-stable_current_amd64.deb && \ Удаляется загруженный файл пакета .deb после установки, чтобы освободить место.

  6. export LATEST_VERSION=$(curl -s https://chromedriver.storage.googleapis.com/LATEST_RELEASE) && \ Переменная LATEST_VERSION устанавливается равной последней доступной версии ChromeDriver. curl -s загружает содержимое страницы без вывода процесса загрузки (флаг -s означает "silent" — без вывода прогресса).

  7. echo "Latest ChromeDriver version: $LATEST_VERSION" && \ Выводится сообщение в консоль с версией ChromeDriver, которая будет загружена.

  8. wget -O /tmp/chromedriver.zip "https://chromedriver.storage.googleapis.com/$LATEST_VERSION/chromedriver_linux64.zip" && \ Загружается ZIP-архив с ChromeDriver соответствующей версии в каталог /tmp/ и сохраняется под именем chromedriver.zip.

  9. unzip /tmp/chromedriver.zip -d /usr/local/bin/ && \ Распаковывается содержимое chromedriver.zip в каталог /usr/local/bin/, чтобы ChromeDriver был доступен в PATH.

  10. rm /tmp/chromedriver.zip && \ Удаляется временный ZIP-файл после его распаковки, чтобы освободить место.

  11. rm -rf /var/lib/apt/lists/* Удаляются временные файлы, созданные apt-get, для очистки и уменьшения размера контейнера.

В итоге, этот блок команд устанавливает Google Chrome и ChromeDriver в контейнере, а также очищает временные файлы, что важно для экономии места и повышения эффективности контейнера.

Работа с Airflow

Основные принципы

Об airflow можно рассказать много, но в рамках этой статьи ограничимся небольшой справкой, а далее посмотрим на него в рамках нашего кейса.

Apache Airflow — это платформа для создания, планирования и мониторинга рабочих процессов (DAGs — Directed Acyclic Graphs). Основная единица работы в Airflow это DAG. Он определяет последовательность задач и зависимости между ними.

DAG содержит:

  • Задачи (Tasks): Операции, которые должны быть выполнены. Задачи могут зависеть друг от друга, и эти зависимости выражаются в виде графа.

  • Расписание (Schedule): DAG можно запускать по расписанию (например, ежедневно в 9 утра) или вручную.

  • Ациклический характер: DAG не может содержать циклов, т.е. нельзя вернуться к уже выполненной задаче.

План такой: поскольку наш кейс по ETL, то первое что нужно сделать, это - ... ? ... extract. А затем уже transform и load.

Не будем изобретать какую то сложную логику, а прямо поместим в первую задачу парсинг интернет магазина wildberries, а именно всех роботов-пылесосов, которых мы там найдем, с их техническими характеристиками и ценами. Вторая задача будет посвящена обработке данных (очистка, унификация, структурирование), а в рамках третьей задачи загрузим полученные данные в базу данных на ClickHouse. Все эти задачи у нас будyт написаны в виде скрипта на python и загружены в специальную директорию на vps.

Полный код DAGа с комментариями. Осторожно, много кода.
from airflow import DAG
from airflow.decorators import dag, task
from airflow_clickhouse_plugin.hooks.clickhouse import ClickHouseHook
from datetime import datetime, timedelta
import pandas as pd
import numpy as np
import re
from time import sleep
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from webdriver_manager.chrome import ChromeDriverManager
from bs4 import BeautifulSoup
# Настройка опций Chrome
options = webdriver.ChromeOptions()
options.add_argument('--headless')  # Работать в фоновом режиме
options.add_argument('window-size=1920x1080')  # Задаем большой размер окна
options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage')
options.add_argument('--disable-gpu')  # Отключить GPU для совместимости
options.add_argument('--incognito') # Режим инкогнито
default_args = {    'depends_on_past': False,    'retries': 1,    'retry_delay': timedelta(minutes=5),    'start_date': datetime(2024, 10, 29)}
schedule_interval = '0 9 * * *'
def get_page_content(articles):    '''    Функция, которая получает на вход список содежимого тегов article,    в которых содежится информация из карточек товара при быстром просмотре товара в каталоге    Возвращает датафрейм со следующими полями:    - id - артикул товара, string    - name - название товара, string    - price - цена, int    - old_price - старая цена, float или nan    - brand - название бренда, string    - rate - рейтинг товарв, float или nan    - estimate - количество оценок, float или nan    - href - ссылка на карточку товара для получения полной информации, string    '''    idx_lst, name_lst, price_lst, old_price_lst, brand_lst, rate_lst, estimate_lst, href_lst = [], [], [], [], [], [], [], []    for i in range(len(articles)):        try:            idx_lst.append(articles[i].attrs['data-nm-id'])        except Exception as e:            idx_lst.append(None)        try:            name_lst.append(articles[i].a.get('aria-label'))        except Exception as e:            name_lst.append(None)        try:            brand_lst.append(articles[i].find('span', class_='product-card__brand').get_text(strip=True))        except Exception as e:            brand_lst.append(None)        try:            price_lst.append(int(''.join(filter(str.isdigit, articles[i].find('ins', class_='price__lower-price')                                                .get_text(strip=True)))))        except Exception as e:            price_lst.append(None)        try:            old_price_lst.append(int(''.join(filter(str.isdigit, articles[i].find('del').get_text(strip=True)))))        except Exception as e:            old_price_lst.append(None)        try:            rate_lst.append(float(articles[i].find('span', class_='address-rate-mini address-rate-mini--sm')                                  .get_text(strip=True)))        except Exception as e:            rate_lst.append(None)        try:            estimate_lst.append(int(''.join(filter(str.isdigit, articles[i].find('span', class_='product-card__count')                                                   .get_text(strip=True)))))        except Exception as e:            estimate_lst.append(None)        try:            href_lst.append(articles[i].a.get('href'))        except Exception as e:            href_lst.append(None)    # у тех, у которых нет id - это из рекомендательной ленты, их можно удалить    df = pd.DataFrame({'id': idx_lst, 'name': name_lst, 'price': price_lst, 'old_price': old_price_lst,                  'brand': brand_lst, 'rate': rate_lst, 'estimate': estimate_lst, 'href': href_lst})\        .query('id==id').reset_index(drop=True)    return df
# Функция медленного скролла
def slow_scroll(driver, step, delay):    """Функция для медленного скролла страницы."""    # Получаем высоту страницы    scroll_height = driver.execute_script("return document.body.scrollHeight")    current_scroll_position = 0    while current_scroll_position < scroll_height:        # Прокрутка вниз на заданное количество пикселей        driver.execute_script(f"window.scrollBy(0, {step});")        current_scroll_position += step        sleep(delay)        # Обновляем высоту страницы после скролла        scroll_height = driver.execute_script("return document.body.scrollHeight")
def map_to_base_brand(brand):    if pd.isna(brand):        return brand    brand = brand.lower()    for base_brand, pattern in brand_mapping.items():        if re.search(pattern, brand):            return base_brand    return brand  # Вернуть оригинальное значение, если не найдено соответствие
def map_to_base_color(color):    if pd.isna(color):        return color    color = color.lower()    for base_color, pattern in color_mapping.items():        if re.search(pattern, color):            return base_color    return 'другой'  # Вернуть оригинальное значение, если не найдено соответствие
def dict_to_records(data_dict):    keys = list(data_dict.keys())    records = [tuple(data_dict[key][str(i)] for key in keys) for i in range(len(data_dict[keys[0]]))]    return records
columns_russian = ['Цвет', 'Объем пылесборника', 'Режимы уборки',                   'Объем резервуара для воды', 'Модель', 'Гарантийный срок',                   'Время работы (мин)', 'Емкость аккумулятора', 'Питание',                   'Время автономной работы', 'Тип управления', 'Тип уборки',                   'Тип пылесборника', 'Ориентация в пространстве', 'Мощность устройства',                   'Выходной фильтр', 'Максимальный уровень звука/шума',                   'Доп. опции робота пылесоса', 'Материал изделия',                   'Индикация робота пылесоса', 'Допустимая высота препятствия',                   'Вес товара без упаковки (г)', 'Комплектация', 'Страна производства',                   'Вес товара с упаковкой (г)', 'Установка на зарядное устройство',                   'Высота предмета', 'Ширина предмета', 'Глубина предмета',                   'Длина упаковки', 'Высота упаковки', 'Ширина упаковки',                   'Артикул', 'Объем пылесборника', 'Режимы уборки',                   'Объем резервуара для воды', 'Модель', 'Мощность всасывания',                   'Ограничитель зоны уборки', 'Charging_installation', 'Материал насадки',                   'Количество предметов в упаковке', 'Тип насадки', 'Вес с упаковкой (кг)',                   'Вес без упаковки (кг)', 'Мощность всасывания (Па)', 'Диаметр сопла']
# Новые названия колонок на латинице
columns_latin = ['Color', 'Dustbin_capacity', 'Cleaning_modes',                 'Water_tank_capacity', 'Model', 'Warranty_period',                 'Runtime_min', 'Battery_capacity', 'Power_supply',                 'Autonomy_time', 'Control_type', 'Cleaning_type',                 'Dustbin_type', 'Spatial_orientation', 'Device_power',                 'Output_filter', 'Max_noise_level',                 'Additional_features', 'Material',                 'Robot_vacuum_indication', 'Obstacle_height',                 'Weight_g', 'Package_contents', 'Country_of_manufacture',                 'Weight_with_packaging_g', 'Charging_installation',                 'Item_height', 'Item_width', 'Item_depth',                 'Package_length', 'Package_height', 'Package_width',                 'Article', 'Dustbin_capacity', 'Cleaning_modes',                 'Water_tank_capacity', 'Model', 'Suction_power',                 'Cleaning_area_limiter', 'Charging_installation','Nozzle material',                 'Number_of_items_in_package','Nozzle_type','Weight_with_packaging_kg',                 'Weight_wo_packaging_kg','Suction_power_Pa','Nozzle diameter']
# Создание словаря для соответствия старых и новых названий колонок
column_mapping = dict(zip(columns_russian, columns_latin))
brand_mapping = {    'xiaomi': r'(?i)\bксиоми\b|\bxiaomi robot\b|\bxioamii\b|\bс я о м и\b|\bx i a o m i\b|\bmi\b|\bxiaomi mi\b',    'samsung':r'(?i)\bsamsung\b|\bсамсун\b',    'honor': r'(?i)\bhonor\b|\bhonor choice\b|\bhonorchoice\b|\bхонор\b',    'polaris': r'(?i)\bpolaris\b|\bполярис\b',    'dreame': r'(?i)\bdreame\b',    'lydsto': r'(?i)\blydsto\b|\blidsto\b',    'roborock': r'(?i)\broborock\b',    'redmond': r'(?i)\bredmond\b',    'filterix': r'(?i)\bfilterix\b',    'hobot': r'(?i)\bhobot\b|\bхобот\b',    'mijia': r'(?i)\bmijia\b',    'futula': r'(?i)\bfutula\b',    'ilife': r'(?i)\bilife\b',
}
# Словарь основных цветов и соответствующих оттенков
color_mapping = {    'белый': r'(?i)\bбелый\b|\bwhite\b',    'черный': r'(?i)\bчерный\b|\bграфит\b|\bblack\b',    'синий': r'(?i)\bсиний\b|\bголубой\b|\b蓝色\b|\b蓝色\b|\bdark blue\b',    'серый': r'(?i)\bсерый\b|\bсеребристый\b|\bсеребристо\b|\bсеро\b',    'красный': r'(?i)\bкрасный\b|\bярко-красный\b|\bкрасно\b',    'зеленый': r'(?i)\bзеленый\b|\bсалатовый\b',    'желтый': r'(?i)\bжелтый\b',    'бежевый': r'(?i)\bбежевый\b',    'оранжевый': r'(?i)\bоранжевый\b',    'розовый': r'(?i)\bрозовый\b',    'фиолетовый': r'(?i)\bфиолетовый\b',    'золотистый': r'(?i)\bзолотистый\b',    'металл': r'(?i)\bметалл\b|\bхром\b|\bсеребро\b|\bплатина\b',
}
# Устанавливаем параметры скрола для прогрузки элементов
scroll_height = 15000 # Высота страницы
step = 100  # количество пикселей за один шаг
delay = 0.5  # задержка между шагами в секундах
@dag(default_args=default_args, schedule_interval=schedule_interval, catchup=False)
def robot_vacuum__scraper_wb():    @task    def scraping_data():        # Установка веб-драйвера и инициализация        driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)        print('driver created')        sleep(2)        try:            # Очищаем кэш перед началом работы и обновляем страницу            driver.delete_all_cookies()            driver.refresh()            driver.get('https://www.wildberries.ru/') # Открываем стартовую страницу            sleep(5)            ## Находим поисковую строку            search_box = WebDriverWait(driver, 18).until(EC.presence_of_element_located((By.ID, 'searchInput')))            sleep(3)            search_box.send_keys('робот-пылесос') # Вводим в поисковую строку запрос            sleep(4)            # Симулируем нажатие клавиши Enter            search_box.send_keys(Keys.RETURN)            sleep(10)            slow_scroll(driver, step, delay) # Выполнение медленного скролла            sleep(2)            content = BeautifulSoup(driver.page_source, "html.parser") # Получаем содержимое первой страницы            # Инициируем датафрейм, записывая в него содержимое первой страницы            df = get_page_content(articles = content.find_all('article')).assign(number_page = 1)            print(f"Получены данные со страницы 1 в {datetime.now().strftime('%H:%M:%S')}")            # В цикле перелистываем страницы - оставляем 10 первых страниц в выдаче            for page in range(2, 10):                try:                    # Переходим на следующую страницу                    next_page = WebDriverWait(driver, 20).until(EC.element_to_be_clickable((By.LINK_TEXT, str(page))))                    driver.execute_script("arguments[0].click();", next_page)                    sleep(2)                    slow_scroll(driver, step, delay) # Скроллим                    sleep(2)                    content = BeautifulSoup(driver.page_source, "html.parser") # Получаем содержимое                    page_content = get_page_content(articles = content.find_all('article')).assign(number_page = page)                    df = pd.concat([df, page_content]).reset_index(drop=True)                    print(f"Получены данные со страницы {page} в {datetime.now().strftime('%H:%M:%S')}")                except Exception as e:                    print(f"Страницы {page} не существует, получена ошибка {repr(e)}")                    break            # Удаляем дубликаты            df = df.drop_duplicates('id').reset_index(drop=True)            # Делаем обход каждой сохраненной страницы, получая подробные характеристики товара            all_details = pd.DataFrame()            # Последовательно проходим по всем ссылкам из списка пылесосов, открываем карточки и получаем детали            for i, product_url in enumerate(df.href):                try:                    driver.get(product_url) # Открытие страницы товара                    sleep(4)                    # Явное ожидание загрузки кнопки "Все характеристики и описание"                    button = WebDriverWait(driver, 14).until(EC.element_to_be_clickable((By.CLASS_NAME, 'product-page__btn-detail')))                    print(f'goods {i}, waited load')                    driver.execute_script("arguments[0].scrollIntoView(true);", button) #Прокрутка к кнопке, чтобы она была в видимой области                    button.click() # Нажимаем на кнопку                    sleep(5)                    page_source = driver.page_source # Получение исходного кода страницы после загрузки                    soup = BeautifulSoup(page_source, 'html.parser') # Инициализация BeautifulSoup для парсинга страницы                    #print(f'goods {i}, start pardced')                    details = pd.DataFrame([[i.span.text for i in soup.find_all("td", class_="product-params__cell")]],                            columns=[i.span.text for i in soup.find_all("span", class_="product-params__cell-decor")])                    select_index = pd.DataFrame({'col':details.rename(columns=column_mapping).columns})                            .drop_duplicates('col').index.to_list()                    details = details.rename(columns=column_mapping).iloc[:, select_index]                    all_details = pd.concat([all_details, details]).reset_index(drop=True)                except Exception as e:                    print(f'href {product_url}: {repr(e)}')            # Закрытие веб-драйвера            driver.quit()            # Объединяем основной список с подробными характеристиками            # Добавляем номер позиции в выдаче, удаляем дубликаты            # Добавляем дату получения данных, преобразуем в строку, т.к. JSON не поддерживает сериализацию объектов Timestamp напрямую            df = (df.drop_duplicates('id').rename(columns={'id':'Article'})              .merge(all_details.drop_duplicates('Article'), on='Article')            .assign(number_position = lambda df: np.arange(len(df)))\              .assign(date = pd.to_datetime(datetime.now()).normalize().strftime('%Y-%m-%d')))            #df = df.where(pd.notnull(df), None)            df = df.astype(object).where(pd.notnull(df), None)            return df.to_dict()        finally:            print(f'quit driver')            driver.quit()    @task    def preprocessing_data(records):        print('Start prepocessing')        df = pd.DataFrame(records)  # Преобразование records в DataFrame        #df['price'] = df['price'].astype('Int64')        # Бренд        df_brand = (df[['Article', 'brand']].rename(columns={'brand': 'old_brand'})                    .assign(sup_brand=lambda df: df.old_brand.apply(map_to_base_brand),                            num_brand=lambda df: df.groupby('sup_brand')['sup_brand'].transform('count'))                    .assign(brand=lambda df: [brand if num_brand > 1 else "other" for brand, num_brand in zip(df.sup_brand, df.num_brand)])[['Article', 'brand']])        print('get brand')        # Страна производства        # Решила захардкодить значения, возможно это не самый лучший вариант        country = pd.DataFrame({'Country_of_manufacture':['Китай', 'Россия', 'Тайвань', '中国', 'Вьетнам', 'Того', 'Республика Корея',                                                'Германия', 'China', 'Гонконг', 'Малайзия', 'КНДР', 'Южная Корея'],                     'country_of_manufacture':['Китай', 'Россия', 'Тайвань', 'Китай', 'Вьетнам', 'Того', 'Республика Корея',                                                'Германия', 'Китай', 'Гонконг', 'Малайзия', 'КНДР', 'Республика Корея']})        df_country = df[['Article','Country_of_manufacture']].merge(country, on='Country_of_manufacture', how='left')                    .fillna('other')[['Article','country_of_manufacture']]        print('get country')        # Цвет корпуса        df_color = df[['Article', 'Color']].rename(columns={'Color':'old_color'})            .assign(color = lambda df: df.old_color.apply(map_to_base_color))[['Article','color']]        print('get color')        # Специфика далее приведенных категориальных признаков в том, что каждый пылесос может иметь        # несколько значений одновременно.        # В качестве значений будем использовать флаги (значения 1 или 0) - есть опция или нет        # Тип управления ('Control_type')        # Механическое управление, вручную, кнопки        df_manual_control = df[(df.Control_type.fillna('na').str.contains('кнопк|кнопочн|механич|на корпусе', case=False))]                    [['Article']].assign(manual_control = 1)        print('get manual control')        # Голосовой помощник - также используем значение поля Ориентация в пространстве ('Spatial_orientation')        # и Доп. опции робота пылесоса ('Additional_features')        pattern = 'умный дом|умная колонка|алис|голос|alex|марус|assistant'        df_voice_assistant = df[(df.Control_type.fillna('na').str.contains(pattern, case=False))|                (df.Spatial_orientation.fillna('na').str.contains(pattern, case=False))|                (df.Additional_features.fillna('na').str.contains(pattern, case=False))]\                [['Article']].assign(voice_assistant = 1)        print('get voice assistant')        # Управление со смартфона - также берем из Доп. опции робота пылесоса ('Additional_features')        df_app_control = df[(df.Control_type.fillna('na').str.contains('мобильн|смартфон|приложен|телефон', case=False))|                    (df.Additional_features.fillna('na').str.contains('мобильн|смартфон|приложен|телефон', case=False))]\                    [['Article']].assign(app_control = 1)        print('get app control')        # Управление пультом - также берем из Доп. опции робота пылесоса ('Additional_features')        df_remote_control = df[(df.Control_type.fillna('na').str.contains('пульт|ДУ', case=False))|                              (df.Additional_features.fillna('na').str.contains('пульт|ДУ', case=False))]\                    [['Article']].assign(remote_control = 1)        print('get remove control')        # Тип уборки ('Cleaning_type') - также берем из Доп. опции робота пылесоса ('Additional_features')        # Моющий        df_wash_cleaning = df[(df.Cleaning_type.fillna('na').str.contains('мойк|моющ', case=False))|                             (df.Additional_features.fillna('na').str.contains('мойк|моющ', case=False))]\                    [['Article']].assign(wash_cleaning = 1)        print('get wash cleaning')        # Сухая уборка        df_dry_cleaning = df[(df.Cleaning_type.fillna('na').str.contains('сухая', case=False))|                            (df.Additional_features.fillna('na').str.contains('сухая', case=False))]\                    [['Article']].assign(dry_cleaning = 1)        print('get dry cleaning')        # Влажная уборка        df_wet_cleaning = df[(df.Cleaning_type.fillna('na').str.contains('влажная', case=False))|                            (df.Additional_features.fillna('na').str.contains('влажная', case=False))]\                    [['Article']].assign(wet_cleaning = 1)        print('get wet cleaning')        # Сухая и влажная уборка одновременно        df_dry_and_wet_cleaning = df[(df.Cleaning_type.fillna('na').str.contains('одновремен|Комбинирован|комплексн', case=False))]                    [['Article']].assign(dry_and_wet_cleaning = 1)        print('get dry and wet cleaning')        # Тип пылесборника ('Dustbin_type')        # Мешок для мусора        df_garbage_bag = df[(df.Dustbin_type.fillna('na').str.contains('мешок', case=False))]                    [['Article']].assign(garbage_bag = 1)        print('get garbage bag')        # Контейнер        df_container = df[(df.Dustbin_type.fillna('na').str.contains(r'контейнер|емкость', case=False))]                    [['Article']].assign(container = 1)        print('get container')        # Аквафильтр        df_aquafilter = df[(df.Dustbin_type.fillna('na').str.contains('аквафильтр', case=False))|                        (df.Dustbin_type.fillna('na').str.contains(r'(?=.*моющийся)(?=.*фильтр)', case=False))]\                        [['Article']].assign(aquafilter = 1)        print('get aquafilter')        # Ориентация в пространстве ('Spatial_orientation')        # также все признаки берем из Доп. опции робота пылесоса ('Additional_features'), т.к. они также могут быть там указаны        # возврат на базу        df_return_to_base = df[(df.Spatial_orientation.fillna('na').str.contains(r'(?=.*установк)(?=.*зарядн)', case=False))|                        (df.Additional_features.fillna('na').str.contains(r'(?=.*установк)(?=.*зарядн)', case=False))|                        (df.Spatial_orientation.fillna('na').str.contains(r'(?=.*поиск)(?=.*зарядн)', case=False))|                        (df.Additional_features.fillna('na').str.contains(r'(?=.*поиск)(?=.*зарядн)', case=False))|                        (df.Spatial_orientation.fillna('na').str.contains(r'(?=.*возврат)(?=.*баз)', case=False))|                        (df.Additional_features.fillna('na').str.contains(r'(?=.*возврат)(?=.*баз)', case=False))|                        (df.Spatial_orientation.fillna('na').str.contains(r'(?=.*возврат)(?=.*зарядн)', case=False))|                        (df.Additional_features.fillna('na').str.contains(r'(?=.*возврат)(?=.*зарядн)', case=False))|                        (df.Spatial_orientation.fillna('na').str.contains(r'(?=.*поиск)(?=.*баз)', case=False))|                        (df.Additional_features.fillna('na').str.contains(r'(?=.*поиск)(?=.*баз)', case=False))|                        (df.Spatial_orientation.fillna('na').str.contains(r'(?=.*установк)(?=.*баз)', case=False))|                        (df.Additional_features.fillna('na').str.contains(r'(?=.*установк)(?=.*баз)', case=False))|                        (df.Spatial_orientation.fillna('na').str.contains(r'парковк', case=False))|                        (df.Additional_features.fillna('na').str.contains(r'парковк', case=False))]\                        [['Article']].assign(return_to_base= 1)        print('get return to base')        # Виртуальная стена        df_virtual_wall = df[(df.Spatial_orientation.fillna('na').str.contains('Ограничитель', case=False))|                    (df.Additional_features.fillna('na').str.contains('Ограничитель', case=False))|                    (df.Spatial_orientation.fillna('na').str.contains(r'(?=.*запретн)(?=.*зон)', case=False))|                    (df.Additional_features.fillna('na').str.contains(r'(?=.*запретн)(?=.*зон)', case=False))|                    (df.Spatial_orientation.fillna('na').str.contains(r'(?=.*виртуальн)(?=.*стен)', case=False))|                    (df.Additional_features.fillna('na').str.contains(r'(?=.*виртуальн)(?=.*стен)', case=False))]\                    [['Article']].assign(virtual_wall = 1)        print('get virtual wall')        # Составление карты        df_space_map = df[(df.Spatial_orientation.fillna('na').str.contains('план|карт|зонирован', case=False))|                         (df.Additional_features.fillna('na').str.contains('план|карт|зонирован', case=False))]\                    [['Article']].assign(space_map = 1)        print('get space map')        # Специальный режим для ковра, определение ковра        df_carped_mode = df[(df.Spatial_orientation.fillna('na').str.contains('ковр|ковер', case=False))|                           (df.Additional_features.fillna('na').str.contains('ковр|ковер', case=False))]\                    [['Article']].assign(carped_mode = 1)        print('get carped mode')        # Лазерные датчики        df_laser_sensor = df[(df.Spatial_orientation.fillna('na').str.contains('лазер', case=False))|                            (df.Additional_features.fillna('na').str.contains('лазер', case=False))]\                    [['Article']].assign(laser_sensor = 1)        print('get laser sensor')        # гироскоп        df_giroscope = df[(df.Spatial_orientation.fillna('na').str.contains('гироскоп', case=False))|                         (df.Additional_features.fillna('na').str.contains('гироскоп', case=False))]\                    [['Article']].assign(giroscope = 1)        print('get giroscope')        # лидар        df_lidar = df[(df.Spatial_orientation.fillna('na').str.contains('лидар|LiDAR', case=False))|                     (df.Additional_features.fillna('na').str.contains('лидар|LiDAR', case=False))]\                    [['Article']].assign(lidar = 1)        print('get lidar')        # Детекция края, ступенек, перепада высоты        df_edge_detection = df                [(df.Spatial_orientation.fillna('na').str.contains('ступен|высот|обрыв|края|край|перепад|паден', case=False))|                (df.Additional_features.fillna('na').str.contains('ступен|высот|обрыв|края|край|перепад|паден', case=False))]\                [['Article']].assign(edge_detection = 1)        print('get edge detection')        # Определение препятствий        df_obstancle_detection = df                [(df.Spatial_orientation.fillna('na').str.contains('стен|столкновен|препятств|предмет|провод', case=False))|                (df.Additional_features.fillna('na').str.contains('стен|столкновен|препятств|предмет|провод', case=False))]\                [['Article']].assign(obstancle_detection = 1)        print('obstancle detection')        # Доп. опции робота пылесоса ('Additional_features')        # Планировщик        df_scheduler = df[(df.Additional_features.fillna('na')                      .str.contains('таймер|расписан|график|автовключен|автоотключен|автовыключен|отложенный старт', case=False))]\                    [['Article']].assign(scheduler = 1)        print('get scheduler')        # возобновление уборки после подзарядки        df_continued_after_charging = df[(df.Additional_features.fillna('na')                                          .str.contains('после зарядки|после подзарядки', case=False))]\                    [['Article']].assign(continued_after_charging= 1)        print('get continued after charging')        # Турбощетка        df_turbo_brush = df[(df.Additional_features.fillna('na').str.contains('турборежим', case=False))|                    (df.Additional_features.fillna('na').str.contains('Турбощетка', case=False))]\                    [['Article']].assign(turbo_brush = 1)        print('get turbo brush')        # Бампер        df_bamper = df[(df.Additional_features.fillna('na').str.contains('бампер', case=False))]                    [['Article']].assign(bamper = 1)        print('get bamper')        # уф лампа        pattern = 'уф лампа|уфлампа|УФ стерилизация|лампа для стерилизации|УФ-обеззараживание|УФ-лампа|Уфампа'+                '|УФ дезинфекция|Ультрафиолетовая лампа'        df_uv_lamp = df[(df.Additional_features.fillna('na').str.contains(pattern, case=False))]                    [['Article']].assign(uv_lamp = 1)        print('get uv lamp')        # видеокамера        df_video_camera = df[(df.Additional_features.fillna('na').str.contains('видео|камер', case=False))]                    [['Article']].assign(video_camera = 1)        print('get video camera')        # самоочистка        df_self_cleaning = df[(df.Additional_features.fillna('na').str.contains('самоочистк|Автоматическая очистка', case=False))]                    [['Article']].assign(self_cleaning = 1)        print('get self cleaning')        # Основные числовые признаки с фиксированными единицами измерения        # распаршиваем одним циклом        '''        Длина упаковки ('Package_length')        Высота упаковки ('Package_height')        Ширина упаковки ('Package_width')        Ширина предмета ('Item_width',)        Высота предмета ('Item_height')        Вес товара с упаковкой (г) ('Weight_with_packaging_g')        Максимальный уровень звука/шума ('Max_noise_level')        Объем пылесборника ('Dustbin_capacity')        '''        col_name=['Package_length', 'Package_height', 'Package_width','Item_width','Item_height',                  'Weight_with_packaging_g', 'Dustbin_capacity', 'Max_noise_level']        new_col_name=['package_length_sm', 'package_height_sm', 'package_width_sm', 'item_width_sm','item_height_sm',                      'weight_with_packaging_g','dustbin_capacity_l', 'max_noise_level_db']        measures = [' см', ' см', ' см', ' см', ' см', ' г', ' л', ' дБ']        low_borders = [5, 5, 5, 4, 4, 500, 0.05, 0]        upper_borders = [200, 200, 200, 100, 100, 20000, 3, 100]        df_fixed_measure = df[['Article']].copy()        for col, new_col, meas, low, upper in zip(col_name, new_col_name, measures, low_borders, upper_borders):            values = []            for val in df[col]:                try:                    values.append(float(val.split(meas)[0]))                except Exception as e:                    values.append(None)            df_fixed_measure[new_col] = values            df_fixed_measure.loc[(df_fixed_measure[new_col] < low)|(df_fixed_measure[new_col] > upper), new_col] = None            print(f'get {col}')        # Распаршиваем числовые значения с разными единицами измерений        '''        Мощность устройства ('Device_power') - может быть выражена в Вт, Па, причем ед. изм-я могут пыть написаны разными регистрами, кириллицей или латиницей, с разными символами        Гарантийный срок ('Warranty_period') - может быть в днях, годах, месяцах и вообще произвольным текстом        Емкость аккумулятора ('Battery_capacity') - может быть в в мАч, Ач        '''        # Мощность устройства ('Device_power')        # Регулярные выражения для поиска значений        watt_pattern = re.compile(r'(\d+(\.\d+)?)(?:\s*[WwВвтт]+|W|Вт)?')        pa_pattern = re.compile(r'(\d+(\.\d+)?)\s*(?:Па|Pa|пА|pa|PA|па|ПА)')        # Функция конвертации Па в Вт        def pa_to_watt(pa):            return pa / 71        # Результирующий список        device_power_watt = []        Article = []        for article, item in zip(df.Article, df.Device_power):            try:                if pd.isna(item):                    device_power_watt.append(None)                    Article.append(article)                else:                    # Поиск значений в паскалях                    pa_match = pa_pattern.search(item)                    if pa_match:                        pa_value = float(pa_match.group(1))                        device_power_watt.append(pa_to_watt(pa_value))                        Article.append(article)                    else:                        # Поиск значений в ваттах                        watt_match = watt_pattern.search(item)                        if watt_match:                            device_power_watt.append(float(watt_match.group(1)))                            Article.append(article)                        else:                            # Если не удалось распарсить, добавляем np.nan                            device_power_watt.append(None)                            Article.append(article)            except Exception as e:                device_power_watt.append(None)                Article.append(article)        df_device_power_watt = pd.DataFrame({'Article': Article, 'device_power_watt': device_power_watt})        print('get device power watt')        # Гарантийный срок ('Warranty_period')        warranty_days = []        pattern = re.compile(r'(\d+|\b[Оо]дин\b|\b[Оо]дного\b|\b[Пп]олгода\b|\b1\b|\( один\)|\(один\))[\s\-·/]*([гГ]од|[мM][еЕе]с(?:[еЕе]ц)?|д[еЕе]н|г|г\.|м|дней|день|дней)',                             re.IGNORECASE)        for entry in df['Warranty_period']:            try:                if pd.isna(entry):                    warranty_days.append(None)                    continue                match = pattern.search(entry)                if match:                    value, unit = match.groups()                    if value.lower() in ['один', 'одного', '1', '( один)', '(один)']:                        value = 1                    elif value.lower() == 'полгода':                        value = 0.5 * 12                    else:                        value = int(value)                    if 'год' in unit.lower() or 'г' in unit.lower():                        days = value * 365                    elif 'мес' in unit.lower() or 'м' in unit.lower():                        days = value * 30                    elif 'ден' in unit.lower() or 'дней' in unit.lower() or 'день' in unit.lower():                        days = value                    else:                        days = None                    warranty_days.append(days)                else:                    warranty_days.append(None)            except Exception as e:                warranty_days.append(None)        df_warranty_days = df[['Article']].assign(warranty_days = warranty_days)        print('get warranty days')        # Емкость аккумулятора ('Battery_capacity')        battery_capacity_measure = []        battery_capacity = []        # Регулярные выражения для поиска единиц измерения        mAh_pattern = re.compile(r'(?:м|m)[\s\-*·/]*[Aа][\s\-*·/]*[hч]', re.IGNORECASE)        Ah_pattern = re.compile(r'[Aа][\s\-*·/]*[hч]', re.IGNORECASE)        #V_pattern = re.compile(r'[Вv]', re.IGNORECASE)        # Регулярное выражение для поиска числовых значений        number_pattern = re.compile(r'\d+(?:[\.,]\d+)?')        for item in df.Battery_capacity:            try:                if isinstance(item, str):                    # Поиск единиц измерения                    if mAh_pattern.search(item):                        battery_capacity_measure.append('mAh')                    elif Ah_pattern.search(item):                        battery_capacity_measure.append('Ah')                    else:                        battery_capacity_measure.append(None)                    # Поиск числовых значений                    numbers = number_pattern.findall(item)                    if numbers:                        battery_capacity.append(float(numbers[0].replace(',', '.')))                    else:                        battery_capacity.append(None)                else:                    battery_capacity_measure.append(None)                    battery_capacity.append(None)            except Exception as e:                print(repr(e))                print(item)                battery_capacity_measure.append(None)                battery_capacity.append(None)        df_battery_capacity = df[['Article']].assign(battery_capacity=battery_capacity)                                .assign(battery_capacity_measure=battery_capacity_measure)        df_battery_capacity['battery_capacity_mAh'] = np.where(df_battery_capacity.battery_capacity_measure=="Ah",                 df_battery_capacity.battery_capacity*1000,                 df_battery_capacity.battery_capacity)        df_battery_capacity = df_battery_capacity[['Article', 'battery_capacity_mAh']]        print('get battery capacity')        new_df = (df[['Article','name','price','old_price','rate','estimate','number_page','number_position','date']]            .merge(df_country, on='Article', how='left').merge(df_brand, on='Article', how='left')            .merge(df_color, on='Article', how='left').merge(df_manual_control, on='Article', how='left')            .merge(df_voice_assistant, on='Article', how='left').merge(df_app_control, on='Article', how='left')            .merge(df_remote_control, on='Article', how='left').merge(df_wash_cleaning, on='Article', how='left')            .merge(df_dry_cleaning, on='Article', how='left').merge(df_wet_cleaning, on='Article', how='left')            .merge(df_dry_and_wet_cleaning, on='Article', how='left').merge(df_garbage_bag, on='Article', how='left')            .merge(df_container, on='Article', how='left').merge(df_aquafilter, on='Article', how='left')            .merge(df_return_to_base, on='Article', how='left').merge(df_virtual_wall, on='Article', how='left')            .merge(df_space_map, on='Article', how='left').merge(df_carped_mode, on='Article', how='left')            .merge(df_laser_sensor, on='Article', how='left').merge(df_giroscope, on='Article', how='left')            .merge(df_lidar, on='Article', how='left').merge(df_edge_detection, on='Article', how='left')            .merge(df_obstancle_detection, on='Article', how='left').merge(df_continued_after_charging, on='Article', how='left')            .merge(df_turbo_brush, on='Article', how='left').merge(df_bamper, on='Article', how='left')            .merge(df_uv_lamp, on='Article', how='left').merge(df_video_camera, on='Article', how='left')            .merge(df_self_cleaning, on='Article', how='left').merge(df_fixed_measure, on='Article', how='left')            .merge(df_device_power_watt, on='Article', how='left').merge(df_warranty_days, on='Article', how='left')            .merge(df_battery_capacity, on='Article', how='left')            # В перекодированных в бинарные признаках заменим пропуск на 0            .fillna({'manual_control': 0, 'voice_assistant': 0, 'app_control': 0, 'remote_control': 0, 'wash_cleaning': 0,                'dry_cleaning': 0, 'wet_cleaning': 0, 'dry_and_wet_cleaning': 0, 'garbage_bag': 0, 'container': 0,                'aquafilter': 0, 'return_to_base': 0, 'virtual_wall': 0, 'space_map': 0, 'carped_mode': 0, 'laser_sensor': 0,                'giroscope': 0, 'lidar': 0, 'edge_detection': 0, 'obstancle_detection': 0, 'continued_after_charging': 0,                'turbo_brush': 0, 'bamper': 0, 'uv_lamp': 0, 'video_camera': 0, 'self_cleaning': 0,'rate':0,'estimate':0})            )        print('union df')        # Очистим от аксессуаров        patern = 'Детский|фильтр|салфет|щетк|Тряпк|ручной|насадк|Аксессуар|Сменн|Фибр|щеток'+                        '|Щётк|Комплект|Накладк|аксесуар|Мешок|датчик|Рампа|автомобил|Батарейка'        new_df = new_df[~new_df.name.str.contains(patern, case=False)].query('price>700').reset_index(drop=True).replace({np.nan: None})        print('cleaned df')        return new_df.to_dict()    @task    def load_data(records):        # Создаем DataFrame из словаря records        df = pd.DataFrame.from_dict(records)        # Приведение типов для вставки в ClickHouse        df['Article'] = df['Article'].astype(str)        df['name'] = df['name'].astype(pd.StringDtype()).fillna('')        df['price'] = df['price'].astype('Int64')        df['old_price'] = df['old_price'].astype('float64')        df['rate'] = df['rate'].astype('float64')        df['estimate'] = df['estimate'].astype('float64')        df['number_page'] = df['number_page'].astype('Int64')        df['number_position'] = df['number_position'].astype('Int64')        df['date'] = pd.to_datetime(df['date']).dt.date        df['country_of_manufacture'] = df['country_of_manufacture'].astype(pd.StringDtype()).fillna('')        df['brand'] = df['brand'].astype(pd.StringDtype()).fillna('')        df['color'] = df['color'].astype(pd.StringDtype()).fillna('')        # Для столбцов с типом Nullable(UInt8) - приведение к типу Int64 с замещением None на 0        uint8_columns = [            'manual_control', 'voice_assistant', 'app_control', 'remote_control',            'wash_cleaning', 'dry_cleaning', 'wet_cleaning', 'dry_and_wet_cleaning',            'garbage_bag', 'container', 'aquafilter', 'return_to_base', 'virtual_wall',            'space_map', 'carped_mode', 'laser_sensor', 'giroscope', 'lidar',            'edge_detection', 'obstancle_detection', 'continued_after_charging',            'turbo_brush', 'bamper', 'uv_lamp', 'video_camera', 'self_cleaning'        ]        for col in uint8_columns:            df[col] = df[col].astype('Int64')        # Для столбцов с типом Nullable(Float64)        float64_columns = [            'package_length_sm', 'package_height_sm', 'package_width_sm',            'item_width_sm', 'item_height_sm', 'weight_with_packaging_g',            'dustbin_capacity_l', 'max_noise_level_db', 'device_power_watt',            'warranty_days', 'battery_capacity_mAh'        ]        for col in float64_columns:            df[col] = df[col].astype('float64')        # Преобразуем DataFrame в список кортежей        records_tuples = [tuple(x) for x in df.to_numpy()]        ch_hook = ClickHouseHook(clickhouse_conn_id='clickhouse_default')        ch_hook.execute('''CREATE DATABASE IF NOT EXISTS scraper_wb''')        ch_hook.execute('''CREATE TABLE IF NOT EXISTS scraper_wb.robot_vacuum (                        Article String,                        name Nullable(String),                        price Nullable(Int64),                        old_price Nullable(Float64),                        rate Nullable(Float64),                        estimate Nullable(Float64),                        number_page Nullable(Int64),                        number_position Nullable(Int64),                        date Date,                        country_of_manufacture Nullable(String),                        brand Nullable(String),                        color Nullable(String),                        manual_control Nullable(UInt8),                        voice_assistant Nullable(UInt8),                        app_control Nullable(UInt8),                        remote_control Nullable(UInt8),                        wash_cleaning Nullable(UInt8),                        dry_cleaning Nullable(UInt8),                        wet_cleaning Nullable(UInt8),                        dry_and_wet_cleaning Nullable(UInt8),                        garbage_bag Nullable(UInt8),                        container Nullable(UInt8),                        aquafilter Nullable(UInt8),                        return_to_base Nullable(UInt8),                        virtual_wall Nullable(UInt8),                        space_map Nullable(UInt8),                        carped_mode Nullable(UInt8),                        laser_sensor Nullable(UInt8),                        giroscope Nullable(UInt8),                        lidar Nullable(UInt8),                        edge_detection Nullable(UInt8),                        obstancle_detection Nullable(UInt8),                        continued_after_charging Nullable(UInt8),                        turbo_brush Nullable(UInt8),                        bamper Nullable(UInt8),                        uv_lamp Nullable(UInt8),                        video_camera Nullable(UInt8),                        self_cleaning Nullable(UInt8),                        package_length_sm Nullable(Float64),                        package_height_sm Nullable(Float64),                        package_width_sm Nullable(Float64),                        item_width_sm Nullable(Float64),                        item_height_sm Nullable(Float64),                        weight_with_packaging_g Nullable(Float64),                        dustbin_capacity_l Nullable(Float64),                        max_noise_level_db Nullable(Float64),                        device_power_watt Nullable(Float64),                        warranty_days Nullable(Float64),                        battery_capacity_mAh Nullable(Float64)                    ) ENGINE = MergeTree()                    PRIMARY KEY (Article, date)                    ORDER BY (Article, date)''')        ch_hook.execute('INSERT INTO scraper_wb.robot_vacuum VALUES', records_tuples)    df = scraping_data()    transformed_df = preprocessing_data(df)    load_data(transformed_df)
robot_vacuum__scraper_wb = robot_vacuum__scraper_wb()

Задача, которую решаем

В данном проекте мы строим ETL-процесс (Extract, Transform, Load) для автоматизированного сбора данных о роботах-пылесосах из интернет-магазина. Этот процесс полезен для анализа конкурентных предложений, изучения изменения цен и характеристик, что помогает в настройке конкурентного ценообразования. Наши данные, собранные и обработанные в Airflow, будут загружены в ClickHouse для быстрого анализа и визуализации, в нашем случае, с помощью BI-инструмента Apache Superset.

Используемые библиотеки и инструменты

Airflow — обеспечивает автоматизацию и управление DAG (Directed Acyclic Graph), в котором каждый Task отвечает за отдельный этап обработки данных. pandas — используется для обработки данных, позволяет преобразовывать данные в таблицы, работать с типами данных, заменять значения, очищать данные перед вставкой в ClickHouse.

ClickHouseHook — модуль интеграции для взаимодействия с ClickHouse в Airflow. Мы используем его для создания таблицы и загрузки данных в ClickHouse.

Requests — отправляет HTTP-запросы к веб-сайту, с которого собираем данные.

BeautifulSoup — это библиотека для парсинга HTML и XML документов, которая упрощает извлечение нужных данных из веб-страниц. Она особенно полезна, когда нужно обработать содержимое страницы, которое уже загружено, поскольку может работать с HTML-кодом, загруженным, например, Selenium.

Selenium — библиотека для автоматизации веб-браузера, которая позволяет получить данные с динамически загружаемых страниц, обходить JavaScript-загрузку и взаимодействовать с элементами интерфейса сайта. Если целевой сайт загружает контент с помощью JavaScript или имеет прокручиваемые элементы, которые нужно открыть для полного парсинга данных, то Selenium будет полезен. С его помощью можно:

  • Открывать страницы, которые загружаются динамически.

  • Прокручивать страницы до нужного места, чтобы загрузить все товары.

  • Кликать на кнопки, раскрывать скрытые секции, чтобы получить полный набор данных.

В процессе веб-скрейпинга с использованием Selenium и BeautifulSoup взаимодействие обычно выглядит так:

  • Selenium загружает страницу в браузере и обрабатывает динамический контент, то есть запускает JavaScript, кликает по кнопкам, заполняет поля и прокручивает страницу, чтобы загрузить все элементы.

  • BeautifulSoup затем получает HTML-код уже загруженной страницы (например, через driver.page_source в Selenium) и помогает удобно и быстро извлечь необходимые данные, предоставляя доступ к элементам HTML с помощью знакомых селекторов, таких как find, find_all, select, что облегчает навигацию по дереву элементов.

Пример простого использования Selenium и BeautifulSoup:

from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from bs4 import BeautifulSoup
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()))
driver.get("URL")
driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
html = driver.page_source
driver.quit()
soup = BeautifulSoup(html, "html.parser")
articles = soup.find_all('article')
for article in articles:    title = article.find('span', class_='product-title').text    price = article.find('span', class_='product-price').text    print(f"Title: {title}, Price: {price}")

Поскольку статья посвящена запуску ETL на сервере, настройку локального окружения опускаем, но важно помнить, что Selenium требует наличия браузера и WebDriver.

Разбор DAG: три этапа ETL

В нашем примере весь процесс, который в терминологии Airflow называется DAG (Directed Acyclic Graph), состоит из трёх этапов — tasks. Каждый task выполняет конкретную часть ETL-процесса:

  1. extract_data — извлечение данных с сайта.

  2. transform_data — очистка и обработка информации.

  3. load_data — загрузка в ClickHouse.

Разберем их подробнее.

Task 1: extract_data

Задача extract_data отвечает за получение данных с сайта. Сначала мы отправляем HTTP-запрос на URL интернет-магазина для загрузки страницы товаров, а затем извлекаем необходимые данные.

А если подробнее:

  1. Инициализация веб-драйвера.

  2. Начальная настройка браузера (очистка кэша, удаление куков, обновление страницы) и поиск по ключевому слову ('робот-пылесос').

  3. Постраничное извлечение данных. Сначала данные собираются с первой страницы, а затем происходит постраничный переход для сбора данных с последующих страниц (до 10). slow_scroll выполняет медленный скроллинг, чтобы загрузить все элементы, так как страница подгружает товары по мере прокрутки. На каждой странице BeautifulSoup используется для парсинга HTML-кода, извлекаются товары, и результат добавляется в основной DataFrame.

  4. Обход карточек товаров для получения подробных характеристик по каждому товару. Каждая ссылка на товар открывается в браузере, и для каждого товара нажимается кнопка "Все характеристики и описание", которая раскрывает подробную информацию. Далее страница парсится с помощью BeautifulSoup, чтобы получить все параметры, такие как размеры, вес, тип фильтра и другие характеристики.

  5. Добавление метаинформации (номер страницы, позиция на странице, дата извлечения данных). После всех преобразований DataFrame конвертируется в словарь и возвращается для дальнейшего использования в следующих тасках.

Почему DataFrame нужно превратить в словарь

Конвертация DataFrame в словарь для передачи между тасками в Airflow помогает обойти ряд ограничений и обеспечивает стабильность работы DAG.

Во-первых, это обеспечивает совместимость с XCom: Airflow использует XCom (cross-communication) для обмена данными между тасками. XCom может сохранять только данные, которые могут быть сериализованы в JSON (списки, словари, строки и числа). Поскольку DataFrame не может быть сериализован напрямую, его нужно конвертировать в JSON-совместимый формат, такой как словарь.

во-вторых, конвертированный в словарь DataFrame сохраняет все ключевые данные (колонки и их значения), что позволяет передавать структурированные данные без потерь и с минимальными преобразованиями.

в-третьих, конвертация в словарь помогает уменьшить объем данных, так как XCom имеет ограничения на размер хранимых объектов.

Пример конвертации и передачи DataFrame через XCom

@task
def extract_data():    df = pd.DataFrame(...)  # Собранные данные    return df.to_dict()     # Преобразуем в словарь для XCom
@task
def transform_data(data):    df = pd.DataFrame.from_dict(data)  # Восстанавливаем DataFrame    # Продолжаем обработку...

Task 2: preprocessing_data

В этом Task выполняется очистка и приведение данных к нужным типам для их последующей загрузки в ClickHouse.

Обработка категориальных признаков

Это опции, которые извлекаются из разных полей, а именно из 'Additional_features', 'Control_type', 'Spatial_orientation', 'Cleaning_type'. Одна и та же опция может быть у одного продавца указана в одном поле, а у другого — в другом. Для обработки и извлечения опции заготовлены "мешки слов" и используется метод строковой фильтрации pandas.Series.str.contains().

Каждому пылесосу присваивается бинарное значение (флаг) для признака (1 или 0 — есть опция или нет).

Признаки:

  • Планировщик ('scheduler')

  • USB-зарядка ('usb_charging')

  • Возобновление уборки после подзарядки ('continued_after_charging')

  • Возврат на базу ('return_to_base')

  • Голосовой помощник ('voice_assistant')

  • Управление со смартфона ('smartphone_control')

  • Управление пультом ('remote_control')

  • Дистанционное управление ('distance_control')

  • Влажная уборка ('wet_cleaning')

  • Виртуальная стена ('virtual_wall')

  • Составление карты ('room_map')

  • Специальный режим для ковра, определение ковра ('carped_mode')

  • Лазерные датчики ('laser_sensor')

  • Гироскоп ('giroscope')

  • Лидар ('lidar')

  • Большие колеса ('big_wheels')

  • WiFi ('wifi')

  • Турбощетка ('turbo_brush')

  • Бампер ('bamper')

  • УФ-лампа ('uv_lamp')

  • Самоочистка ('self_cleaning')

  • Детекция края, ступенек, перепада высоты ('edge_detection')

  • Определение препятствий ('obstancle_detection')

Обработка числовых признаков, у которых единицы измерения зафиксированы

Отдельно были рассмотрены все величины и установлены их единицы измерения и распределение. Для каждого признака установлены допустимые границы нормальных значений, значения, выходящие за эти границы, заменены на пропуск (None). В дальнейшем с этими признаками можно работать по своему усмотрению в зависимости от задачи — заменить их на моду, медиану, среднее.

Признаки:

  • Длина упаковки ('Package_length')

  • Высота упаковки ('Package_height')

  • Ширина упаковки ('Package_width')

  • Ширина предмета ('Item_width')

  • Высота предмета ('Item_height')

  • Вес товара с упаковкой (г) ('Weight_with_packaging_g')

  • Объем пылесборника ('Dustbin_capacity')

  • Максимальный уровень звука/шума ('Max_noise_level')

Обработка числовых признаков с разными единицами измерений

Здесь для каждого признака были написаны регулярные выражения, широко используются методы из библиотеки re.

Признаки:

  • Мощность устройства ('Device_power') — может быть выражена в Вт, Па, причём единицы измерения могут быть написаны разными регистрами, кириллицей или латиницей, с разными символами;

  • Гарантийный срок ('Warranty_period') — может быть в днях, годах, месяцах и вообще произвольным текстом;

  • Емкость аккумулятора ('Battery_capacity') — может быть в мАч, Ач.

Объединение в один DataFrame и очистка

Убираем аксессуары для пылесосов с помощью фильтрации по вхождению подстрок ("щетки", "тряпки" и пр.) и дополнительно ограничиваем по цене (не ниже 700 руб.). Для числовых признаков заменяем NaN на 0 (где это имеет смысл, например, для полей с UInt8 типом, которые должны содержать 0 или 1), для строковых — на пустую строку ''. Преобразуем в словарь и передаём следующему таску.

Task 3: load_data

Этот Task загружает очищенные данные в ClickHouse. В нем создается таблица, если она еще не существует, а затем с помощью ClickHouseHook выполняем вставку в нее данных.

Декораторы Airflow и их преимущества

В Airflow есть два основных подхода к созданию тасков: с использованием PythonOperator и с помощью декораторов. В этом проекте применяются декораторы @dag и @task. Они помогают сделать код чище и компактнее.

Пример 1: Использование PythonOperator

Этот метод традиционен и обеспечивает гибкость, но требует дополнительной настройки для каждого таска. Ключевым элементом является передача функции в качестве аргумента в PythonOperator.

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import pandas as pd
# Определение задач
def task_1():    print("Выполняется task_1")
def task_2(data):    print("Выполняется task_2")
# Определяем DAG
with DAG(dag_id="example_python_operator_dag", start_date=datetime(2025, 1, 1), schedule_interval="@daily") as dag:    t1 = PythonOperator(        task_id="task_1",        python_callable=task_1,    )    t2 = PythonOperator(        task_id="task_2",        python_callable=task_2,        op_args=["Здесь могут передаваться данные из task_1"]    )    t1 >> t2

Пример 2: Использование декораторов

Airflow 2.x добавил возможность использовать декораторы для создания тасков, что делает код проще и легче читаемым. Декораторы упрощают передачу данных между тасками через XCom, и код выглядит более «чистым».

from airflow import DAG
from airflow.decorators import dag, task
from datetime import datetime
import pandas as pd
@dag(    dag_id="example_taskflow_api_dag_with_dag_decorator",    start_date=datetime(2025, 1, 1),    schedule_interval="@daily",    catchup=False
)
def example_taskflow_dag():    @task    def task_1():        print("Выполняется task_1")        data = {"key": "value"}        return data    @task    def task_2(data):        print(f"Выполняется task_2 с данными: {data}")    data_from_task_1 = task_1()    task_2(data_from_task_1)
example_dag = example_taskflow_dag()

Сравнение PythonOperator и декораторов

Аспект

PythonOperator

Декораторы (@task)

Читаемость кода

Требует более длинной записи

Более простой и читаемый код

Передача данных

Явная (XCom или вручную)

Автоматическая через XCom

Гибкость

Высокая

Подходит для большинства ETL-кейсов

Совместимость

Поддерживается во всех версиях Airflow

Только в Airflow 2.x

Лучше всего подходит

Сложные/специализированные таски

Простые DAG и стандартные пайплайны

Вывод

Использование декораторов особенно удобно для ETL-кейсов, где важно передавать результат из одного таска в другой. PythonOperator остаётся хорошим вариантом для более сложных или нестандартных сценариев.

Запуск и отладка скрипта

Совет: Для первого запуска процесса рекомендуется сократить количество страниц для скрапинга, чтобы ускорить проверку работоспособности DAG.

Скрипт загружается на сервер с помощью команды:

scp <локальный путь к скрипту> <пользователь>@<хост>:/opt/beget/airflow/dags/<название скрипта>

Пример:

scp scripts/ETL_robot_vacuum__scraper_wb.py root@31.128.45.77:/opt/beget/airflow/dags/ETL_robot_vacuum__scraper_wb.py

Все DAG-файлы должны быть размещены в директории /opt/beget/airflow/dags.

Через несколько минут Airflow автоматически подгрузит DAG, если в скрипте нет синтаксических ошибок. Он появится в списке DAG'ов на Dashboard.

Что бы он запустился в указанное в schedule_interval время, его нужно активировать.

Apache Airflow имеет разнообразный набор виджетов в пользовательском интерфейсе, которые предназначены для мониторинга, управления и настройки рабочих процессов (DAGs или Дагов). Интерфейс Airflow достаточно интуитивен. В случае же, если вы чувствуете неуверенность, вы можете посмотреть или почитать более подробные гайды, посвященные именно работе с этим приложением. Например вот так выглядит результат выполнения процесса за 3 дня:

Airflow предоставляет удобные виджеты для отслеживания выполнения задач:

  • Graph View (центральная часть экрана): это графический вид DAG, где отображены задачи (scraping_data, preprocessing_data и load_data), их зависимости и текущий статус. Цветовая индикация указывает статус задач: зеленый для success (успешное выполнение), красный для failed (ошибка), и оранжевый для upstream_failed (зависимость завершилась ошибкой). В данном примере мы видим, что первые два запуска были не удачными, а в последние 2 запуска все задачи DAG успешно выполнены, о чем говорит зеленый цвет.

  • Task Duration (левая панель): этот виджет показывает продолжительность выполнения каждой задачи на временной шкале. Вертикальные полосы указывают на продолжительность выполнения задач scraping_data, preprocessing_data и load_data. Красные полосы могут сигнализировать об ошибках, которые произошли на предыдущих запусках задач. Полезно для анализа и оптимизации производительности задач, чтобы выявить, какие из них требуют больше времени.

Мы можем выбрать таск и посмотреть детали выполнения, например на вкладке Logs мы можем проверить логи (среди которых также будут наши отладочные принты). Например в таске preprocessing_data мы видим ошибку ETAIL: Token "NaN" is invalid.

На вкладке XCom для первого таска scraping_data мы можем увидеть, какие данные были переданы в таск 2 и убедиться, что nan действительно присутствует. Причина в том, что хотя была сделана обработка df.where(pd.notnull(df), None) перед преобразованием в словарь на выходе из первого таска, но она не обработала None для int и float (признаки rate, estimate, old_price). Если столбец имеет тип данных, который не поддерживает None (например, int64 или float64), то None не может быть сохранен в этих ячейках. В таких случаях None будет либо игнорироваться, либо заменяться на NaN.

Преобразуем DataFrame к типу object, прежде чем заменять значения:

df = df.astype(object).where(pd.notnull(df), None)

Буквально через несколько секунд мы можем убедиться, что исправления скрипта применены, зайдя на вкладку Code:

Как видите, несомненное преимущество Airflow по сравнению, например, с планировщиком cron, состоит не только в наглядности, а в том, что можно просматривать статус выполнения, логи и историю запуска, а также повторно запускать задачи без дополнительных скриптов и настроек.

Совет: для отладки запускать не полный скрипт, например обход части карточек товара, а не всех.

Вот так выглядит результат недельной работы скрипта - как видите, полет нормальный.

Работа с Superset

Основные сущности Superset

Apache Superset — это мощная платформа для визуализации данных и создания дашбордов. В Superset существуют несколько ключевых сущностей, которые помогают организовывать данные и строить визуализации: datasets (наборы данных), charts (диаграммы), queries (запросы) и dashboards (дашборды). Давайте рассмотрим каждую из них подробнее.

  • Datasets (Наборы данных) — это собой источники данных, с которыми можно работать для построения визуализаций и дашбордов.

    В Superset есть два типа наборов данных:

    • Физические наборы данных (Physical datasets): это наборы данных, напрямую привязанные к таблицам или представлениям (views) в базе данных. Они обычно содержат реальные данные из подключенного источника (например, из ClickHouse, PostgreSQL и других поддерживаемых баз данных).

    • Виртуальные наборы данных (Virtual datasets): это наборы данных, основанные на результатах SQL-запросов. Виртуальные наборы данных полезны, когда нужно создать набор данных на основе сложного запроса, агрегировать или трансформировать данные перед визуализацией. Они создаются через интерфейс SQL Lab, где можно написать запрос, который будет сохраняться как виртуальный набор данных.

  • Charts (Диаграммы) — это визуальные представления данных, которые создаются на основе наборов данных. Каждая диаграмма строится на основе определённого набора данных и требует выбора метрик, измерений и фильтров.

  • Queries (Запросы) — создаются в процессе работы с данными, особенно при создании виртуальных наборов данных или визуализаций. В SQL Lab можно писать и выполнять SQL-запросы, чтобы исследовать данные и проверять гипотезы. Эти запросы можно сохранить как виртуальный набор данных или использовать для анализа и создания диаграмм. Запросы могут быть как временными, для изучения данных и получения выводов, так и сохранёнными, если на их основе нужно создать виртуальный набор данных для дальнейшего использования.

  • Dashboards (Дашборды) — это составные панели, которые позволяют объединять несколько диаграмм и визуализаций в одной рабочей области. На дашбордах можно добавлять фильтры, которые могут быть глобальными (например, общий фильтр по дате) или индивидуальными для каждой диаграммы.

Настройка Superset и создание визуализаций

Шаг 1. Настройка подключения к ClickHouse

  • Откройте интерфейс Superset и перейдите в Settings → Database Connections.

  • Нажмите на кнопку + Database в правом верхнем углу.

  • В списке баз данных выберите ClickHouse Connect.

  • Заполните форму подключения, указав URI базы данных.

  • Нажмите Test Connection, чтобы проверить соединение.

  • При успешной проверке нажмите Connect для завершения настройки.

После этого Superset отобразит новое подключение в списке баз данных.

Шаг 2. Строим запросы

Для работы с ClickHouse можно использовать интерфейс SQL Lab:

  • Напишите SQL-запрос к нужной таблице или представлению.

  • Выполните запрос и проверьте результат.

  • При необходимости сохраните результат в виде виртуального датасета.

Это позволит использовать результаты сложных SQL-запросов как источник данных для визуализаций.

Шаг 3. Строим диаграммы и дашборды

  • На основе физического или виртуального датасета создайте нужные диаграммы (charts).

  • Используйте различные типы визуализаций: графики, таблицы, круговые диаграммы и др.

  • Соберите диаграммы на одном дашборде (dashboard).

  • Добавьте фильтры — глобальные (например, по дате) или локальные для конкретных диаграмм.

Пример готового дашборда.

Заключение

В этой статье мы на практике рассмотрели, как можно реализовать ETL-процесс с использованием Superset, Airflow и ClickHouse на примере реального кейса — сбора данных о роботах-пылесосах с сайта Wildberries. Мы прошли путь от настройки окружения, написания DAG и скрапинга, до загрузки очищенных данных в ClickHouse и построения дашбордов в Superset.

Этот стек инструментов позволяет быстро организовать полноценный аналитический пайплайн: автоматизировать сбор и обработку данных, визуализировать ключевые метрики и обеспечить доступ к аналитике в удобной форме. Airflow даёт прозрачность и контроль, ClickHouse — высокую производительность, а Superset — мощные возможности визуализации без необходимости писать фронтенд.

Надеюсь, этот пример окажется полезным и послужит базой для ваших собственных ETL-процессов и аналитических систем.

Комментарии (0)