Всем привет!

Меня зовут Максим Шептяков, и я занимаюсь продуктовой аналитикой уже больше четырёх лет. Так получилось, что я несколько раз приходил в компании или проекты, где (почти) совсем не было аналитики до меня, так что приходилось настраивать удобную работу с данными с нуля. И сегодня я вам расскажу, как быстро построить себе МВП аналитической системы без Docker, Kubernetes и Airflow, ведь часто аналитики не обладают знаниями этих систем.

Для понимания статьи нужно базовое знание Python и SQL.

Что есть в компании, куда вы пришли, с самого начала

Обычно в компании, куда вы пришли, уже есть какой-то рабочий продукт, необходимые данные о котором хранятся в боевой (то есть production) базе, на которой настроена репликация в базу-реплику. Первому аналитику выдаётся право ходить в реплику и получать данные оттуда. Но есть несколько нюансов:

  • Нельзя кидать слишком тяжелые запросы, ведь идёт постоянная репликация

  • Нельзя модифицировать схему данных, ведь это реплика production-базы

  • Часто нельзя коннектиться сторонними сервисами для визуализации данных

  • Некоторые данные нужно получать не из реплики, а из каких-то других источников (другие БД, API, etc.)

Подход, когда для каждой задачи отдельно подготавливаются данные возможен, но от него начинает тошнить уже на первой неделе такой работы, поэтому возникает желание поднять себе удобную аналитическую систему.

Как выглядит удобная аналитическая система

По моему опыту, для удобной работы аналитическая система должна удовлетворять следующим критериям:

  • Все данные, необходимые для работы, лежат в одном месте и автоматически обновляются

  • Аналитик имеет доступ ко всем данным, необходимым для работы, и может запускать на них тяжелые запросы

  • Аналитик может создавать промежуточные таблицы, витрины данных, вьюхи, etc

  • Аналитик может быстро добавить новый источник данных или новую витрину данных внутри системы

  • Аналитик может подключить систему визуализации к аналитической системе и выдать доступ команде

Схема аналитической системы
Схема аналитической системы

Кажется, что построить такую систему с нуля, да ещё и когда ты не разработчик, а аналитик, довольно сложно. Но я покажу, что для создания MVP (Minimal Viable Product), не нужно прикладывать больших усилий и что создать рабочий прототип для удобной работы можно всего за несколько часов.

В этом гайде сфокусируемся на части с загрузкой и обновлением данных в БД и на доступе аналитика ко всем данным.

Где будут храниться данные

Для хранения данных можно выбрать бесплатную реляционную БД, например, Postgres. Надеюсь, что в компании, куда вы пришли, есть ребята, ответственные за инфраструктуру, которые могут вам поднять БД и выдать все доступы для неё.

Но если это не так, есть несколько способов, как можно поднять себе аналитическую базу:

Не буду останавливаться на том, как поднимать базы данных, а лучше расскажу про то, как можно организовать данные внутри аналитической БД.

Схема данных для аналитической БД
Схема данных для аналитической БД

Я разделил аналитической БД на несколько схем, у каждой из которых своя роль:

  • etl — схема для загрузки сырых данных из различных источников

  • arch — схема для хранения старых сырых данных, которые могут понадобиться в будущем

  • public — схема для работы аналитика, то есть схема для проведения исследований, складывания временных данных, дизайна новых витрин данных

  • pre_dash — схема для предобработанных данных, которые будут использоваться для подготовки витрин для визуализации и для работы аналитика, но не будут доступны для доступа не из БД

  • dash — схема для данных, подготовленных для визуализации (то есть для небольших витрин данных, которые можно быстро достать из БД)

Для удобной работы с такой системой, нам пригодится 3 пользователя:

  • loader — системный пользователь для загрузки данных из внешних источников в аналитическую БД и обработки данных внутри БД

  • analyst — пользователь, у которого есть доступ ко всем данным и возможность работать в схеме public

  • viewer — системный пользователь для визуализации, у которого есть доступ только к схеме DASH с данными, готовыми для отрисовки

Код создания схем, пользователей и выдачи доступов в Postgres:

-- Создаём схемы данных
create schema etl;
create schema pre_dash;
create schema dash;
create schema arch;

-- Создаём пользователей
create role loader login password '*loader_password*';
create role analyst login password '*analyst_password*';
create role viewer login password '*viewer_password*';

-- Выдаём доступы пользователям к нужным схемам и таблицам в них 
-- После создания таблиц нужно будет заного выдать доступы к таблицам пользователям
grant usage, create on schema etl, arch, pre_dash, dash to loader;

grant usage on schema etl, arch, pre_dash, dash to analyst;
grant usage, create on schema public to analyst;
grant select on all tables in schema etl, arch, pre_dash, dash, public to analyst;


grant usage on schema dash to viewer;
grant select on all tables in schema dash to viewer;

Аналитическая БД готова к работе, можно начинать наполнять её данными!

ETL для аналитической БД

Теперь перейдём к сложной части, настройке ETL для автоматической загрузки и подготовки данных.

Давайте для начала определимся с задачами, которые должна выполнять ETL-система:

  • Загрузка данных из разных источников данных в аналитическую БД

  • Быстрое добавление новых источников данных для загрузки

  • Обработка данных внутри аналитической БД

  • Быстрое добавление новых обработок данных внутри БД

  • Учёт зависимостей данных при обработке

  • Работа по расписанию (мы рассмотрим работу раз в день)

Для начала создадим класс, который будет отвечать за работу ETL-системы. Сразу учтём то, что работать он будет раз в день.

from datetime import date


class TransferData:
    """
    Класс, ответственный за сбор данных из внешних источников и сохранение их в аналитическую БД
    """
    def __init__(self, work_date=date.today()):
        self.work_date = work_date

Все загрузки и обработки данных будут представлены как JSON-файлы (джобы), по которым будет проходиться класс TransferData. В этом классе должна быть информация о

  • Типе проводимой работы (загрузка или обработка данных)

  • Названии источника данных для загрузки

  • Необходимых параметрах для загрузки/обработки данных

Создадим первую джобу, которая будет собирать данные из открытого API https://api.publicapis.org/entries и перекладывать их к нам в аналитическую базу. Для описания этого процесса мы можем создать словарь в Python:

load_job = {
    # Тип джобы
    'job_type': 'load', 
    # Название источника данных
    'db_from':'open_api', 
    # DDL таблицы, в которую будут заливаться данные
    'ddl': '''create table if not exists etl.open_apis
(
    api text,
    description text,
    auth text,
    https text,
    cors text,
    link text,
    category text,
    load_date date
);''',
    # Название таблицы, в которую будут заливаться данные.
    'product': 'etl.open_apis',
    # Название промежуточного файла, через который будут загружаться данные
    'file_name': 'open_apis.txt',
}

И теперь напишем код, который при использовании нашего JSON создаст в аналитической БД таблицу и загрузит в неё данные из другой БД.

# Добавим импорты в начало файла
import psycopg2
import os

# .....

# Добавим 2 функции в класс TransferData
class TransferData:

# .....
  
    def load_data(self, load_job):
        """
        Функция для загрузки данных из внешних источников на диск.
        Должна по итогу создавать csv-файл на диске с форматом sep='|', quotechar="^".

        :param load_job: словарь с описанием джобы для загрузки данных
        """
        if load_job.get('db_from') == 'open_api':
            # Специфичная для источника логика сбора данных
            import requests
            import pandas as pd
            data = requests.get('https://api.publicapis.org/entries').json()
            data = pd.DataFrame(data['entries'])
            data['work_date'] = self.work_date
            data = data.loc[:, ['API', 'Description', 'Auth', 'HTTPS', 'Cors', 
                                'Link', 'Category', 'work_date']]
            # Сохраняем в специальный формат для уменьшения количества ошибок при загрузке
            data.to_csv(load_job.get('file_name'), index=False, 
                        sep='|', quotechar="^")

    def upload_data(self, load_job):
        """
        Функция для загрузки данных с диска в таблицы в аналитической БД.
        Для корректной работы необходим файл в формате csv с параметрами DELIMITER '|', QUOTE '^'
        ANALYTICS_DB_CONN_STRING - переменная окружения, в которой содержится connect-строка для
        нашей аналитической БД для юзера loader.
        Например, postgres://loader:*loader_password*@localhost:5432/postgres

        :param load_job: словарь с описанием джобы для загрузки данных
        """
        #
        with psycopg2.connect(os.environ['ANALYTICS_DB_CONN_STRING']) as conn:
            curs = conn.cursor()
            file_name = load_job.get('file_name')
            with open(file_name, 'r') as f:
                product = load_job.get('product')
                curs.execute(load_job.get('ddl'))
                curs.execute(f'truncate {product};')
                copy_query = f"""COPY {product} from STDIN with (FORMAT csv, DELIMITER '|', QUOTE '^', HEADER True)"""
                curs.copy_expert(copy_query, f)
                conn.commit()
            os.remove(file_name)

Теперь можно запуском 2 функций выгрузить данные из одного источника и загрузить их в аналитическую БД:

# Убедитесь, что находитесь в одной директории с файлом simple_etl.py
# и что установлены все зависимости из requirements.txt: pip install -r requirements.txt
import simple_etl as se

tr = se.TransferData()
tr.load_data(load_query)
tr.upload_data(load_query)

Теперь нам нужно создать обработку данных внутри БД. Создадим такую витрину, которая будет содержать в себе данные о количестве открытых API в https://api.publicapis.org/entries на каждый день:

process_job = {
    'job_type': 'process',
    # SQL-запрос, чтобы получить требуемые данные. load_date подставится в коде:
    'query': '''create table if not exists dash.openapi_daily_count
(
    work_date date,
    public_api_count int
);

delete from dash.openapi_daily_count
where work_date = '{work_date}';

insert into dash.openapi_daily_count(work_date, public_api_count) 
select work_date, count(distinct link) public_api_count 
from etl.open_apis 
where work_date = '{work_date}'
group by 1;''',
    # Название таблицы, в которую будут заливаться данные. Нужно для правильной работы зависимостей:
    'product': 'dash.openapi_daily_count',
    # Список таблиц, от которых зависит обработка данных
    'dependencies': ['etl.open_public_apis'],
}

И добавим код, который обработает данные внутри БД:

class TransferData:

# .....

    def process_data(self, process_job):
        """
        Функция для обработки данных внутри БД.

        :param process_job: Описание джобы по обработке данных.
        """
        with psycopg2.connect(os.environ['ANALYTICS_DB_CONN_STRING']) as conn:
            curs = conn.cursor()
            curs.execute(process_job.get('query').format(work_date=self.work_date))
            conn.commit()

Теперь мы можем обработать собранные данные одной командой:

import simple_etl as se
tr = se.TransferData()
tr.process_data(process_job)

Давайте выполним требование по быстрому добавлению новых источников данных и новых обработок данных. Для этого можно создать папку, в которой будут сохранены все джобы, которые нам нужны, в виде JSON-файлов.

import os
import json 

os.mkdir('etl_jobs')

with open('etl_jobs/dash_revenue_daily.json', 'w') as f:
    json.dump(process_job, f, indent=4)
    
with open('etl_jobs/etl_payments_daily.json', 'w') as f:
    json.dump(load_job, f, indent=4)

Теперь мы можем быстро добавлять в папку JSON-файлы с описанием джобов, которые нужно выполнить в течение дня.

Запуск всех джобов с учётом зависимостей

Теперь создадим функцию, которая поможет нам запускать все джобы разом, с учётом зависимостей.

# ...
import json

class TransferData:
    # .....
    # Обновим __init__ метод: 
    def __init__(self, work_date=date.today(), jobs_dir='etl_jobs'):
        self.work_date = work_date
        self.jobs_dir = jobs_dir
        self.loaded_dependencies = set()

    # .....

    def launch_job(self, job):
        """
        Функция, которая запускает джобы в зависимости от их типа

        :param job: Джоба по сбору или обработке данных
        """
        job_type = job.get('job_type')
        if job_type == 'load':
            self.load_data(job)
            self.upload_data(job)
        elif job_type == 'process':
            self.process_data(job)
        else:
            print('Unknown job type for job', job)

    def launch_all_jobs(self):
        """
        Запускаем все джобы с учётом наличия зависимостей в них
        """
        # Загружаем все джобы в память
        job_paths = os.listdir(self.jobs_dir)
        jobs = {}
        for job_path in job_paths:
            with open(os.path.join(self.jobs_dir, job_path), 'r') as f:
                jobs[job_path] = json.load(f)
        # Итерируемся по джобам, проверяя, загружены ли зависимости для них.
        # Осторожно, если зависимости не найдутся, цикл будет вечным
        while True:
            jobs_local = jobs.copy() # Для удаления элементов словаря во время итерации по нему
            for job_name, job in jobs_local.items():
                job_dependencies = job.get('dependencies')
                if job_dependencies is None or len(set(job_dependencies) - self.loaded_dependencies) == 0:
                    print(f'Started {job_name}')
                    self.launch_job(job)
                    product = job.get('product')
                    if product is not None:
                        self.loaded_dependencies.add(product)
                    jobs.pop(job_name)
                    print(f'Finished {job_name}')
            if len(jobs) == 0:
                break
        print('All jobs processed')

Теперь мы можем запустить все джобы, находящиеся в папке etl_jobs с помощью одной команды:

import simple_etl as se
tr = se.TransferData()
tr.launch_all_jobs()

Регулярный запуск сбора и обработки данных

Для регулярного запуска сбора и обработки данных добавим в файл с классом TransferData логику запуска обработки данных:

if __name__ == '__main__':
    """
    Запускаем обновление данных раз в день в 10.00
    """
    import schedule
    import time

    def scheduled_update():
        tr = TransferData(jobs_dir='path_to_jobs_dir/')
        tr.launch_all_jobs()

    schedule.every().day.at("10:00").do(scheduled_update)

    while True:
        schedule.run_pending()
        time.sleep(60)

Теперь мы можем запустить на своём компьютере (или на сервере, если вам его могут выдать ваши инфраструктурщики) в фоне запуск файла и наслаждаться обновлёнными данными в БД каждый день:

Как добавить новый сбор данных из нового источника

Добавить сбор данных из нового источника можно, если обновить функцию load_data и добавить новое условие в if c новым db_from. Тут можно добавлять логику сбора из любых источников: базы данных, API, документы, онлайн-таблицы...

Так можно добавить новый сбор данных в код:

    def load_data(self, load_job):
        # .....
        if load_job.get('db_from') == 'open_api':

           # .....

        elif load_job.get('db_from') == 'other_db_type':
            # DO SOME HERE AND SAVE FILE TO load_job.get('file_name')
            return

Ссылка на репозиторий с полным кодом simple_etl.

Вот и всё

Поздравляю, вы создали MVP аналитической системы! Теперь у вас есть:

  • Автоматическая загрузка и обработка данных из разных источников

  • Возможность работать со всеми данными в одной БД

  • Лёгкое и быстрое добавление новых данных (или обработка имеющихся) в систему

  • Возможность подключить визуализацию к заранее подготовленным и обновляемым данным

Наполняйте систему нужными данными и удобно работайте с ними в одной БД!
Дальше вы можете настроить автоматически обновляемые дэшборды в любой удобной системе визуализации, например, в Metabase (если интересно увидеть, как её можно просто настроить, пишите в комментарии).

Послесловие

Конечно, эта система — это лишь начало, её можно и нужно улучшать и развивать: добавлять логирование, обработку ошибок, параллельность работы, улучшать отказоустойчивость... Вообще говоря, обычно для этого существуют отдельные команды. Но это уже совсем другая история!

Комментарии (10)


  1. igor_suhorukov
    00.00.0000 00:00
    +1

    Почему для аналитической БД выбран обычный PostgreSQL?


    1. Sheptyakov Автор
      00.00.0000 00:00

      Если ты начинающий аналитик, то вряд ли сможешь поднять и содержать что-то сложнее. А если данных много, то скорее всего, уже будет какая-то система под это. Тем временем, Postgres неплохо работает с довольно большими данными, до 10ТБ на нём можно ещё работать, если грамотно распределить данные.


      1. igor_suhorukov
        00.00.0000 00:00
        +1

        Использование колоночных БД для аналитических запросов - обычная практика при объемах от сотни гигабайт данных. CitusDB, Greenplum, Redshift, Vertica, ClickHouse, Druid, QuestDB...

        Ну как минимум для целей аналитики надо в PostgreSQL подключать расширение CitusDB. Так в моей статье для запроса по подсчету зданий это ускорило выполнение аналитического запроса в 10раз. Или устанавливать форк PostgreSQL - GreenPlum.


        1. Sheptyakov Автор
          00.00.0000 00:00

          Согласен, очень правильное дополнение.


  1. web3_Venture
    00.00.0000 00:00

    видно там данных на одну горстку.

    А если на нормальных системах такие запросы гонять это смерть )

    Хотя даже для "не девов" есть тот же metabase


    1. Sheptyakov Автор
      00.00.0000 00:00

      Фишка в том, что в этой системе можно предподготовить данные, что занимает много времени и проходит раз в день. Потом в ходе работы или визуализации количество данных, с которыми работает аналитик, обычно сильно меньше входных.

      Но согласен, в целом эта система - это прототип для того, чтобы можно было начать работать, а не production-ready решение.


  1. tishkovets_maxim
    00.00.0000 00:00

    Сколь прекрасно автор! Я доковылял до очень похожей системы, но читаю в основном файлы (источников файлов становится неприлично много) и класс инициализируется через эрзац-базу в Экселе, в эрзац-базе - список источников и приведение полей.

    (По ленности, скудоумию и отсутствию более опытных коллег потратил на этот путь почти 2 года)


  1. astentx
    00.00.0000 00:00

    Как-то тема отказа от Docker/Airflow/K8s не раскрыта. Если поднимаем локальный Postgres, то почему бы не инсталлировать точно так же Docker и далее по мануалу на странице официального образа? До уровня "оно работает" как правило референсной страницы достаточно. Поскольку наш условный аналитик умеет в Python для написания пусть и не сложного управляющего процесса, у него не должно составить большого труда подключить это же в самый простой DAG в AirFlow (который может быть запущен также в контейнере) по Quick Start Guide. Но он получает гораздо больше вариантов мониторинга, перезапуска, управления датами из расписания, а не от today (а ведь 100% что-то будет падать периодически), визуализации зависимостей. При числе объектов больше 20 это будет несколько проще, чем просто голый код. Для T части в ETL на SQL как начальный вариант - dbt. Он имеет возможность строить модель, запускать обновление данных по зависимостям, визуализировать их из коробки.


    1. Sheptyakov Автор
      00.00.0000 00:00

      Основная тема -- не отказ от других инструментов, а использование тех, которые уже знаешь. Когда нужно начать хоть как-то работать, освоение новых для тебя технологий занимает довольно много времени, даже если нужно просто пройтись по чеклисту. То не работает какая-то версия, то не хватает зависимостей, то у тебя виндовс, а не мак... В общем, проблем обычно везде хватает.

      Безусловно, когда базовая потребность собирать и подготавливать самые основные данные закрыта, самое время разбираться с этими инструментами. Ну или если есть время сразу разбираться, то делать это можно и сразу.


      1. kizimenko
        00.00.0000 00:00
        +1

        Моя рекомендация любому аналитику, у которого нет норм инфры, купить себе средненький vps если компания не оплачивает и выстраивать там себе все ништяки. И не будет проблем с win и m1

        Далее docker compose который поднимает все что нужно. Jupyter Airflow DB VPN. Даже без знания докера готовых сборок много и запускаются они одной командой

        А там уже со временем разобраться надо ли ему ковырять airflow и другие инструменты или нет