Всем привет! Я Станислав Бушуев, Software Engineer в Semrush. Сегодня хочу поделиться идеями, как можно реализовать синхронизацию данных между различными хранилищами. Такие задачи иногда возникают в работе, например, при удалении пользовательских данных в рамках General Data Protection Regulation (GDPR) и California Consumer Privacy Act (CCPA).

Следовать этим законам нетрудно, если вы поддерживаете среднестатистический сайт или небольшой продукт. Скорее всего, в данных случаях используется один из популярных фреймворков, и таблица пользователей хранится в общеизвестной базе данных (MySQL, PostgreSQL).

Пришел запрос на удаление всех данных пользователя? Не проблема! Удаляем строку в таблице, и готово. Не считая логов, трейсов ошибок, бэкапов и других интересных мест. Подразумеваем, конечно, что все настроено верно и личные данные пользователей скрываются звездочками или еще как-то.

В нашем случае ситуация чуть сложнее. За 13 лет в Semrush создали более 50 инструментов, и все они поддерживаются десятками команд разработки. Несколько лет назад каждая команда вынесла код своего инструмента в отдельный микросервис, данные пользователя мы получаем из сервиса пользователей. Но так или иначе мы храним всю информацию, которая необходима для работы инструмента.

Таким образом, перед нами встал вопрос: как синхронизировать хранилища данных микросервиса и сервиса пользователей.

Способы синхронизации 

1. Периодически мы можем сверяться с сервисом пользователей по REST API:

$ curl '<http://user-service.internal.net/api/v1/users/42>'
[
 {
   "id": 42,
   "registration_date": "2015-03-08 01:00:00",
   "email": "john@example.com",
   "name": "John",
...

Можно даже сразу отправить список пользователей на проверку, но это как-то странно. Удаления пользователей происходят не так часто, чтобы совершать DDoS-атаки на сервис.

2. Событийно-ориентированная архитектура — еще один подход для решения нашей задачи. Здесь появляются две сущности: генератор сообщений (Publisher) и подписчик (Subscriber), который читает канал событий (topic).

Можно отказаться от сущности подписчика и всем микросервисам выставить endpoint API, на который сервис пользователей слал бы запросы при появлении нового события. В таком случае нужно согласовать протоколы взаимодействия API: REST, JSON-RPC, gRPC, GraphQL, OpenAPI или что там еще может быть. Кроме того, необходимо держать конфигурационные файлы микросервисов, куда слать запросы, а самое главное: что делать, когда запрос не доходит до микросервиса после третьего повтора?

Плюсы данной архитектуры:

  • Асинхронная автоматическая синхронизация хранилищ данных.

  • Нагрузка на сервис пользователей повышается незначительно, так мы добавляем только асинхронную запись события в канал событий.

  • Синхронизация данных различных хранилищ находится отдельно от (и так нагруженного) сервиса пользователей.

Минусы:

  • Минус, вытекающий из первого плюса: неконсистентность данных между сервисами пользователей и остальными микросервисами.

  • Отсутствие транзакций: формируются простые сообщения.

  • Необходимо учитывать, что сообщения в очереди могут повториться.

Вообще, все перечисленные преимущества и недостатки довольно условны и зависят от конкретных задач. Универсальных решений, на мой взгляд, нет. По этой теме полезно будет почитать о CAP-теореме.

Реализация событийно-ориентированной архитектуры на примере Pub/Sub от Google Cloud

Существует множество альтернатив: Kafka, RabbitMQ, но наша команда выбрала решение от Pub/Sub Google Cloud, так как мы уже используем Google Cloud (подробнее можно почитать в статье моего коллеги Никиты Шальнова: Что такое Immutable Infrastructure), и оно проще в настройке тех же Kafka или RabbitMQ.

В нашем случае Publisher — это сервис пользователей, а Subscriber — микросервис команды определенного инструмента. Subscriber’ов (как и Subscription) может быть сколько угодно. Учитывая, что в Semrush большое количество инструментов и команд, очередь с подписчиками для нас идеальна:

  • Каждый читает очередь с необходимой для него частотой.

  • Кто-то может выставить endpoint, и topic будет его вызывать немедленно при появлении сообщения (если нужно получать новые сообщения моментально).

  • Даже экзотические варианты, например откат базы данных инструмента из старого бэкапа, не вызывают проблем: просто перечитаем сообщения из топика (стоит оговориться, что нужно учитывать идемпотентность запросов, и, возможно, вам стоит читать топик не с начала, а с какого-то момента).

  • Subscriber предоставляет REST протокол, но для упрощения разработки существуют еще клиенты для различных языков программирования: Go, Java, Python, Node.js, C#, C++, PHP, Ruby.

Можно использовать один топик с разными сообщениями, например, у нас есть несколько типов сообщений: изменение пользователя, изменение доступов пользователя и так далее.

Пример реализации

Создание топика и подписчика:

gcloud pubsub topics create topic
gcloud pubsub subscriptions create subscription --topic=topic

Подробнее можно узнать в документации.

Создание пользователя для чтения топика:

gcloud iam service-accounts create SERVICE_ACCOUNT_ID \
   --description="DESCRIPTION" \
   --display-name="DISPLAY_NAME"
gcloud projects add-iam-policy-binding PROJECT_ID \
   --member="serviceAccount:SERVICE_ACCOUNT_ID@PROJECT_ID.iam.gserviceaccount.com" \
   --role="pubsub.subscriber"
gcloud iam service-accounts keys create key-file \
   --iam-account=sa-name@project-id.iam.gserviceaccount.com

Скачанный ключ в формате json нужно сохранить и пробросить сервису. Не забывайте про правила обращения с секретами! Об этом знают все и немного больше мои коллеги из Security-команды. Дайте знать в комментариях, если тема кажется вам полезной и интересной для нашей следующей статьи.

Пользователь для публикации сообщений создается аналогично, за исключением роли: --role="pubsub.subscriber" → --role="pubsub.publisher".

Для примера возьмем один из наших микросервисов, работающих на Python c Celery. Для сообщений из сервиса пользователей есть схема, описанная с помощью Protocol Buffers.

import json
import os

import celery
from google.cloud import pubsub_v1
from google.oauth2 import service_account

from user_pb2 import UserEventData

PUBSUB_SERVICE_ACCOUNT_INFO = json.loads(os.environ.get('PUBSUB_SERVICE_ACCOUNT', '{}'))
PUBSUB_PROJECT = 'your project'
PUBSUB_SUBSCRIBER = 'subscription'

@celery.shared_task
def pubsub_synchronisation() -> None:
   credentials = service_account.Credentials.from_service_account_info(
       PUBSUB_SERVICE_ACCOUNT_INFO, scopes=['<https://www.googleapis.com/auth/pubsub>']
   )

   with pubsub_v1.SubscriberClient(credentials=credentials) as subscriber:
       subscription_path = subscriber.subscription_path(PUBSUB_PROJECT, PUBSUB_SUBSCRIBER)
       response = subscriber.pull(request={"subscription": subscription_path, "max_messages": 10000})

       ack_ids, removed_user_ids = [], []
       for msg in response.received_messages:
           user_event_data = UserEventData()
           user_event_data.ParseFromString(msg.message.data)

           removed_user_ids.append(user_event_data.Id)
           ack_ids.append(msg.ack_id)

       # Here you can do everything with removed users :)

       subscriber.acknowledge(request={"subscription": subscription_path, "ack_ids": ack_ids})

И запускаем задание раз в пять минут, так как удаление пользователей — не такая частая операция:

CELERY_BEAT_SCHEDULE = {
   'pubsub_synchronisation': {
       'task': 'tasks.pubsub_ubs_synchronisation',
       'schedule': timedelta(minutes=5)
   },

Пример публикации сообщений в топик на Python реализуется аналогичным образом. Используйте PublisherClient вместо SubscriberClient и вызывайте метод publish вместо pull.

В результате есть синхронизация удаления пользователей для соответствия законам GDPR/CCPA. Например, в 7:37 произошло массовое удаление аккаунтов из сервиса хранения учетных записей пользователей. В 7:40 сработала задача на получение данных из Topic'а. Все задачи выбраны, локальная база синхронизирована.

В статье мы рассмотрели две архитектуры и остановились на событийно-ориентированной. Вполне вероятно, что в вашем случае можно будет обойтись ручной синхронизацией.

Надеюсь, этот материал окажется полезным. Спасибо за внимание!

Комментарии (0)