Привет, Хабр! Меня зовут Андрей Скиба и я Python-разработчик в ML-команде Учи.ру. Разным командам в нашей компании важно получать доступ к ключевым метрикам пользователей (количество сессий, DAU и другим) в режиме реального времени. Поэтому мы создали свою собственную систему real-time аналитики — быструю, простую и с удобным для нас функционалом. Сегодня я расскажу, как она устроена. 

Причины отказа от старого решения

Некоторое время назад мы использовали довольно простой аналитический кластер, в основе которого была обычная база Postgres. В него писались события, там же они собирались в отчеты, на основе которых строились графики на фронтенде. На платформе было сравнительно мало событий, соответственно, в базе было мало строк. Этого хватало до тех пор, пока Учи.ру не стала полноценным highload-сервисом — сейчас на платформе занимаются почти 20 млн активных пользователей, которые совершают несколько миллионов действий в сутки.

Ресурсов на поддержку и доработку старого функционала у нас не было. Также с ростом сервиса большая часть аналитики переехала в более тяжелый кластер, где уже использовались ClickHouse, S3, Airflow, Tableau и ряд других технологий. Это решение имеет очень широкий функционал и прекрасно подходит для построения больших ежедневных, ежемесячных и ежеквартальных отчетов для бизнеса, но оно работает слишком медленно для real-time мониторинга. К тому же лицензии Tableau платные и имеют ограничения на использование.

Помимо смены аналитического стека, произошли изменения и в архитектуре платформы. События теперь не собираются в базе, а попадают в Kafka-кластер. Так у нас появилась задача собрать простую и легковесную систему, где можно было бы оперативно смотреть основные метрики компании, при этом сами метрики должны были быть основаны на событиях из Kafka напрямую.

Также от бизнеса к нам пришли дополнительные требования: 

  • мы должны уметь фильтровать данные по классу, сегменту или некоторым другим параметрам в разрезе пользователей; 

  • на главной странице должны быть часовые и дневные графики, а также простой и накопительный итоги.

Стек

Бэкенд у нас традиционно написан на Django. Отдельно хотел бы отметить расширение Django Extensions, в котором, среди прочего, есть возможность запускать отдельные скрипты внутри django-проекта. В Учи.ру микросервисная архитектура, и любой сервис или воркер по умолчанию запускается в отдельном контейнере. Воркеры, необходимые для работы сервиса, мы запускаем как раз через Django Extensions. Об устройстве самих воркеров напишу чуть ниже. 

С Kafka мы работаем через библиотеку AIOKafka, также используем fastavro для десериализации бинарных сообщений

Для фронтенда используем библиотеку Svelte, графики строим в Highcharts JS. Для долгого хранения агрегатов используется Postgres, а в Redis у нас хранятся те агрегаты, которые формируются в текущий момент. О том, что такое агрегаты, расскажу далее.

Общая схема работы сервиса

Из Kafka к нам приходят события. Специальный воркер (назовем его просто KafkaWorker) читает их, обогащает и записывает в Redis в виде агрегатов. Также есть отдельный воркер для сохранения данных из Redis в базу — DB Saver. И уже API читает агрегаты из Redis и базы, отдает их на фронтенд. Концептуальная схема работы выглядит так:

Аналитика сессий

Дальнейший рассказ буду иллюстрировать примерами сессий. В Учи.ру сессия — это событие, в рамках которого происходит решение студентом определенного задания. Когда пользователь сделал упражнение, в Kafka приходит информация о том, что завершилась сессия. Среди ее параметров, например, есть: ID студента, ID карточки, затраченное время, число правильных ответов и некоторые системные параметры.

{
'student_id': 43248949,
'subject_id': 46, 
'card_id': 443813, 
'created_at': '2023-02-22 15:05:43.755000', 
'right': 1,
'total': 1,
'spent': 5, 
'part_id': 0, 
'offset': 17489515
}

При получении такого события мы должны его обогатить, то есть, получить некоторый набор других атрибутов, которых здесь явно нет, но которые нужны для фильтрации данных. Например, это может быть сегмент, класс или ID предмета. В данном примере поле subject_id у нас уже есть в исходном сообщении (но так бывает не всегда), а вот класс и сегмент нам нужно каким-то образом восстанавливать из других источников или баз данных.

Особенности работы KafkaWorker

KafkaWorker собирает данные из Kafka и записывает их в Redis, но у него есть несколько особенностей. Например, одна метрика может содержаться в разных топиках с разными схемами, поскольку у нас множество продуктов, и в них тоже есть свои циклы решения заданий пользователями. Вот две разные схемы: 

{
	"name": "card_completions",
	"namespace": "uchiru",
	"doc": "Card completion event",
	"fields": [
		{
			"name": "id",
			"type": "long"
		},
		{
			"name": "student_id",
			"type": "long"
		},
		{
      "name": "last_activity",
      "type": [
        {
          "type": "long",
          "logicalType": "timestamp-millis"
        },
        "null"
      ]
		},
		...
	]
}
{
	"name": "card_completions",
	"namespace": "modern_subjects",
	"doc": "Card completions for the Modern Subjects",
	"fields": [
		{
			"name": "student_id",
			"type": "string"
		},
		{
			"name": "subject_id",
			"type": "int"
		},
		{
      "name": "finished_at",
      "type": [
        {
          "type": "long",
          "logicalType": "timestamp-millis"
        },
        "null"
      ]
		},

		...
	]
}	

Первая схема — это сессии в общем core-продукте, карточных курсах по школьным предметам. Вторая — это сессии в дополнительных продуктах, новых интерактивных курсах. Можно увидеть, что в первой схеме каждая сессия имеет свой ID, и мы можем, к примеру, довольно легко дедуплицировать эти события, чтобы метрики не задваивались. В новых предметах ID нет, и нам приходится убирать дубликаты по сумме всех полей. 

Также у нас есть такой параметр, как последняя активность: она показывает время завершения сессии.Из приведенных выше примеров видно, что в двух разных схемах эти параметры называются по-разному: в одной схеме — last activity, в другой — finished at. Приведение к общему виду делается по словарю, в котором происходит сопоставление различных названий общему приведенному списку слов. И если мы хотим увидеть график всех сессий, то нам нужно их все привести к одному виду и просуммировать. Ниже приведена часть словаря сопоставления имен разных топиков.

topics = {
	'uchiru.card_comletions': {
		'session_id': 'id',
		'last_activity': 'last_activity'
	},
	'modern_subjects.card_completions': {
		'session_id': None,
		'last_activity': 'finished_at'
}

Отмечу, что чтение из Kafka у нас асинхронное, но это сделано больше не для скорости, а для удобства написания кода. Парсинг полей и обогащение сообщений удобно вынести в отдельную callback-функцию, а обработку событий из разных топиков удобно производить в едином цикле событий.

Атрибуция или обогащение данных также происходит внутри callback-функции, но здесь для разных топиков алгоритм может быть совсем разным: какие-то данные хранятся в кэшах Redis, за какими-то данными нужно ходить в базу. Ниже мы рассмотрим примеры того, как происходит поиск класса для DAU — это более интересный кейс. 

После обогащения мы должны создать агрегат или найти уже существующий. Агрегат — это запись в Redis, которая сохраняет в себе количество событий или сумму какой-то величины (например, платежей) с учетом всех возможных атрибутов на определенный час. Грубо говоря, это результат выполнения GROUP BY по всем возможным полям. 

Как устроены сами агрегаты? В названии записи мы храним дату и час, в котором у нас сохраняются события, в ключах — набор всех необходимых атрибутов, а значения — это и есть сами метрики. Ниже приведен схематичный код работы KafkaWorker.

async def callback_sessions(msg, topic):
	...

def consumer_factory():
	for topic in topics:
		yield AsyncKafkaConsumer(
			topic=t,
			...
			callback=partial(callback_session, topic=t)
		)

def run():
	for c in consumer_factory():
		asyncio.ensure_future(c.consume())

	loop = asyncio.get_event_loop()
	loop.run_forever()

На сниппете ниже можно увидеть пример работы с Redis через интерактивный режим Python. Здесь для сессий есть ключ, у которого указана дата — 22 февраля 2023 года. И 14 часов — это тот час, в который мы сохраняем наши события. Также в этих записях хранятся ключи с информацией, какие именно атрибуты здесь присутствуют. Атрибуты приведенного ниже ключа — класс, сегмент и предмет. Количество событий — два.

>>> r.keys('sessions*')
[b'sessions_2023-02-22_14', ...]
>>> r.hkeys('sessions_2023-02-22_14')
[b'grade_2_segment_regions_subject_other', ...]
>>> r.hget(
'sessions_2023-02-22_14', 'grade_2_segment_regions_subject_other'
)
b'2'

Как работает DB Saver

Далее в игру вступает DB Saver. Это воркер, который занимается сохранением наших данных из Redis в базу. Здесь же мы сохраняем как часовые, так и дневные агрегаты, потому что нам не захотелось перегружать фронтенд лишними расчетами, которые легко можно выполнять фоном.

Одна из самых частых проблем с инфраструктурой — сброс подключения к базе. В таких случаях мы используем объект django.db.connection. Он хорошо справляется с обрывами соединений. С определенной периодичностью мы синхронизируем Redis с базой и обновляем часовые (иногда и дневные) значения. Если у нас возникает какая-то ошибка, например, interface error или operational error, то мы просто вызываем функцию connect, которая переподключает наше соединение. И пока у нас запись не попала в базу, мы не сбрасываем ключ из Redis.

from django.db import connection

def run():
	while True:
		try:
			time.sleep(SYNC_INTERVAL)
			save_redis_data_to_db()
			update_daily_counts()
		except (InterfaceError, OperationalError): 
			connection.connect()
			continue

Ниже представлен код модели, как у нас хранится сущность агрегата уже в базе. Среди прочего здесь есть пометка frame о том, какой это агрегат — часовой или дневной. Также приведены поля для фильтрации: предмет, сегмент и другие.

class SessionsEntity(models.Model):
	count = models.IntegerField()
	grade = models.IntegerField()
	segment = models.CharField(max_length=10)
	subject = models.CharField(max_length=25)
	frame = models.CharField(max_length=10)
	ts = models.DateTimeField(db_index=True)
	created_at = models.DateTimeField()

	objects = DBManager()

	class Meta:
		db_table = 'session_entities'

Как работает сбор данных для API

Данные для фронтенда собираются как из базы, так и из Redis. События из Kafka могут долетать с задержкой (иногда очень большой). Повлиять на это нельзя, поскольку это особенность работы самой системы. Поэтому наш DB Saver может записывать данные в достаточно далекое прошлое: это может измеряться днями и иногда неделями. Получив запоздавшие ключи в Redis, DB Saver должен их правильно записать в базу.

В нашем интерфейсе есть: 

  • почасовой график; 

  • график накопительный; 

  • блок, где можно выставить различные фильтры; 

  • и общий график по году, где у нас собираются дневные агрегаты.

Как мы работаем с DAU

В какой-то момент бизнес попросил реализовать аналитику количества уникальных дневных посетителей. Здесь у нас появились сложности, которых не было ранее с сессиями и другими метриками, а именно:

  • С сессиями нам не была нужна уникальность в течение часа или дня — с DAU это не так, и именно момент с подсчетом уникальности нужно было реализовать особенно аккуратно. 

  • У нас нет какого-то специального топика с пользователями, но есть pageviews — посещения страниц. И все бы хорошо, но в этом топике примерно в 15 раз больше событий, то есть, их примерно 1500 в секунду (в сессиях у нас около 100 событий в секунду). 

  • Как следствие предыдущего пункта, мы уже не успеваем искать атрибуты так, как мы это делали раньше. 

Ввиду требования уникальности по часовым и дневным фреймам мы решили, что будем хранить в базе и простые, и кумулятивные агрегаты. Наши воркеры суммируют количество посещений пользователей и записывают их базу с учетом уже имеющихся данных, поскольку здесь не работает простое суммирование. В силу такого большого потока событий мы вынуждены сохранять данные в Redis и искать атрибуты в базе батчами в несколько потоков.

Схема сохранения агрегатов DAU в Redis: 

  • KafkaWorker записывает данные не сразу в Redis, а в очередь;

  • из этой очереди воркеры, которые занимаются обогащением данных, извлекают события и в параллельном режиме ищут им атрибуты;

  • как только эти атрибуты найдены, данные записываются в очередь на сохранение в Redis;

  • далее воркеры, которые занимаются сохранением данных в Redis, суммируют и сохраняют их с нужными ключами.

Вот как, например, в случае с DAU выглядит поиск параллели по базе:

    select st.id as student_id, cb.parallel as parallel
    from students st
    join class_books cb on st.class_book_id = cb.id
    where st.id=any(%(student_ids)s)

У нас есть некий список ID студентов, далее мы делаем запрос с использованием any. Поскольку по ID есть индекс, подобный запрос работает достаточно быстро. Мы можем вставить в наш массив определенное количество значений — 100, 1000 — и оперативно восстановить нашу параллель.

Выводы

Нам удалось построить легковесную систему аналитики, из которой можно получать актуальные данные на текущий момент о том, как ведут себя пользователи на платформе. При этом сохраняется несколько челленджей: 

  • Словари для парсинга разных топиков приходится вручную поддерживать в актуальном состоянии — не то, чтобы большая проблема, но явно напрашивается какая-то автоматизация.

  • Многие атрибуты мы кешируем, и всегда есть trade-off между временем жизни кэша и актуальностью информации. Например, мы можем прочитать из кэша неправильную информацию, и, регулируя размер кэша, мы можем увеличивать точность наших данных, но уменьшать при этом скорость обогащения — возможно, с этим тоже придется что-то делать. 

  • И, разумеется, в любой архитектуре есть вопрос масштабирования: что случится, когда данных станет в 10, 100 или 1000 раз больше? Время покажет! :-) 


Хочешь развивать школьный EdTech вместе с нами — присоединяйся к команде Учи.ру!

Комментарии (0)