Как-то я написал статью Аналитика рынка профессии в три клика. В ней мы подключались к сайту HH.ru из Power BI, забирали вакансии и строили дашборд для анализа.

Но подключение, описанное в статье, лишает анализ вакансий историчности. В дашборде хранится только один «снимок» данных на момент загрузки в него. Другими словами, при нажатии кнопки Обновить все данные в дашборде перезатрутся. Было бы интересно видеть общую картину во времени? Мне да. Тем более, сделать это не особо сложно, и стоит пару сотен рублей в год. При выборе инструментов был принцип - максимально просто и дешево.

Для того чтобы добавить историчности в дашборд обратимся к облачным технологиям. Нам нужно место, куда мы будем складывать и хранить наши «снимки» данных. Причем сырые ответы сайта в формате JSON. К этому хранилищу мы и будем подключаться из Power BI. Небольшая схема архитектуры на рисунке ниже. Алгоритм следующий, по расписанию запускается CloudFunction на Python, которая делает запрос к API сайта и складывает ответы в Stroge. Далее сырые ответы трансформируются и сохраняются в формате parquet, к которому и будет подключаться Power BI. Причем, при таком способе подключения в личном кабинете Power BI можно запланировать обновление (запланировать обновление можно не со всеми источниками).

Архитектура
Архитектура

Я решил использовать инфраструктуру Яндекс Облака. На мой взгляд, облака западных вендоров, например Microsoft Azure, более зрелые и удобные. Но проблема в том, что при регистрации нужно указать банковскую карту, вероятно это отрежет большую часть читателей.

Далее по шагам.
Заходим в Яндекс Облако, регистрируемся, привязываем банковскую карту и создаем два сервиса:

  • Яндекс функции

  • Хранилище S3, оно же Data Lake, оно же Yandex Object Stroge

Yandex Object Stroge

В Yandex Object Storage нужно создать каталог, далее перейти в Storage и создать bucket. В bucket настройка доступа Публичный,  я не особо заморачивался с доступом. Так же необходимо создать статический ключ доступа к бакету. Действуем по инструкции от Яндекса.

Yandex Cloud Functions

Это самое сложное, но вы держитесь :-). В Cloud Functions нужно создать три функции со средой выполнения на Python. Код функций приложил ниже. Таймаут функций не менее 300 секунд. И триггеры к ним, разделенные по интервалам не менее 30 минут. У меня получилось вот так:

Триггеры функций
Триггеры функций

В редакторе кода для каждой функции нужно создать два файла, код прилагаю. Первый requirements.txt с необходимыми пакетами Python. И второй с кодом Python. В коде меняем значения переменных на ваши, они помечены, найдете.

Код функций
Функция 1
def handler(event, context):
    import requests
    import datetime
    import boto3
    import json
    import pandas as pd
    import csv
    import io
    from io import StringIO

    def create_con():
        os = boto3.client('s3',
            aws_access_key_id = 'ВАШ КЛЮЧ',
            aws_secret_access_key = 'ВАШ СЕКРЕТНЫЙ КЛЮЧ',
            region_name = 'central1',
            endpoint_url = 'https://storage.yandexcloud.net'
            )
        return os
    def get_maxpage(API):
        adress = API
        todos = requests.get(adress)
        stat = todos.status_code
        tbody  = json.loads(todos.text)
        maxpage = tbody["pages"]
        return maxpage

    def getAPI(text, API, page):
        adress = API + '&page=' + page
        todos = requests.get(adress)
        stat = todos.status_code
        todos.encoding = 'utf-8'
        todos.text
        tbody = todos.json()
        t2body = json.dumps(tbody, ensure_ascii=False)
        df = pd.DataFrame(columns=['id', 'text', 'page'])
        for item in tbody["items"]:
            df = df.append({'id': item['id'], 'text': text, 'page': page}, ignore_index=True)
        return {
            'statusCode': 200,
            'body': df,
        }
    
    def get_list_req(os):
        obj = os.get_object(Bucket='ВАШ БАКЕТ',Key='ПУТЬ К ФАЙЛУ requests.csv')
        body = obj['Body'].read()
        df = pd.read_csv(io.BytesIO(body))
        return df

    os = create_con() #Подключение

    list_req = get_list_req(os) #Получаю датафрейм запросов
    df = pd.DataFrame(columns=['id', 'text', 'page'])
    for index, row in list_req.iterrows(): #Цикл по запросам, генерю файлы json
        maxpage = get_maxpage(row['API']) #получаю кол-во страниц
        int(maxpage)
        for i in range(0, maxpage):
            req = getAPI(row['text'], row['API'], str(i))
            df = df.append(req["body"], ignore_index=True)
    csv_buffer = StringIO()
    df.to_csv(csv_buffer)
    os.put_object(Bucket="ВАШ БАКЕТ", Body=csv_buffer.getvalue(), Key="Lakehouse/Config/" + "All_id_for_parse.csv", StorageClass='STANDARD')

Функция 2
def handler(event, context):
    import requests
    import datetime
    import boto3
    import json
    import pandas as pd
    import csv
    import io
    from io import StringIO

    def create_con():
        os = boto3.client('s3',
            aws_access_key_id = 'ВАШ КЛЮЧ',
            aws_secret_access_key = 'ВАШ СЕКРЕТНЫЙ КЛЮЧ',
            region_name = 'central1',
            endpoint_url = 'https://storage.yandexcloud.net'
            )
        return os

    def getAPI(text, id):
        adress = 'https://api.hh.ru/' + 'vacancies/' + str(id)
        todos = requests.get(adress)
        stat = todos.status_code
        todos.encoding = 'utf-8'
        todos.text
        tbody = todos.json()
        t2body = json.dumps(tbody, ensure_ascii=False)
        now = datetime.date.today()
        file_key = 'Lakehouse/Data/raw/' + now.strftime('%Y-%m-%d') + '/' + text + '/' + str(id) + '.json'
        os.put_object(Bucket='ВАШ БАКЕТ', Body=t2body, Key=file_key, StorageClass='STANDARD')
    
    def get_list_req(os):
        obj = os.get_object(Bucket='ВАШ БАКЕТ',Key='Lakehouse/Config/All_id_for_parse.csv')
        body = obj['Body'].read()
        df = pd.read_csv(io.BytesIO(body))
        return df

    os = create_con() #Подключение

    list_req = get_list_req(os) #Получаю датафрейм запросов
    df = pd.DataFrame(columns=['id', 'text', 'page'])
    for index, row in list_req.iterrows(): #Цикл по запросам, генерю файлы json
        req = getAPI(row['text'], row['id'])

Функция 3
def handler(event, context):
    import pandas as pd
    import requests
    from datetime import datetime, date
    import boto3
    import json
    import io
    from io import StringIO
    from io import BytesIO
    import pyarrow, fastparquet
    import urllib3
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
    mydate = str(date.today())
    data=[]
    columns=['id', 'name', 'salary', 'experience', 'schedule', 'employment', 'key_skills', 'employerName', 'accept_temporary',\
                                     'publishedAt', 'arhived', 'areaId', 'areaName', 'alternate_url', 'working_days', 'working_time_intervals', 'DateLoad', 'fileKey']
    df = pd.DataFrame(data, columns=columns)
                      
    df3 = df
    df_mart = df
    def create_con():
        os = boto3.client('s3',
            aws_access_key_id = 'ВАШ КЛЮЧ',
            aws_secret_access_key = 'ВАШ СЕКРЕТНЫЙ КЛЮЧ',
            region_name = 'central1',
            endpoint_url = 'https://storage.yandexcloud.net',
            verify=False
            )
        return os
    
    def get_all_s3_keys(os, bucket, prefix):
        """Get a list of all keys in an S3 bucket."""
        df = pd.DataFrame(columns=['file_key'])
        kwargs = {'Bucket': bucket, 'Prefix': prefix}
        while True:
            resp = os.list_objects_v2(**kwargs)
            for obj in resp['Contents']:
                df = pd.concat([pd.DataFrame([[obj['Key']]], columns=df.columns), df], ignore_index=True)
            try:
             kwargs['ContinuationToken'] = resp['NextContinuationToken']
            except KeyError:
                break   
        return df 

    def go_file(os, Key):
        obj = os.get_object(Bucket='ВАШ БАКЕТ',Key=Key) #Получаю файл джон
        todos = json.loads(obj['Body'].read())
        if todos["experience"] is not None:
            experience = todos["experience"]["id"]
        if todos["schedule"] is not None:
            schedule = todos["schedule"]["id"]
        if todos["employment"] is not None:
            employment = todos["employment"]["id"]
        try:
            employerName = todos["employer"]["name"]
        except:
            employerName = None
        try:
            areaId = todos["area"]["id"]
            areaName = todos["area"]["name"]
        except:
            areaId = None
            areaName = None
        row = [todos["id"], todos["name"], todos["salary"], experience, schedule, employment, todos["key_skills"], employerName, todos["accept_temporary"],\
                todos["published_at"],\
               todos["archived"], areaId, areaName, todos["alternate_url"], todos["working_days"], todos["working_time_intervals"], datetime.now(), Key]
        return pd.DataFrame([row], columns=columns)
    
    os = create_con() #Подключение
    df_list_file_key = get_all_s3_keys(os, 'ВАШ БАКЕТ', 'Lakehouse/Data/raw/' + mydate) #Получаю все ключи файлов
    for ind in df_list_file_key.index:
            df_res = go_file(os, df_list_file_key['file_key'][ind])
            df3 = pd.concat([df3, df_res])
    out_buffer = BytesIO()
    try:
        obj_mart = os.get_object(Bucket='ВАШ БАКЕТ',Key='Lakehouse/Data/proccessed/datamart') #Получаю файл parquet
        df_mart =  pd.read_parquet(io.BytesIO(obj_mart['Body'].read()))
    except:
        y = 1
    df_datamart = pd.concat([df_mart,df3]).drop_duplicates(['fileKey'],keep='last').sort_values('fileKey')
    df_datamart.astype(str).to_parquet(out_buffer, index=False)
    os.put_object(Bucket="ВАШ БАКЕТ", Body=out_buffer.getvalue(), Key='Lakehouse/Data/proccessed/datamart', StorageClass='STANDARD')
    return 'OK'

Внутренности файла requirements.txt

boto3
datetime
pandas
fsspec
requests
pyarrow
fastparquet
urllib3

Внутренности файла requests.csv
"number","text","API"
"1","Azure","https://api.hh.ru/vacancies?text=Azure&search_field=name"
"2","data scientist","https://api.hh.ru/vacancies?text=data scientist&search_field=name"
"3","data engineer","https://api.hh.ru/vacancies?text=data engineer&search_field=name"

Файл requests.csv нужен для хранения списка запросов, по которым вам требуется произвести парсинг. Этот файл нужно положить в ВАШ БАКЕТ - Lakehouse - Config - requests.csv.

Функция 1 считывает из бакета файл requests.csv с вакансиями, по которым мы хотим скачать ответы hh.ru, и записывает id вакансий в файл All_id_for_parse.csv. Питонисты, слишком не критикуйте код :). После этого функция 2 запрашивает информацию из hh.ru отдельно по каждому id вакансии. Два шага нужны потому что обращение по id каждой вакансии позволит получить более детальный json, чем при общей поисковой выдаче в функции 1. Функция 3 создает витрину - файл datamart.parquet c извлеченными данными из файлов json. Этот файл и будет нужен для Power BI.

Вместо трех функций можно было бы оставить две, если использовать новый сервис Yandex query. Но на данный момент сервис не умеет читать вложенный json, коими являются наши json.

Если все получилось, и функции отработали, можно запускать Power BI.

Power BI

Для скачивания витрины в Power BI нужно использовать источник Файл - Parquet. В источнике прописать ссылку на файл, как
"http://ВАШБАКЕТ.storage.yandexcloud.net/Lakehouse/Data/proccessed/datamart".
В файле datamart.parquet некоторые поля сохранены как словари и списки в виде текста, поэтому нужно будет применить преобразование в Power BI. В Power qwery делается это в два клика. В принципе все. Можно строить дашборд. Мой дашборд открыт и доступен по ссылке. Он двухстраничный.

Итог

Мы получили целый проект - Data Lake + BI. Его можно использовать в целях быстрого развертывания для демонстраций. Стоит дешево. Разумеется, решение нацелено на небольшие и возможно средние объемы данных. На данный момент цифры такие:

  • За 15 дней пришло 700 MB сырых данных, формат json

  • 700 MB сырых данных = 0,85 МB данных в витрине, формат parquet

  • В год витрина прирастает на 21 МB, то есть загрузка может жить лет 10

  • Каждый день тратится по 84 копейки, в год 295 р.

  • Загрузка полностью автоматизирована; просто открывая отчет, можно просматривать новые данные

Комментарии (11)


  1. HexExort
    00.00.0000 00:00
    +3

    а итоги то можно посмотреть?) какая аналитика рынка профессий то в итоге?


    1. 220-380 Автор
      00.00.0000 00:00

      Если вы про написание самих выводов, то их может быть много, всего не опишешь. Я старался сделать упор на саму архитектуру. А выводы каждый сделает сам, просматривая дашборд (ссылка не него есть в статье), те выводы которые ему нужны.


      1. Trabant_Vishnya
        00.00.0000 00:00

        А что тут кардинально нового в инфраструктуре, что этому целая статья посвящена? Вы бы хоть модель данных показали


        1. 220-380 Автор
          00.00.0000 00:00

          Какая модель данных :-). Только витрина.


  1. ant_sol
    00.00.0000 00:00
    +1

    Другими словами, при нажатии кнопки Обновить все данные в дашборде перезатрутся. 

    Используйте инкрементальное обновление в power bi service на основе потоков данных ????

    Для вашего проекта этого будет достаточно и не нужно dwh городить


    1. 220-380 Автор
      00.00.0000 00:00

      Это было бы хороший вариант, но инкрементальное обновление не работает с источником Power BI как "Интернет", API сайта возвращает json. Или сейчас они уже что-то доработали?
      Тут еще и второй пункт, потоки данных это 240$ в год.


      1. Trabant_Vishnya
        00.00.0000 00:00

        10 per User, Dataflows есть и в Pro версии
        Тут уж как по кол-ву учеток пойдет.
        Если совсем жаться, то можно через Free версию на остальных распространять через publish to web, но страдает security.


        1. 220-380 Автор
          00.00.0000 00:00
          +1

          Из туториала по Power BI
          "Добавочное обновление (только для выпуска Premium). Для потоков данных можно также настроить добавочное обновление. ". Это 240 в год.
          Вы бы сделали сначала, что советуете, чтобы быть уверенным в своих словах.


          1. Trabant_Vishnya
            00.00.0000 00:00

            Incremental refresh можно и не через PBI средства делать.
            С Yandex Data lake не пробовал, конечно, а вот использовать фичи Azure для этого можно.
            Ссылка с Яндекс.Директом, там пробовал и реализовывал сам, имея на руках ту самую Pro.


  1. Yo1
    00.00.0000 00:00

    я не особо заморачивался с доступом

    а каковы шансы настроить с ключами ?


    1. 220-380 Автор
      00.00.0000 00:00

      Думаю, вполне реально. Функции Python работают с ключами. Место без ключа - это подключение Power BI к облаку: подключение к одному файлу parquet. Если настроите, то дайте знать.