Привет!
У малых (да даже и крупных) компаний на практике часто встречается задача построить полноценную аналитику, используя при этом данные из excel, csv файлов. Разнообразие подходов к заполнению и образованию таких файлов может быть разное:
ручной ввод данных в локальные excel файлы
получение выгрузок данных в файлах от подрядчиков, партнеров
совместное заполнение таблиц например в google sheets и их дальнейшее использование
При этом не всегда можно предложить решение прямой интеграции данных из одной системы в другую или из системы в аналитическую базу данных.
В этой статье рассмотрим практическое решение разработки потока данных с загрузкой csv, excel файлов в базу данных для будущей аналитики над этими данными. Особенность решения заключается в инфраструктуре – будем использовать serverless-инструменты в облаке, которые позволяют быстро и дешево реализовать data pipelines.
Предлагаю подписаться на мой телеграм-канал про аналитику и инжиниринг данных - Data Study. Тебя ни к чему не обязывает, а мне будет приятно.
Что такое data pipleline
Data pipelines (потоки данных) – это технические решения для извлечения, обработки и загрузки данных из системы источника в систему приемник. Еще потоки данных можно описывать как ETL-процессы, которые применяются в рамках проектов построения платформ данных (Data Platform) и аналитических хранилищ данных (Data Warehouse).
Extract – Извлечение/получение данных из системы источника
Transform – Трансформация данных (например, фильтрация, преобразование форматов, обогащение, агрегация, дедупликация и другое)
Load – Загрузка данных в систему приемник
Хочу отметить, что шаг Transform может отсутствовать или например стоять после Load, тогда сам процесс будет называться ELT.


Для каких задач применяются потоки данных в компаниях
Если задуматься и погрузиться работу с данным и в компании, то потоки данных – они повсюду, часть из них могут быть автоматизированы, часть существует в полу-ручном режиме.
Системы обмениваются данными между собой для работы сервисов, приложений и продуктов компании (интеграции систем например с помощью API)
Из этих же систем данные могут забираться в аналитическую базу данных/хранилище для построения аналитических отчетов и единого доступа к данным в хранилище (с помощью API, очередей сообщений или прямое подключение к базам данных)
Загрузка или выгрузка ручных файлов специалистами компании из рабочих систем, отчетов, почты, различных ботов, отправляющих сводки и мини-отчеты
Ручные выгрузки данных аналитиками или data science специалистами для ad-hoc задач или обучения моделей машинного обучения
Все эти потоки данных внутри компании покрывают разные задачи и бизнес-кейсы компаний: от обмена данными между сайтом и CRM-системой, до обеспечения данными сервиса рекомендаций на основе ML для более персонализированных предложений клиентам компании.
Давайте рассмотрим следующий бизнес кейс: Данные от партнеров компании приходят сотрудникам в csv формате. В файлы могут вноситься ручные правки, после чего есть потребность стоить аналитический отчет по этим данным в виде интерактивного дашборда.
Ниже рассмотрим техническое решение как обеспечить загрузку csv файлов в базу данных, где уже в свою очередь можно более качественно подготовить данные для анализа и использовать их в BI инструменте.
Практическое построение потока данных в облаке с использованием serverless сервисов
Описание инфраструктуры

Все сервисы инфраструктуры доступны в Яндекс Облаке. Для интерфейса ручной загрузки csv/excel файла используется Яндекс Форма (версия для бизнеса). В формах очень просто добавить поле для загрузки файлов, настроить интеграцию для загрузки этих файлов в объектное хранилище Object Storage (можно воспользоваться инструкцией), а сама форма будет доступна для пользователей по ссылке в браузере, при этом ее можно открыть как в веб-версии, так и в мобильной версии.
Для хранения файлов с данными в облаке выбрано стандартное решение – Object Storage, которое позволяет хранить данные разных форматов недорого, а для кейсов с небольшими объемами данных тарификация позволяет хранить данные бесплатно.
Когда файл сохранился в бакет Object Storage, его нужно считать и загрузить в базу данных в виде таблицы. В качестве базы данных будем использовать Managed Service for PostgreSQL, который настраивается и становится доступным для использования буквально через 10 минут после создания кластера и предоставляется как SaaS решение с администрированием на стороне облачного провайдера.
Чтобы построить сам data pipeline между Object Storage и базой PostgreSQL выбор пал на сервис Cloud Functions. Это решение позволяет написать нужную логику на Python прямо в облаке и там же запускать эту функцию без каких-то дополнительных сервисов, CI/CD, отдельных etl-инструментов и виртуальных машин для них. В чем плюсы использовать Cloud Functions в данном решении:
простота. не нужно отдельно поднимать виртуальную машину или отдельные etl инструменты типа Apache Airflow чтобы запускать python код для наших целей, а значит и платить за эти сервисы тоже не нужно
бесплатное использование вызова функций в рамках тарификации. Лимит бесплатного использования на момент написания статьи – 1 000 000 вызовов в месяц, что довольно много, в нашем кейсе функция будет вызываться 10-20 раз в день.
есть отдельный инструмент настройки триггеров, который вообще не тарифицируется. Для нас актуален триггер на отслеживание записи нового файла в Object Storage, по которому будет вызываться сама cloud функция обработки этого файла
Стоит отметить, что это решение - максимально простое и дешевое в реализации. Но из-за этого оно может приводить к своим ограничениям при масштабировании, появления новых требований и увеличения объема загружаемых данных. Не говоря уже от том, что в принципе считывать ручные файлы в табличный вид требует их четко зафиксированного формата, чтобы не возникало ошибок при их обработке. Также разные файлы возможно потребуется грузить в разные таблицы, тогда потребуется вводить правила еще и на нейминг файлов для их разбора и конечно реализовывать логику обработки ошибок при загрузке данных и уведомлениях об этом пользователей или ответственных за этот процесс аналитиков/инженеров.
Настройка потока данных
Расчехляем свои технические навыки и идем все это настраивать в облаке)
1. Для настройки сервисов вам понадобится изначально создать само облако, этот этап описывать не буду, его можно пройти по инструкции. Здесь важный момент, что для возможности создавать сервисы внутри облака, нужно будет создать платежный аккаунт и привязать свою банковскую карту, после создания облака будут подсказки для этого. Хорошая новость, что если у вас до этого не было яндекс облака, вам будет доступен грант, который можно использовать для бесплатного использования сервисов на первое время.
2. Настраиваем сервисный аккаунт
- создаем сервисный аккаунт с ролями storage.editor
, functions.mdbProxiesUser
, functions.functionInvoker
(эти роли позволяют работать с Object Storage и Cloud Functions)

- создаем статический ключ доступа в сервисном аккаунте. сохраняем себе где-нибудь в заметках идентификатор ключа и секретный ключ, они вам пригодятся в следующих шагах


3. Создание бакета в Object Storage для хранения файлов
- Заходим в сервис Object Storage


- Создаем бакет и папку внутри него


4. Создаем Яндекс Форму
- Заходим в Яндекс Формы для бизнеса, создаем новую форму, добавляем в нее поле для загрузки файлов

- Переходим в настройки формы и настраиваем интеграцию с Object Storage по инструкции

На этом шаге можно проверить как работает интеграция. Публикуем форму, загружаем туда файл и отправляем форму. Идем в бакет Object Storage, там должен появиться загруженный файл.
5. Создаем первую версию Cloud Function для тестирования интеграции с Object Storage

- Переходим в созданную функцию. Выбираем интерпретатор Python. Вставляем в редактор кода следующую функцию (print нужны только для дальнейшей проверки логов, можно заменить на нормальный logger)
def handler(event, context):
import boto3
import pandas as pd
import os
from io import StringIO
object_id = event['messages'][0]['details']['object_id']
bucket_id = event['messages'][0]['details']['bucket_id']
print('object_id: ', object_id)
print('bucket_id: ', bucket_id)
aws_access_key_id = os.environ['KEY']
aws_secret_access_key = os.environ['SECRET_KEY']
s3 = boto3.client(service_name='s3',
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
endpoint_url='https://storage.yandexcloud.net',
region_name='ru-central1')
response = s3.get_object(Bucket=bucket_id, Key=object_id)
csv_data = response['Body'].read().decode('utf-8')
df = pd.read_csv(StringIO(csv_data))
print('формат датасета:', df.shape)

- Создаем файл requirements.txt и добавляем в него нужные библиотеки для работы функции

- Выставляем остальные настройки функции (таймаут 3 минуты, память поставил на максимум чтобы была возможность обрабатывать объемные датасеты с помощью pandas, логирование можно оставить без изменений, выбрать созданный в пункте 2 сервисный аккаунт, указать в переменных окружения KEY = идентификатор ключа, SECRET_KEY = секретный ключ из пункта 2, рис. 5)

- Сохраняем изменения функции и ждем ее сборки

6. Создаем триггер для отслеживания новых файлов в Object Storage и вызова функции из пункта 5
- Переходим в триггеры внутри сервиса Cloud Functions
- Открываем инструкцию по настройке триггера и выполняем все тоже самое

7. Идем тестировать нашу функцию и работу триггера
- Переходим в Логи функции

- Идем в Яндекс Форму и еще раз загружаем файл


- Если все работает корректно, должны отобразиться логи работы функции (отработала интеграция формы с Object Storage, сработал триггер на появление нового файла, который запустил вашу функцию)
8. Идем в сервис Managed Service for PostgeSQL и создаем кластер базы данных
В нашем решении это будет единственный сервис, который будет «кушать» наш бюджет. Для тестирования и малого количества данных можно выбрать минимальные ресурсы для кластера. Рекомендую отключать кластер, если вы им не пользуетесь в рабочем режиме, чтобы не тратить бюджет впустую.
Создаем кластер по инструкции, настройки можно взять такие же как на рисунке 18. Укажите обязательно пароль для создаваемого пользователя в разделе База данных.




- Создаем кластер и дожидаемся его статуса Alive
- Открываем кластер, переходим в раздел Хосты, листаем вправо до многоточия … и заходим в редактирование хоста, ставим чекпоинт на публичный доступ (инструкция по управлению хостами)

9. Возвращаемся в Cloud Functions и дополняем нашу функцию
- Дописываем функционал загрузки датасета в таблицу базы данных
def handler(event, context):
import telebot
import boto3
import pandas as pd
import os
from io import StringIO
import psycopg2
import sqlalchemy as sa
object_id = event['messages'][0]['details']['object_id']
bucket_id = event['messages'][0]['details']['bucket_id']
aws_access_key_id = os.environ['KEY']
aws_secret_access_key = os.environ['SECRET_KEY']
login = os.environ['LOGIN']
password = os.environ['PASSWORD']
host = os.environ['HOST']
port = os.environ['PORT']
database = os.environ['DATABASE']
s3 = boto3.client(service_name='s3',
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
endpoint_url='https://storage.yandexcloud.net',
region_name='ru-central1')
response = s3.get_object(Bucket=bucket_id, Key=object_id)
csv_data = response['Body'].read().decode('utf-8')
df = pd.read_csv(StringIO(csv_data))
df['meta_timestamp'] = pd.Timestamp.now()
datetime_columns = list(df.select_dtypes(include=['datetime64']).columns)
df[datetime_columns] = df[datetime_columns].astype('string')
connection_string = f"postgresql+psycopg2://{login}:{password}@{host}:{port}/{database}"
engine = sa.create_engine(connection_string)
df.columns = (df.columns.str.lower())
with engine.connect() as connection:
df.to_sql(name='customer_raw', schema='public', con=connection,index=False)
connection.close()
print('загрузка завершена')
- Дописываем нужные библиотеки в requirements.txt

- Добавляем настройки подключения к базе данных в переменные окружения функции: LOGIN, PASSWORD, HOST, PORT, DATABASE
- Сохраняем изменения функции
10. Проводим тестирование как в пункте 7
- запущенная функция должна отработать, а данные должны появиться в базе данных

- проверить данные в базе можно через WebSQL или подключившись к базе данных через DBeaver например (убедитесь, что вы открыли публичное подключение к хосту)

Заключение
Таким образом мы получили поток данных, который работает в облаке на serverless технологиях, где достаточно управлять кодом cloud функции, если нужно расширить или например добавить логику трансформации данных. Для этого кстати можно создать отдельную функцию, которая будет запускаться по расписанию, подключаться к базе данных и например собирать аналитическую витрину данных из сырых данных, чтобы использовать их в аналитике. Либо витрину данных можно реализовать с помощью view (представления) внутри базы данных, которая подключается к BI инструменту Datalens для построения отчета.
Подписывайтесь на мой канал Data Study с полезной информацией по аналитике и инженерии данных, а также про карьеру и будни в IT.