Всем привет! Я Станислав Бушуев, Software Engineer в Semrush. Сегодня хочу поделиться идеями, как можно реализовать синхронизацию данных между различными хранилищами. Такие задачи иногда возникают в работе, например, при удалении пользовательских данных в рамках General Data Protection Regulation (GDPR) и California Consumer Privacy Act (CCPA).
Следовать этим законам нетрудно, если вы поддерживаете среднестатистический сайт или небольшой продукт. Скорее всего, в данных случаях используется один из популярных фреймворков, и таблица пользователей хранится в общеизвестной базе данных (MySQL, PostgreSQL).
Пришел запрос на удаление всех данных пользователя? Не проблема! Удаляем строку в таблице, и готово. Не считая логов, трейсов ошибок, бэкапов и других интересных мест. Подразумеваем, конечно, что все настроено верно и личные данные пользователей скрываются звездочками или еще как-то.
В нашем случае ситуация чуть сложнее. За 13 лет в Semrush создали более 50 инструментов, и все они поддерживаются десятками команд разработки. Несколько лет назад каждая команда вынесла код своего инструмента в отдельный микросервис, данные пользователя мы получаем из сервиса пользователей. Но так или иначе мы храним всю информацию, которая необходима для работы инструмента.
Таким образом, перед нами встал вопрос: как синхронизировать хранилища данных микросервиса и сервиса пользователей.
Способы синхронизации
1. Периодически мы можем сверяться с сервисом пользователей по REST API:
$ curl '<http://user-service.internal.net/api/v1/users/42>'
[
{
"id": 42,
"registration_date": "2015-03-08 01:00:00",
"email": "john@example.com",
"name": "John",
...
Можно даже сразу отправить список пользователей на проверку, но это как-то странно. Удаления пользователей происходят не так часто, чтобы совершать DDoS-атаки на сервис.
2. Событийно-ориентированная архитектура — еще один подход для решения нашей задачи. Здесь появляются две сущности: генератор сообщений (Publisher) и подписчик (Subscriber), который читает канал событий (topic).
Можно отказаться от сущности подписчика и всем микросервисам выставить endpoint API, на который сервис пользователей слал бы запросы при появлении нового события. В таком случае нужно согласовать протоколы взаимодействия API: REST, JSON-RPC, gRPC, GraphQL, OpenAPI или что там еще может быть. Кроме того, необходимо держать конфигурационные файлы микросервисов, куда слать запросы, а самое главное: что делать, когда запрос не доходит до микросервиса после третьего повтора?
Плюсы данной архитектуры:
Асинхронная автоматическая синхронизация хранилищ данных.
Нагрузка на сервис пользователей повышается незначительно, так мы добавляем только асинхронную запись события в канал событий.
Синхронизация данных различных хранилищ находится отдельно от (и так нагруженного) сервиса пользователей.
Минусы:
Минус, вытекающий из первого плюса: неконсистентность данных между сервисами пользователей и остальными микросервисами.
Отсутствие транзакций: формируются простые сообщения.
Необходимо учитывать, что сообщения в очереди могут повториться.
Вообще, все перечисленные преимущества и недостатки довольно условны и зависят от конкретных задач. Универсальных решений, на мой взгляд, нет. По этой теме полезно будет почитать о CAP-теореме.
Реализация событийно-ориентированной архитектуры на примере Pub/Sub от Google Cloud
Существует множество альтернатив: Kafka, RabbitMQ, но наша команда выбрала решение от Pub/Sub Google Cloud, так как мы уже используем Google Cloud (подробнее можно почитать в статье моего коллеги Никиты Шальнова: Что такое Immutable Infrastructure), и оно проще в настройке тех же Kafka или RabbitMQ.
В нашем случае Publisher — это сервис пользователей, а Subscriber — микросервис команды определенного инструмента. Subscriber’ов (как и Subscription) может быть сколько угодно. Учитывая, что в Semrush большое количество инструментов и команд, очередь с подписчиками для нас идеальна:
Каждый читает очередь с необходимой для него частотой.
Кто-то может выставить endpoint, и topic будет его вызывать немедленно при появлении сообщения (если нужно получать новые сообщения моментально).
Даже экзотические варианты, например откат базы данных инструмента из старого бэкапа, не вызывают проблем: просто перечитаем сообщения из топика (стоит оговориться, что нужно учитывать идемпотентность запросов, и, возможно, вам стоит читать топик не с начала, а с какого-то момента).
Subscriber предоставляет REST протокол, но для упрощения разработки существуют еще клиенты для различных языков программирования: Go, Java, Python, Node.js, C#, C++, PHP, Ruby.
Можно использовать один топик с разными сообщениями, например, у нас есть несколько типов сообщений: изменение пользователя, изменение доступов пользователя и так далее.
Пример реализации
Создание топика и подписчика:
gcloud pubsub topics create topic
gcloud pubsub subscriptions create subscription --topic=topic
Подробнее можно узнать в документации.
Создание пользователя для чтения топика:
gcloud iam service-accounts create SERVICE_ACCOUNT_ID \
--description="DESCRIPTION" \
--display-name="DISPLAY_NAME"
gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:SERVICE_ACCOUNT_ID@PROJECT_ID.iam.gserviceaccount.com" \
--role="pubsub.subscriber"
gcloud iam service-accounts keys create key-file \
--iam-account=sa-name@project-id.iam.gserviceaccount.com
Скачанный ключ в формате json нужно сохранить и пробросить сервису. Не забывайте про правила обращения с секретами! Об этом знают все и немного больше мои коллеги из Security-команды. Дайте знать в комментариях, если тема кажется вам полезной и интересной для нашей следующей статьи.
Пользователь для публикации сообщений создается аналогично, за исключением роли: --role="pubsub.subscriber" → --role="pubsub.publisher".
Для примера возьмем один из наших микросервисов, работающих на Python c Celery. Для сообщений из сервиса пользователей есть схема, описанная с помощью Protocol Buffers.
import json
import os
import celery
from google.cloud import pubsub_v1
from google.oauth2 import service_account
from user_pb2 import UserEventData
PUBSUB_SERVICE_ACCOUNT_INFO = json.loads(os.environ.get('PUBSUB_SERVICE_ACCOUNT', '{}'))
PUBSUB_PROJECT = 'your project'
PUBSUB_SUBSCRIBER = 'subscription'
@celery.shared_task
def pubsub_synchronisation() -> None:
credentials = service_account.Credentials.from_service_account_info(
PUBSUB_SERVICE_ACCOUNT_INFO, scopes=['<https://www.googleapis.com/auth/pubsub>']
)
with pubsub_v1.SubscriberClient(credentials=credentials) as subscriber:
subscription_path = subscriber.subscription_path(PUBSUB_PROJECT, PUBSUB_SUBSCRIBER)
response = subscriber.pull(request={"subscription": subscription_path, "max_messages": 10000})
ack_ids, removed_user_ids = [], []
for msg in response.received_messages:
user_event_data = UserEventData()
user_event_data.ParseFromString(msg.message.data)
removed_user_ids.append(user_event_data.Id)
ack_ids.append(msg.ack_id)
# Here you can do everything with removed users :)
subscriber.acknowledge(request={"subscription": subscription_path, "ack_ids": ack_ids})
И запускаем задание раз в пять минут, так как удаление пользователей — не такая частая операция:
CELERY_BEAT_SCHEDULE = {
'pubsub_synchronisation': {
'task': 'tasks.pubsub_ubs_synchronisation',
'schedule': timedelta(minutes=5)
},
Пример публикации сообщений в топик на Python реализуется аналогичным образом. Используйте PublisherClient вместо SubscriberClient и вызывайте метод publish вместо pull.
В результате есть синхронизация удаления пользователей для соответствия законам GDPR/CCPA. Например, в 7:37 произошло массовое удаление аккаунтов из сервиса хранения учетных записей пользователей. В 7:40 сработала задача на получение данных из Topic'а. Все задачи выбраны, локальная база синхронизирована.
В статье мы рассмотрели две архитектуры и остановились на событийно-ориентированной. Вполне вероятно, что в вашем случае можно будет обойтись ручной синхронизацией.
Надеюсь, этот материал окажется полезным. Спасибо за внимание!