Привет! Меня зовут Григорий Митраков, я BI-аналитик в рекламном агентстве.
В статье расскажу вам как создать приложение в Power BI по пожеланиям заказчиков.

Предыстория

После того, как я устроился работать в агентство ко мне обратилась руководитель HR отдела для разработки небольшого автоматизированного отчета в BI. До этого основные показатели (текучесть, вовлеченность и др.) рассчитывались в табличках Excel полу ручным способом.

Так выглядели отчеты в Excel файлах:

Данные с опросников - % участия
Данные с опросников - % участия
Данные с опросников - соцдем
Данные с опросников - соцдем

Причину автоматизации мне объяснили тем, что около 4-х часов в неделю сотрудник HR отдела тратил на расчет показателей в таблицах Excel. (Что хорошо – видно, что какими-то графиками и диаграммами уже пользовались).

Сбор требований к отчету

После первой встречи с HR отделом в лице руководителя, специалиста, занимавшегося сбором данных и оформлением отчетов в таблицах Excel, выяснилось, что источники с необходимыми данными находятся в регулярных выгрузках из 1С Предприятие, Google Sheets, Битрикс24, PostgreSQL, iSpring. По визуальному оформлению как таковых «жестких» требований не было, была лишь просьба, учесть наглядность представляемых данных и «желательно как в таблицах Excel».

Работа над дашбордом

Задача показалась мне интересной ввиду достаточно большого количества разнообразных источников данных и свободы в выборе визуализации данных.

Для прозрачности работы, контроля текущего статуса над задачей и внесение своевременных правок в оформление и расчет метрик решили собираться раз в неделю по четвергам (этот день назвали «День HR»).

Для всех источников данных решил использовать БД PostgreSQL (была развернута в компании). Для разработки ETL-процессов — Apache Airflow.

Ниже представлена схема ETL процесса:

ETL
ETL

Данные раз в сутки (по расписанию) загружаю:

  • с помощью API REST с опросников (google sheets),

  • с помощью API REST с платформы для обучения сотрудников iSpring,

  • с Битрикс24 веб-скрейпингом (у Битрикс24 есть за дополнительную плату расширение BI-Аналитика, но в компании бесплатная версия, поэтому получение данных организовано таким образом),

  • с сетевых папок, в которых хранятся регулярные выгрузки в формате .xlsx из 1С.  

Пример DAG (таблица с курсами iSpring):

import json 
import time 
import datetime 
import requests 
import pandas as pd 
from airflow.decorators import dag, task
from airflow.models import Variable 
from sqlalchemy import create_engine 
from airflow.providers.telegram.hooks.telegram import TelegramHook 


# Параметры для аутентификации и входа

user = 'grigoriy'
host = '101.100.9.43'
db = 'internal_data'
pwd = Variable.get('planning_datas_password') 

postgresql_url = f'postgresql+psycopg2://{user}:{pwd}@{host}/{db}' 
engine = create_engine(postgresql_url) 

# запрос к БД для получения id различных курсов

query_db = '''SELECT t.module_id, t.content_item_id, t.course_id 
            FROM ispring_course_modules_table AS t 
            GROUP BY t.module_id, t.content_item_id, t.course_id''' 

URL_TOKEN = 'https://api-learn.ispringlearn.ru/api/v3/token' 
URL_CONTENT = 'https://api-learn.ispringlearn.ru/courses/modules' 

HEADER = { 
        'Content-Type': 'application/x-www-form-urlencoded', 
        'Accept': 'application/json'
        } 

HEADERS_R = { 
            'X-Target-Locale': 'en-US', 
            'Accept': 'application/json'
            } 

CLIENT_SECRET = Variable.get('ispring_api_client_secret') 

DATA_URLENCODE = { 
                'client_id': 'xxbbaaxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx', 
                'client_secret': CLIENT_SECRET, 
                'grant_type': 'client_credentials'
                } 

default_args = {'owner': 'xxxxxxx' } 

TELEGRAM_TOKEN = Variable.get('tg_bot_token') 
CHAT_ID = '0000000000' 

# Необходимые функции: 

def error_to_telegram(token: str, chat_id: str, e, name_dag: str): 
    '''
    Функция отправляет сообщения об ошибках в чат телеграмм 
    :param: token: "xxx:xxx" API токен для telegram telegram_token
    :param: chat_id куда отправлять сообщение
    :param: e - ошибка 
    :param: name_dag - название DAG
    ''' 
    telegram_conn_id = 'telegram_default' 
    
    telegram_hook = TelegramHook(telegram_conn_id, 
                                token, 
                                chat_id) 
    
    t = datetime.datetime.now() 
    
    date_and_time = str(t.date()) + ' ' + str(t.time()) 
    
    message = f'? В *DAG  {name_dag}* произошла ошибка {e}' 
    message_text = f'{message}. Время ошибки: {date_and_time}' 
    
    telegram_hook.send_message({
                                'text': message_text, 
                                'disable_notification': True 
                                }) 



@dag(default_args=default_args,
     schedule_interval='30 12 * * *',
     start_date=datetime.datetime(2023, 1, 1),
     catchup=False 
     ) 
def get_data_course_modules_from_ispring(): 
    @task
    def get_id_courses_db(query: str, engine) -> pd.DataFrame: 
        """
        Функция получает список различных id курсов из БД
        :param query: запрос к БД
        :return: список
        """
        dis_id_db = pd.read_sql(query, engine) 
        
        return dis_id_db 
    
    @task
    def get_token_ispring(url_token: str, 
                        header: dict, 
                        data_urlcode: dict, 
                        telegram_token: str, 
                        id_chat: str) -> str: 
        '''
        Функция получает токен 
        :param: url_token - url 
        :param: header - headers 
        :param: data_urlcode - params 
        :return: token
        ''' 
        res = requests.post(url=url_token, 
                            headers=header, 
                            data=data_urlcode) 
        
        if res.status_code == 200: 
            
            # Результаты получения токена: 

            res_token = json.loads(res.text) 
            
            # Токен для доступа: 

            authorization_token = res_token['access_token'] 
            
            return authorization_token 
        
        else: 
            error_to_telegram(token=telegram_token, 
                              chat_id=id_chat, 
                              e=str(res.status_code) + ' ' + str(res.text), 
                              name_dag='get_data_course_modules_from_ispring') 

    @task
    def get_data_courses(url_content: str, 
                        header_r: dict, 
                        autho_token: str, 
                        id_courses_t: pd.DataFrame, 
                        telegram_token: str, 
                        id_chat: str) -> pd.DataFrame: 

        # Присвоим токен к заголовку: 

        header_r['Authorization'] = autho_token 
        
        result = requests.get(url=url_content, headers=header_r) 
        
        if result.status_code == 200: 
            
            # Таблица со списком курсов: 

            course_content = pd.DataFrame(json.loads(result.text)["modules"]) 
            
            # Переименуем столбцы: 

            course_content = course_content.rename(columns={'moduleId': 'module_id', 
                                                          'contentItemId': 'content_item_id', 
                                                          'courseId': 'course_id', 
                                                          'authorId': 'author_id', 
                                                          'addedDate': 'added_date', 
                                                          'viewUrl': 'view_url'}) 
            
            # Приведем в необходимый тип данные: 

            course_content['added_date'] = pd.to_datetime(course_content['added_date'], utc=True).dt.tz_localize(None).astype('datetime64[ns]') 
            course_content = course_content.astype({'module_id': str, 
                                                  'content_item_id': str, 
                                                  'course_id': str, 
                                                  'title': str, 
                                                  'description': str, 
                                                  'author_id': str, 
                                                  'added_date': 'datetime64[ns]', 
                                                  'view_url': str}) 
            
            # Выберем необходимые столбцы: 

            cols = ['module_id', 'content_item_id', 'course_id', 'title', 'description', 
                   'author_id', 'added_date', 'view_url'] 

            course_content = course_content[cols]  
        
            return course_content[~((course_content['module_id'].isin(id_courses_t['module_id'])) 
                                  & (course_content['content_item_id'].isin(id_courses_t['content_item_id'])) 
                                  & (course_content['course_id'].isin(id_courses_t['course_id'])))].reset_index(drop=True) 
        
        else: 
            error_to_telegram(token=telegram_token, 
                              chat_id=id_chat, 
                              e=str(result.status_code) + ' ' + str(result.text), 
                              name_dag='get_data_course_modules_from_ispring') 
    
    @task
    def append_data_to_db(data: pd.DataFrame, table: str, engine):
        """
        Функция добавляет данные из датафрейма в БД
        :param data: датафрейм
        :param table: имя таблицы
        :return: None
        """
        for i in range(len(data) // 10000 + 1):
            data.iloc[i * 10000: (i + 1) * 10000].to_sql(con=engine,
                                                         name=table,
                                                         if_exists='append',
                                                         index=False)
            time.sleep(3) 


    # Загрузка таблицы со списком курсов в БД
    
    dis_id_db = get_id_courses_db(query=query_db, engine=engine) 
    
    token_ispring = get_token_ispring(url_token=URL_TOKEN, 
                                     header=HEADER, 
                                     data_urlcode=DATA_URLENCODE, 
                                     telegram_token=TELEGRAM_TOKEN, 
                                     id_chat=CHAT_ID) 
    
    df = get_data_courses(url_content=URL_CONTENT, 
                        header_r=HEADERS_R, 
                        autho_token=token_ispring, 
                        id_courses_t=dis_id_db, 
                        telegram_token=TELEGRAM_TOKEN, 
                        id_chat=CHAT_ID) 
    
    append_data_to_db(data=df, table='ispring_course_modules_table', engine=engine) 

get_data_course_modules_from_ispring = get_data_course_modules_from_ispring()

С каждого Python модуля получаю алерты в Телеграм-чат, в случае, если загрузка прошла неудачно.

Пример такого алерта:

Чат с ошибками
Чат с ошибками

Из БД PostgreSQL получаю данные в дашборды, некоторые таблички получаю с вьюшек, вьюшки с материализованных представлений. Схема в БД:

Получение таблиц в дашборд
Получение таблиц в дашборд

Финальная обработка данных осуществляется в Power Query.

Разработку начали с важных метрик, двигаясь постепенно к второстепенным по мере выполнения отчетов.

Первый дашборд представлял метрику «Текучесть в компании» (ниже представлены первые 2 страницы):

Кадровые изменения
Кадровые изменения
Стаж работы сотрудников
Стаж работы сотрудников

Модель данных получилась такая:

Схема данных
Схема данных

Второй дашборд представлял метрику «Вовлеченность в компании» (ниже представлены первые 2 страницы):

Вовлеченность
Вовлеченность
Вовлеченность подробно
Вовлеченность подробно

Модель данных получилась такая:

Схема данных
Схема данных

Следующий дашборд по оценке «Внутреннего NPS» - считается по формуле: (кол-во положительных оценок – кол-во отрицательных оценок) / общее кол-во оценок. Этот дашборд показывает количество и оценку внутренних задач в компании, выполняемых отделами или сотрудниками и оцениваемых постановщиками задач.

Ниже представлены первые две страницы:

Внутренний NPS
Внутренний NPS
Оценки по дням
Оценки по дням

Модель данных:

Схема данных
Схема данных

Следующий дашборд по метрике «Изменения грейдов сотрудников».

Ниже представлены первые две страницы:

Изменения в грейдах
Изменения в грейдах
Изменения в грейдах подробно
Изменения в грейдах подробно

Модель данных:

Схема данных
Схема данных

Следующий дашборд по метрике «Адаптация сотрудников в компании».

Ниже представлены первые две страницы дашборда:

Адаптация
Адаптация
Адаптация подробно
Адаптация подробно

Модель данных:

Схема данных
Схема данных

Следующий дашборд показывает результаты прохождения сотрудниками компании курсов на iSpring:

Результаты прохождения курсов
Результаты прохождения курсов

Модель данных:

Схема данных
Схема данных

Итоговый дашборд включает объединение предыдущих дашбордов в один: «Карта здоровья».

Получился такой:

Данные в дашборд загружаются через Центр данных Одного озера подключением к семантическим моделям предыдущих дашбордов. Показатели на «Карте здоровья» подсвечиваются цветами, в зависимости от значений.

Общая схема источников данных:

Представление происхождения
Представление происхождения

Заключение

Все эти дашборды объединил в приложение, которым удобно пользоваться, так как основные показатели выведены в одном окне. При необходимости легко можно просмотреть более детально по каждому показателю.

Комментарии (2)


  1. vtal007
    06.07.2024 13:31
    +1

    Ох ничего себе объём работы и зоопарк систем хранения данных (как начинающий DA и бывший HR смотрю)

    Но есть один вопросик, если уж аирфлоу, то почему Power Query, а не там же в БД (посгрес) или пандас


  1. mBlaze
    06.07.2024 13:31

    Разработка ETL очень хорошо, работа с данными очень хорошо, дизайн отчётов, цветовая палитра - нужно поработать, пока вырви глаз, после просмотра уже устанешь.

    Что касательно названия статьи, сквозная аналитика у вас чем представлена? Сквозная аналитика это отслеживание, исследование, анализ например заказа от создания до отгрузки со всеми рядом вытекающими данными. В вашем случае сквозной аналитикой бы являлся анализ, исследование сотрудников от начала, то есть проведения собеседования и привлечения до увольнения или его текущего состояния. Анализ почему сотрудник уволился или почему плохо учился или почему текучка большая, то есть сквозная аналитика выявляет причины. В вашем случае это представление данных, верхнеуровневое состояние компании, по которому можно понять примерно, что в компании много пенсионеров или сотрудникам не интересно, они не вовлечены. Так же, что много открытых вакансий и мало закрытых.

    Скрипт в закладки, ибо эйрфлоу захватывает мир)))

    Очень близкое к вашей истории, разрабатывал несколько лет назад для одного очень очень "неизвестного" банка из Татарстана, но мне повезло, у меня была постановка с дизайном)

    И ещё момент, например верхнеуровневые метрики, представленные прямоугольниками, это хорошо, но использовать в них лишь процент - не интересно, интереснее мульти КПИ, где так же отражены обсолютные значения из которых этот процент получился. Отклонение туда же можно с переключателем на MTD QTD YTD. Принять решение руководителю это конечно вряд ли поможет,но вот заинтересованность в просмотре повысит и будут потом картинку из вашего отчёта в презы вставлять;)