Всем привет! Меня зовут Артур, и я Python разработчик. Хочу поделиться опытом доработки Apache Superset и рассказать, как мы использовали его для визуализации больших объемов данных в рамках сводных таблиц. Если получится выйти из песочницы, обещаю добавлять и другие полезные статьи.
Apache Superset - это мощный инструмент для визуализации данных, который широко используется в Open-Source сообществе. Его главными достоинствами являются:
Простота использования: Superset имеет простой и интуитивно понятный интерфейс, который позволяет пользователям быстро создавать красивые и информативные диаграммы и отчеты.
Масштабируемость: Superset может легко масштабироваться и работать с большими объемами данных.
Поддержка множества источников данных: Superset поддерживает множество источников данных, включая SQL-базы данных, Hadoop, Apache Druid, Elasticsearch, а также многие другие.
Расширяемость: Superset может быть расширен с помощью различных плагинов, что позволяет настраивать его под различные потребности.
Но, несмотря на все эти достоинства, в практическом применении могут возникать проблемы, связанные с необходимостью правильной настройки и оптимизации Superset для конкретных задач. Так, например, некоторым пользователям может не хватать существующих типов визуализации, или они могут быть недостаточно производительными для их объемов данных. Поэтому, зачастую изначального “коробочного” функционала Apache Superset бывает недостаточно, и тогда нужно начинать дорабатывать его под свои потребности, что Superset и позволяет делать.
Задача от заказчика
У нашего заказчика был запрос на реализацию отчета, который должен был работать с большим количеством данных, которые дополнительно требовалось обрабатывать и сводить. Сводные таблицы, которые поставляет Superset в комплекте “из коробки”, на такое не рассчитаны ни по скорости, ни по возможности выводить данные порционно, а не все разом.
Когда мы пробовали выводить данные встроенными возможностями этих таблиц и повышали количество строк до лимита, установленного разработчиками Superset, этого было недостаточно для существующей грануляции данных. Вместо ожидаемых данных за полгода клиент видел лишь данные едва ли за 1,5 месяца.
Попытавшись увеличить это число в существующей версии сводной таблицы, мы столкнулись с проблемами производительности. Во-первых, страница с данными за три месяца могла грузиться 15 минут, с трудом работая в принципе, а при попытке вывода данных хотя бы за полгода браузер и вовсе “падал” в виду нехватки памяти.
Таким образом, существующая сводная таблица никак не могла устроить нашего заказчика, потому что рассчитана она была, по задумке разработчиков, лишь на сведение большого количества данных до малого числа строк, но если получаемое количество строк было больше, чем число, которое они установили как лимит, то таблица переставала работать приемлемо. Нужна была существенная доработка, о которой пойдет речь ниже.
План доработок
Было решено реализовать следующий функционал:
Подготовить данные.
Добавить пагинацию по аналогии с обычной таблицей.
Также вынести на бэкенд сортировку по столбцам, раз мы перенесли туда формирование данных. Только там теперь мы сможем делать корректную сортировку.
Добавить кэширование на результат, чтобы не нужно было пересчитывать данные каждый раз и постоянно обращаться к диску при смене страницы или сортировки.
Реализации доработок
Подготовка
Поскольку мы создаем данную доработку для работы с большим количеством данных, нам также необходимо будет для этих запросов убрать лимит на запрос в БД. Поэтому, если запрос пришел к нашей новой таблице, мы этот лимит вручную уберём с помощью вот такого кода:
if form_data.get("viz_type") == "pivot_table_v3": # Проверяем что запрос пришел на нашу новую таблицу.
for query in self.queries:
query.row_limit = 0 # И для каждого запроса в БД уберем лимит
Дополнительно нам требовалось создать свой постпроцессор. а первое время просто продублируем вторую версию, но с нашим именем и добавим его в список всех постпроцессеров:
post_processors = {
"pivot_table": pivot_table,
"pivot_table_v2": pivot_table_v2,
"pivot_table_v3": pivot_table_v3,
"table": table,
}
Пагинация
Было еще одно требование заказчика - это реализация обратной совместимости с прошлой версией сводной таблицы, поэтому мы сделаем пагинацию опциональной и будем применять её только тогда, когда в запросе передаётся переменная server_pagination в значении true.
Реализовывать пагинацию мы будем после постпроцессинга, где у нас будет сформирован итоговый DataFrame. Изначальный код выглядит следующим образом:
processed_df = post_processor(df, form_data, datasource)
query["colnames"] = list(processed_df.columns)
query["indexnames"] = list(processed_df.index)
query["coltypes"] = extract_dataframe_dtypes(processed_df, datasource)
query["rowcount"] = len(processed_df.index)
И, чтобы добавить пагинацию, мы изменим его на следующий код:
processed_df = post_processor(df, form_data, datasource)
query["colnames"] = list(processed_df.columns)
query["coltypes"] = extract_dataframe_dtypes(processed_df, datasource)
query["rowcount"] = len(processed_df.index)
if form_data.get("server_pagination", False): # Проверяем, включена ли серверная пагинация.
page = max(form_data.get("page", 0), 0) # Достаем номер страницы, при этом проверяя, что число не отрицательное.
page_size = max(form_data.get("pageSize", 100), 1) # Достаем размер страницы, проверяя, что число не меньше 1.
processed_df = processed_df[page * page_size:(page + 1) * page_size] # Отрезаем от DataFrame необходимый отрезок.
query["indexnames"] = list(processed_df.index) # Также мы перенесли создание индекса после пагинации, так как он может отличаться от изначального.
Итого, на данный момент у нас уже есть готовая пагинация, которая будет работать вне зависимости от объемов вашей базы данных.
Кеширование
Однако нам всё еще нужно добавить кэширование на это, так как иначе постоянные запросы к БД и диску на каждую страницу вызвали бы пересчеты всей сводной таблицы.
Здесь мы расширили код, который добавили в шаге с подготовкой, добавив в него попытку достать данные из кэша до запроса в БД. Здесь нам также стоит определиться, что мы будем использовать в качестве ключа для нашего кеша. Мы решили воспользоваться объектом form_data, который мы передаём в каждом запросе к данному эндпоинту, где содержатся все уникальные данные для конкретного запроса, и нам лишь необходимо было убрать оттуда те переменные, которые отвечают за пагинацию и за сортировку, так как именно от них не меняется наш запрос к БД. Сгенерировать сам кэш нам позволит уже существующая в Superset’e функция generate_cache_key, которая принимает в себя любой словарь и возвращает нам уникальный ключ. Создадим для этого отдельную функцию:
if form_data.get("viz_type") == "pivot_table_v3": # Проверяем что запрос пришел на нашу новую таблицу.
for query in self.queries:
query.row_limit = 0 # И для каждого запроса в БД уберем лимит
Теперь добавим попытку получения данных из кеша:
post_processors = {
"pivot_table": pivot_table,
"pivot_table_v2": pivot_table_v2,
"pivot_table_v3": pivot_table_v3,
"table": table,
}
И, конечно же, раз мы пытаемся что-то получить из кэша, было бы неплохо что-то туда пытаться класть, поэтому добавим код для этого в наш созданный постпроцессинг:
def pivot_table_v3(
df: pd.DataFrame,
form_data: Dict[str, Any],
datasource: Optional["BaseDatasource"] = None,
) -> pd.DataFrame:
"""
Pivot table v3.
"""
verbose_map = datasource.data["verbose_map"] if datasource else None
if form_data.get("granularity_sqla") == "all" and DTTM_ALIAS in df:
del df[DTTM_ALIAS]
res_df = pivot_df(
df,
rows=get_column_names(form_data.get("groupbyRows"), verbose_map),
columns=get_column_names(form_data.get("groupbyColumns"), verbose_map),
metrics=get_metric_names(form_data["metrics"], verbose_map),
aggfunc=form_data.get("aggregateFunction", "Sum"),
transpose_pivot=bool(form_data.get("transposePivot")),
combine_metrics=bool(form_data.get("combineMetric")),
show_rows_total=bool(form_data.get("rowTotals")),
show_columns_total=bool(form_data.get("colTotals")),
apply_metrics_on_rows=form_data.get("metricsLayout") == "ROWS",
)
cache_key = generate_cache_for_pivot(form_data) # Создадим ключ с помощью функции которую мы создали ранее.
cache.set(cache_key, res_df.to_dict()) # Положим получившийся DataFrame в кэш.
return res_df
Сортировка
Чтобы отсортировать нашу таблицу, нужно вставить сортировку перед пагинацией:
columns_to_sort = form_data.get("sortByColumns") or [] # Достаем массив по которому необходимо отсортировать данные# Создаем хеш из нашего нового объекта и возвращаем его
if columns_to_sort: # Если массив существует, применяем сортировку
ascSort = form_data.get("sortAsc") or False # Достаем значение определяющие сортировать по возрастанию или убыванию.
df = df.sort_values(by=columns_to_sort, ascending=ascSort) # Сортируем DataFrame встроенной функцией сортировки.
Также нужно изменить функцию генерации ключа для кэша с учетом переменных для сортировки:
def generate_cache_for_pivot(form_data: dict[str, Any]) -> str:
cache_dict = {k: form_data[k] for k in form_data.keys() - {'page', 'page_size', 'server_pagination', 'sortByColumns', 'sortAsc'}} # Убираем из полученного объекта лишние поля и формируем новый объект.
return generate_cache_key(cache_dict) # Создаем хеш из нашего нового объекта и возвращаем его.
Результаты
Итоговый результат: сводная таблица с пагинацией. И работающая сортировка!
Итоги
Для использования нашей сводной таблицы в ваших проектах нужно лишь применить описанные выше изменения к вашему Superset, пересобрать образ Docker и начать пользоваться новой таблицей!
Так как изначально при разработке мы применили изменения только к нашей новой таблице, то ваши существующие отчеты никак не будут затронуты.
Сейчас, например, мы сохранили изменения в виде патча к Apache Superset в нашем git. Патч можно применить к любой вашей версии Superset, независимо от того, вносили ли вы в неё изменения.
Наше решение может стать хорошим дополнением к уже существующим отчетам, оно универсально и легковесно, так что использовать его можно в проектах любой сложности.
Мир за умным кастомным Open Source!