Как-то я написал статью Аналитика рынка профессии в три клика. В ней мы подключались к сайту HH.ru из Power BI, забирали вакансии и строили дашборд для анализа.
Но подключение, описанное в статье, лишает анализ вакансий историчности. В дашборде хранится только один «снимок» данных на момент загрузки в него. Другими словами, при нажатии кнопки Обновить все данные в дашборде перезатрутся. Было бы интересно видеть общую картину во времени? Мне да. Тем более, сделать это не особо сложно, и стоит пару сотен рублей в год. При выборе инструментов был принцип - максимально просто и дешево.
Для того чтобы добавить историчности в дашборд обратимся к облачным технологиям. Нам нужно место, куда мы будем складывать и хранить наши «снимки» данных. Причем сырые ответы сайта в формате JSON. К этому хранилищу мы и будем подключаться из Power BI. Небольшая схема архитектуры на рисунке ниже. Алгоритм следующий, по расписанию запускается CloudFunction на Python, которая делает запрос к API сайта и складывает ответы в Stroge. Далее сырые ответы трансформируются и сохраняются в формате parquet, к которому и будет подключаться Power BI. Причем, при таком способе подключения в личном кабинете Power BI можно запланировать обновление (запланировать обновление можно не со всеми источниками).
Я решил использовать инфраструктуру Яндекс Облака. На мой взгляд, облака западных вендоров, например Microsoft Azure, более зрелые и удобные. Но проблема в том, что при регистрации нужно указать банковскую карту, вероятно это отрежет большую часть читателей.
Далее по шагам.
Заходим в Яндекс Облако, регистрируемся, привязываем банковскую карту и создаем два сервиса:
Яндекс функции
Хранилище S3, оно же Data Lake, оно же Yandex Object Stroge
Yandex Object Stroge
В Yandex Object Storage нужно создать каталог, далее перейти в Storage и создать bucket. В bucket настройка доступа Публичный, я не особо заморачивался с доступом. Так же необходимо создать статический ключ доступа к бакету. Действуем по инструкции от Яндекса.
Yandex Cloud Functions
Это самое сложное, но вы держитесь :-). В Cloud Functions нужно создать три функции со средой выполнения на Python. Код функций приложил ниже. Таймаут функций не менее 300 секунд. И триггеры к ним, разделенные по интервалам не менее 30 минут. У меня получилось вот так:
В редакторе кода для каждой функции нужно создать два файла, код прилагаю. Первый requirements.txt с необходимыми пакетами Python. И второй с кодом Python. В коде меняем значения переменных на ваши, они помечены, найдете.
Код функций
Функция 1
def handler(event, context):
import requests
import datetime
import boto3
import json
import pandas as pd
import csv
import io
from io import StringIO
def create_con():
os = boto3.client('s3',
aws_access_key_id = 'ВАШ КЛЮЧ',
aws_secret_access_key = 'ВАШ СЕКРЕТНЫЙ КЛЮЧ',
region_name = 'central1',
endpoint_url = 'https://storage.yandexcloud.net'
)
return os
def get_maxpage(API):
adress = API
todos = requests.get(adress)
stat = todos.status_code
tbody = json.loads(todos.text)
maxpage = tbody["pages"]
return maxpage
def getAPI(text, API, page):
adress = API + '&page=' + page
todos = requests.get(adress)
stat = todos.status_code
todos.encoding = 'utf-8'
todos.text
tbody = todos.json()
t2body = json.dumps(tbody, ensure_ascii=False)
df = pd.DataFrame(columns=['id', 'text', 'page'])
for item in tbody["items"]:
df = df.append({'id': item['id'], 'text': text, 'page': page}, ignore_index=True)
return {
'statusCode': 200,
'body': df,
}
def get_list_req(os):
obj = os.get_object(Bucket='ВАШ БАКЕТ',Key='ПУТЬ К ФАЙЛУ requests.csv')
body = obj['Body'].read()
df = pd.read_csv(io.BytesIO(body))
return df
os = create_con() #Подключение
list_req = get_list_req(os) #Получаю датафрейм запросов
df = pd.DataFrame(columns=['id', 'text', 'page'])
for index, row in list_req.iterrows(): #Цикл по запросам, генерю файлы json
maxpage = get_maxpage(row['API']) #получаю кол-во страниц
int(maxpage)
for i in range(0, maxpage):
req = getAPI(row['text'], row['API'], str(i))
df = df.append(req["body"], ignore_index=True)
csv_buffer = StringIO()
df.to_csv(csv_buffer)
os.put_object(Bucket="ВАШ БАКЕТ", Body=csv_buffer.getvalue(), Key="Lakehouse/Config/" + "All_id_for_parse.csv", StorageClass='STANDARD')
Функция 2
def handler(event, context):
import requests
import datetime
import boto3
import json
import pandas as pd
import csv
import io
from io import StringIO
def create_con():
os = boto3.client('s3',
aws_access_key_id = 'ВАШ КЛЮЧ',
aws_secret_access_key = 'ВАШ СЕКРЕТНЫЙ КЛЮЧ',
region_name = 'central1',
endpoint_url = 'https://storage.yandexcloud.net'
)
return os
def getAPI(text, id):
adress = 'https://api.hh.ru/' + 'vacancies/' + str(id)
todos = requests.get(adress)
stat = todos.status_code
todos.encoding = 'utf-8'
todos.text
tbody = todos.json()
t2body = json.dumps(tbody, ensure_ascii=False)
now = datetime.date.today()
file_key = 'Lakehouse/Data/raw/' + now.strftime('%Y-%m-%d') + '/' + text + '/' + str(id) + '.json'
os.put_object(Bucket='ВАШ БАКЕТ', Body=t2body, Key=file_key, StorageClass='STANDARD')
def get_list_req(os):
obj = os.get_object(Bucket='ВАШ БАКЕТ',Key='Lakehouse/Config/All_id_for_parse.csv')
body = obj['Body'].read()
df = pd.read_csv(io.BytesIO(body))
return df
os = create_con() #Подключение
list_req = get_list_req(os) #Получаю датафрейм запросов
df = pd.DataFrame(columns=['id', 'text', 'page'])
for index, row in list_req.iterrows(): #Цикл по запросам, генерю файлы json
req = getAPI(row['text'], row['id'])
Функция 3
def handler(event, context):
import pandas as pd
import requests
from datetime import datetime, date
import boto3
import json
import io
from io import StringIO
from io import BytesIO
import pyarrow, fastparquet
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
mydate = str(date.today())
data=[]
columns=['id', 'name', 'salary', 'experience', 'schedule', 'employment', 'key_skills', 'employerName', 'accept_temporary',\
'publishedAt', 'arhived', 'areaId', 'areaName', 'alternate_url', 'working_days', 'working_time_intervals', 'DateLoad', 'fileKey']
df = pd.DataFrame(data, columns=columns)
df3 = df
df_mart = df
def create_con():
os = boto3.client('s3',
aws_access_key_id = 'ВАШ КЛЮЧ',
aws_secret_access_key = 'ВАШ СЕКРЕТНЫЙ КЛЮЧ',
region_name = 'central1',
endpoint_url = 'https://storage.yandexcloud.net',
verify=False
)
return os
def get_all_s3_keys(os, bucket, prefix):
"""Get a list of all keys in an S3 bucket."""
df = pd.DataFrame(columns=['file_key'])
kwargs = {'Bucket': bucket, 'Prefix': prefix}
while True:
resp = os.list_objects_v2(**kwargs)
for obj in resp['Contents']:
df = pd.concat([pd.DataFrame([[obj['Key']]], columns=df.columns), df], ignore_index=True)
try:
kwargs['ContinuationToken'] = resp['NextContinuationToken']
except KeyError:
break
return df
def go_file(os, Key):
obj = os.get_object(Bucket='ВАШ БАКЕТ',Key=Key) #Получаю файл джон
todos = json.loads(obj['Body'].read())
if todos["experience"] is not None:
experience = todos["experience"]["id"]
if todos["schedule"] is not None:
schedule = todos["schedule"]["id"]
if todos["employment"] is not None:
employment = todos["employment"]["id"]
try:
employerName = todos["employer"]["name"]
except:
employerName = None
try:
areaId = todos["area"]["id"]
areaName = todos["area"]["name"]
except:
areaId = None
areaName = None
row = [todos["id"], todos["name"], todos["salary"], experience, schedule, employment, todos["key_skills"], employerName, todos["accept_temporary"],\
todos["published_at"],\
todos["archived"], areaId, areaName, todos["alternate_url"], todos["working_days"], todos["working_time_intervals"], datetime.now(), Key]
return pd.DataFrame([row], columns=columns)
os = create_con() #Подключение
df_list_file_key = get_all_s3_keys(os, 'ВАШ БАКЕТ', 'Lakehouse/Data/raw/' + mydate) #Получаю все ключи файлов
for ind in df_list_file_key.index:
df_res = go_file(os, df_list_file_key['file_key'][ind])
df3 = pd.concat([df3, df_res])
out_buffer = BytesIO()
try:
obj_mart = os.get_object(Bucket='ВАШ БАКЕТ',Key='Lakehouse/Data/proccessed/datamart') #Получаю файл parquet
df_mart = pd.read_parquet(io.BytesIO(obj_mart['Body'].read()))
except:
y = 1
df_datamart = pd.concat([df_mart,df3]).drop_duplicates(['fileKey'],keep='last').sort_values('fileKey')
df_datamart.astype(str).to_parquet(out_buffer, index=False)
os.put_object(Bucket="ВАШ БАКЕТ", Body=out_buffer.getvalue(), Key='Lakehouse/Data/proccessed/datamart', StorageClass='STANDARD')
return 'OK'
Внутренности файла requirements.txt
boto3
datetime
pandas
fsspec
requests
pyarrow
fastparquet
urllib3
Внутренности файла requests.csv
"number","text","API"
"1","Azure","https://api.hh.ru/vacancies?text=Azure&search_field=name"
"2","data scientist","https://api.hh.ru/vacancies?text=data scientist&search_field=name"
"3","data engineer","https://api.hh.ru/vacancies?text=data engineer&search_field=name"
Файл requests.csv нужен для хранения списка запросов, по которым вам требуется произвести парсинг. Этот файл нужно положить в ВАШ БАКЕТ - Lakehouse - Config - requests.csv.
Функция 1 считывает из бакета файл requests.csv с вакансиями, по которым мы хотим скачать ответы hh.ru, и записывает id вакансий в файл All_id_for_parse.csv. Питонисты, слишком не критикуйте код :). После этого функция 2 запрашивает информацию из hh.ru отдельно по каждому id вакансии. Два шага нужны потому что обращение по id каждой вакансии позволит получить более детальный json, чем при общей поисковой выдаче в функции 1. Функция 3 создает витрину - файл datamart.parquet c извлеченными данными из файлов json. Этот файл и будет нужен для Power BI.
Вместо трех функций можно было бы оставить две, если использовать новый сервис Yandex query. Но на данный момент сервис не умеет читать вложенный json, коими являются наши json.
Если все получилось, и функции отработали, можно запускать Power BI.
Power BI
Для скачивания витрины в Power BI нужно использовать источник Файл - Parquet. В источнике прописать ссылку на файл, как
"http://ВАШБАКЕТ.storage.yandexcloud.net/Lakehouse/Data/proccessed/datamart".
В файле datamart.parquet некоторые поля сохранены как словари и списки в виде текста, поэтому нужно будет применить преобразование в Power BI. В Power qwery делается это в два клика. В принципе все. Можно строить дашборд. Мой дашборд открыт и доступен по ссылке. Он двухстраничный.
Итог
Мы получили целый проект - Data Lake + BI. Его можно использовать в целях быстрого развертывания для демонстраций. Стоит дешево. Разумеется, решение нацелено на небольшие и возможно средние объемы данных. На данный момент цифры такие:
За 15 дней пришло 700 MB сырых данных, формат json
700 MB сырых данных = 0,85 МB данных в витрине, формат parquet
В год витрина прирастает на 21 МB, то есть загрузка может жить лет 10
Каждый день тратится по 84 копейки, в год 295 р.
Загрузка полностью автоматизирована; просто открывая отчет, можно просматривать новые данные
Комментарии (11)
ant_sol
00.00.0000 00:00+1Другими словами, при нажатии кнопки Обновить все данные в дашборде перезатрутся.
Используйте инкрементальное обновление в power bi service на основе потоков данных ????
Для вашего проекта этого будет достаточно и не нужно dwh городить
220-380 Автор
00.00.0000 00:00Это было бы хороший вариант, но инкрементальное обновление не работает с источником Power BI как "Интернет", API сайта возвращает json. Или сейчас они уже что-то доработали?
Тут еще и второй пункт, потоки данных это 240$ в год.Trabant_Vishnya
00.00.0000 00:0010 per User, Dataflows есть и в Pro версии
Тут уж как по кол-ву учеток пойдет.
Если совсем жаться, то можно через Free версию на остальных распространять через publish to web, но страдает security.220-380 Автор
00.00.0000 00:00+1Из туториала по Power BI
"Добавочное обновление (только для выпуска Premium). Для потоков данных можно также настроить добавочное обновление. ". Это 240 в год.
Вы бы сделали сначала, что советуете, чтобы быть уверенным в своих словах.Trabant_Vishnya
00.00.0000 00:00Incremental refresh можно и не через PBI средства делать.
С Yandex Data lake не пробовал, конечно, а вот использовать фичи Azure для этого можно.
Ссылка с Яндекс.Директом, там пробовал и реализовывал сам, имея на руках ту самую Pro.
Yo1
00.00.0000 00:00я не особо заморачивался с доступом
а каковы шансы настроить с ключами ?
220-380 Автор
00.00.0000 00:00Думаю, вполне реально. Функции Python работают с ключами. Место без ключа - это подключение Power BI к облаку: подключение к одному файлу parquet. Если настроите, то дайте знать.
HexExort
а итоги то можно посмотреть?) какая аналитика рынка профессий то в итоге?
220-380 Автор
Если вы про написание самих выводов, то их может быть много, всего не опишешь. Я старался сделать упор на саму архитектуру. А выводы каждый сделает сам, просматривая дашборд (ссылка не него есть в статье), те выводы которые ему нужны.
Trabant_Vishnya
А что тут кардинально нового в инфраструктуре, что этому целая статья посвящена? Вы бы хоть модель данных показали
220-380 Автор
Какая модель данных :-). Только витрина.