Введение: два источника правды - одна большая проблема
Представьте: вы строите систему верификации дипломов. Требования простые - данные должны быть неизменяемыми (привет, блокчейн) и при этом быстро доступными для запросов (привет, PostgreSQL). Казалось бы, идеальное решение - писать в оба хранилища. Но дьявол, как всегда, кроется в деталях.
Наш проект использует паттерн двойной записи (Dual-Write):
Solana — гарантирует неизменность и прозрачность данных о выданных дипломах
PostgreSQL (Supabase) — обеспечивает быстрые выборки и сложные запросы
Звучит красиво на архитектурных диаграммах, но в production всё не так радужно. Главная проблема — частичные сбои. Транзакция в Solana прошла успешно, диплом записан в блокчейн навечно, а вот запись в PostgreSQL упала. Пользователь получил подтверждение, но половина системы о его дипломе не знает.
Сегодня я покажу, как мы столкнулись с этой проблемой лицом к лицу и какие паттерны применили для её решения.
Анатомия сбоя: где именно всё ломается?
Давайте посмотрим на реальный код из нашего internal/api/handlers.rs. Функция issue_diploma - это то место, где происходит магия... и где всё может пойти не так:
pub async fn issue_diploma(
State(state): State<Arc<AppState>>,
mut multipart: Multipart,
) -> Result<Json<IssueResponse>, AppError> {
// ... парсинг multipart данных ...
// Генерируем хеш диплома
let hash = hashing::generate_hash(
&file_bytes,
&req.issuer_id,
&req.recipient_id,
issued_at,
req.serial.as_deref(),
);
// Подписываем хеш приватным ключом
let signature = hashing::sign_hash(&hash, &state.issuer_keypair)?;
let diploma = Diploma {
hash: hash.clone(),
issuer_id: req.issuer_id.clone(),
recipient_id: req.recipient_id.clone(),
signature: Some(signature.clone()),
issued_at,
serial: req.serial.clone(),
ipfs_cid: None,
};
// КРИТИЧЕСКАЯ ТОЧКА №1: Запись в блокчейн Solana
// Эта операция может занять 1-3 секунды и стоит денег (газ)
let chain_record = state.chain_client.write_hash(&hash, &diploma).await?;
// Подготавливаем данные для базы
let credential_data = serde_json::json!({
"hash": &diploma.hash,
"issuer_id": &diploma.issuer_id,
"recipient_id": &diploma.recipient_id,
"solana_tx_id": &chain_record.tx_id,
"issued_at": diploma.issued_at.to_rfc3339(),
});
// КРИТИЧЕСКАЯ ТОЧКА №2: Запись в PostgreSQL
// А вот здесь может произойти катастрофа
let db_response = state
.db_client
.from("credentials")
.insert(credential_data.to_string())
.execute()
.await;
// Обработка ошибки базы данных
let db_response = match db_response {
Ok(response) => response,
Err(e) => {
tracing::error!("Database request failed: {}", e);
return Err(AppError::Database(format!("Database request failed: {}", e)));
}
};
if !db_response.status().is_success() {
let status = db_response.status();
let error_body = db_response.text().await.unwrap_or_default();
// ВОТ ОНА - КРИТИЧЕСКАЯ НЕСОГЛАСОВАННОСТЬ!
tracing::error!(
"CRITICAL INCONSISTENCY: Failed to save to Supabase after successful Solana transaction. \
tx_id: {}, hash: {}. Status: {}. Body: {}",
chain_record.tx_id,
hash,
status,
error_body
);
// Мы уже записали в блокчейн, откатить нельзя!
return Err(AppError::Internal(
"Failed to save credential record after blockchain confirmation.".to_string(),
));
}
// Если всё хорошо - возвращаем успешный ответ
Ok(Json(IssueResponse {
hash,
tx_id: chain_record.tx_id,
signature: Some(signature),
issued_at,
}))
}
Вот как выглядит последовательность операций и точка отказа:
┌──────────┐ ┌────────────┐ ┌─────────┐ ┌────────────┐
│ Клиент │────▶│ Rust API │───▶│ Solana │────▶│ УСПЕХ │
└──────────┘ └────────────┘ └─────────┘ └────────────┘
│ │
│ ▼
│ ┌────────────┐
└───────────────────────────▶│ PostgreSQL │
└────────────┘
│
▼
┌────────────┐
│ СБОЙ! │
└────────────┘
│
▼
┌──────────────────────┐
│ РАССИНХРОН: │
│ • Блокчейн: ✓ есть |
│ • База: ✗ нет |
└──────────────────────┘
Последствия рассинхрона
Что происходит после такого сбоя? У нас есть несколько неприятных сценариев:
Пользователь не может найти свой диплом через API запросы к базе данных
Невозможность построить аналитику — данные в базе неполные
Проблемы с аудитом — в блокчейне есть запись, в отчётах её нет
Дублирование при повторной попытке — пользователь может попробовать выпустить диплом ещё раз
Теория на практике: паттерны для спасения данных
Проблема согласованности: выбираем стратегию
В распределённых системах есть два основных подхода к согласованности:
1. Строгая согласованность (Strong Consistency) - все узлы видят одинаковые данные в один момент времени. Это дорого и сложно, особенно когда один из узлов - публичный блокчейн.
2. Итоговая согласованность (Eventual Consistency) - данные могут временно различаться, но в конечном итоге придут к согласованному состоянию.
Мы выбрали итоговую согласованность. Почему? Откатить транзакцию в Solana после подтверждения - невозможно. Значит, нужно гарантировать, что PostgreSQL рано или поздно получит эти данные.
Паттерн Saga: длинные транзакции с компенсацией
Паттерн Saga разбивает длинную распределённую транзакцию на последовательность локальных транзакций. Каждый шаг может иметь компенсирующую транзакцию для отката.
Как это могло бы выглядеть в нашем случае:
// Псевдокод Saga для выпуска диплома
enum SagaStep {
SaveToDatabase, // Шаг 1
WriteToBlockchain, // Шаг 2
UpdateDatabaseStatus // Шаг 3
}
async fn issue_diploma_saga(diploma: Diploma) -> Result<(), SagaError> {
// Шаг 1: Сохраняем в БД со статусом "pending"
let db_record = match save_to_database_with_status(&diploma, "pending").await {
Ok(record) => record,
Err(e) => {
// Ничего откатывать не нужно, просто выходим
return Err(SagaError::DatabaseFailed(e));
}
};
// Шаг 2: Пишем в блокчейн
let tx_id = match write_to_blockchain(&diploma).await {
Ok(tx) => tx,
Err(e) => {
// Компенсирующая транзакция: помечаем запись как неудачную
mark_database_record_failed(&db_record.id).await?;
return Err(SagaError::BlockchainFailed(e));
}
};
// Шаг 3: Обновляем статус в БД на "confirmed"
match update_database_status(&db_record.id, "confirmed", &tx_id).await {
Ok(_) => Ok(()),
Err(e) => {
// Здесь компенсация сложная: в блокчейне уже есть запись
// Можно только пометить в БД для ручного вмешательства
mark_for_manual_reconciliation(&db_record.id, &tx_id).await?;
Err(SagaError::InconsistentState(e))
}
}
}
Проблема с Saga в блокчейне: Компенсирующие транзакции в Solana стоят денег (газ) и не отменяют предыдущие записи, а добавляют новые. Это делает паттерн дорогим и сложным.
Идемпотентность и повторные попытки
Идемпотентность - это свойство операции давать один и тот же результат при повторных вызовах. В нашем контексте это критически важно.
Вот как мы могли бы добавить механизм повторных попыток:
use tokio::time::{sleep, Duration};
async fn write_to_database_with_retry(
db_client: &Postgrest,
data: serde_json::Value,
max_retries: u32,
) -> Result<(), AppError> {
let mut retries = 0;
let mut backoff = Duration::from_millis(100);
loop {
match db_client
.from("credentials")
.insert(data.to_string())
.execute()
.await
{
Ok(response) if response.status().is_success() => {
return Ok(());
}
Ok(response) if response.status() == 409 => {
// Конфликт - запись уже существует (идемпотентность!)
tracing::info!("Record already exists, considering it success");
return Ok(());
}
Ok(_) | Err(_) if retries < max_retries => {
retries += 1;
tracing::warn!(
"Database write failed, retry {}/{} after {:?}",
retries, max_retries, backoff
);
sleep(backoff).await;
backoff *= 2; // Экспоненциальная задержка
}
_ => {
return Err(AppError::Database(
"Failed after maximum retries".to_string()
));
}
}
}
}
Недостаток: Если база недоступна долго (например, плановое обслуживание), пользователь будет ждать. А блокчейн-транзакция уже выполнена!
Наше решение: паттерн Outbox и фоновый Reconciliation Job
После анализа различных подходов, мы остановились на комбинации двух паттернов:
Паттерн Transactional Outbox
Суть паттерна Outbox - вместо записи напрямую в две системы, мы делаем одну атомарную транзакцию в первичное хранилище, включая событие в таблицу outbox.
Вот как изменится наша архитектура:
// Новая структура для outbox
#[derive(Serialize, Deserialize)]
struct OutboxEvent {
id: Uuid,
event_type: String,
payload: serde_json::Value,
status: String, // "pending", "processing", "completed", "failed"
created_at: DateTime<Utc>,
processed_at: Option<DateTime<Utc>>,
retry_count: u32,
error_message: Option<String>,
}
// Изменённая функция issue_diploma
pub async fn issue_diploma_with_outbox(
State(state): State<Arc<AppState>>,
mut multipart: Multipart,
) -> Result<Json<IssueResponse>, AppError> {
// ... парсинг и генерация хеша как раньше ...
// ВАЖНО: Сначала пишем в БД в одной транзакции
let mut transaction = state.db_client.begin_transaction().await?;
// Сохраняем диплом со статусом "pending_blockchain"
let credential_data = serde_json::json!({
"hash": &diploma.hash,
"issuer_id": &diploma.issuer_id,
"recipient_id": &diploma.recipient_id,
"status": "pending_blockchain",
"issued_at": diploma.issued_at.to_rfc3339(),
});
transaction
.from("credentials")
.insert(credential_data.to_string())
.execute()
.await?;
// Добавляем событие в outbox
let outbox_event = serde_json::json!({
"id": Uuid::new_v4(),
"event_type": "WRITE_TO_BLOCKCHAIN",
"payload": serde_json::to_value(&diploma)?,
"status": "pending",
"created_at": Utc::now(),
"retry_count": 0,
});
transaction
.from("outbox_events")
.insert(outbox_event.to_string())
.execute()
.await?;
// Коммитим транзакцию - либо всё, либо ничего!
transaction.commit().await?;
// Возвращаем ответ пользователю
Ok(Json(IssueResponse {
hash: diploma.hash,
tx_id: "pending".to_string(), // Будет обновлён асинхронно
signature: Some(signature),
issued_at: diploma.issued_at,
}))
}
Теперь нужен фоновый процесс для обработки событий из outbox:
// Фоновый воркер для обработки outbox
async fn outbox_processor(state: Arc<AppState>) {
loop {
// Получаем необработанные события
let events = fetch_pending_outbox_events(&state.db_client).await;
for event in events {
match event.event_type.as_str() {
"WRITE_TO_BLOCKCHAIN" => {
process_blockchain_write(event, &state).await;
}
_ => {
tracing::warn!("Unknown event type: {}", event.event_type);
}
}
}
// Спим перед следующей итерацией
tokio::time::sleep(Duration::from_secs(5)).await;
}
}
async fn process_blockchain_write(
event: OutboxEvent,
state: &Arc<AppState>
) {
let diploma: Diploma = serde_json::from_value(event.payload.clone())
.expect("Failed to deserialize diploma");
// Пытаемся записать в блокчейн
match state.chain_client.write_hash(&diploma.hash, &diploma).await {
Ok(chain_record) => {
// Успех! Обновляем статусы
let mut transaction = state.db_client.begin_transaction().await.unwrap();
// Обновляем credential
transaction
.from("credentials")
.update(serde_json::json!({
"status": "confirmed",
"solana_tx_id": chain_record.tx_id,
}).to_string())
.eq("hash", &diploma.hash)
.execute()
.await
.unwrap();
// Помечаем событие как обработанное
transaction
.from("outbox_events")
.update(serde_json::json!({
"status": "completed",
"processed_at": Utc::now(),
}).to_string())
.eq("id", event.id.to_string())
.execute()
.await
.unwrap();
transaction.commit().await.unwrap();
}
Err(e) => {
// Ошибка - увеличиваем счётчик попыток
update_outbox_event_retry(&state.db_client, event.id, e.to_string()).await;
}
}
}
Reconciliation Job: сверочный процесс
Даже с Outbox паттерном что-то может пойти не так. Поэтому мы добавили фоновый процесс сверки:
// Периодическая сверка данных между Solana и PostgreSQL
async fn reconciliation_job(state: Arc<AppState>) {
loop {
tracing::info!("Starting reconciliation check...");
// Получаем последние транзакции из Solana за последний час
let cutoff_time = Utc::now() - Duration::from_secs(3600);
let blockchain_records = fetch_recent_blockchain_transactions(
&state.chain_client,
cutoff_time
).await;
for record in blockchain_records {
// Проверяем, есть ли запись в PostgreSQL
let db_result = state
.db_client
.from("credentials")
.select("hash")
.eq("hash", &record.hash)
.single()
.execute()
.await;
if db_result.is_err() || !db_result.unwrap().status().is_success() {
// Запись отсутствует в БД - добавляем
tracing::warn!(
"Found orphaned blockchain record: hash={}, tx_id={}",
record.hash,
record.tx_id
);
// Восстанавливаем запись из блокчейна
let recovery_data = serde_json::json!({
"hash": record.hash,
"solana_tx_id": record.tx_id,
"status": "recovered_from_blockchain",
"recovered_at": Utc::now(),
// Остальные поля берём из метаданных транзакции
});
match state
.db_client
.from("credentials")
.insert(recovery_data.to_string())
.execute()
.await
{
Ok(_) => {
tracing::info!("Successfully recovered record: {}", record.hash);
// Отправляем алерт команде
send_alert(
"Data inconsistency detected and fixed",
&format!("Recovered hash {} from blockchain", record.hash)
).await;
}
Err(e) => {
tracing::error!("Failed to recover record: {}", e);
}
}
}
}
// Запускаем сверку каждые 5 минут
tokio::time::sleep(Duration::from_secs(300)).await;
}
}
Визуализация нового подхода:
┌──────────┐ ┌────────────┐ ┌─────────────┐
│ Клиент │────▶│ Rust API │────▶│ PostgreSQL │
└──────────┘ └────────────┘ │ + Outbox │
└─────────────┘
│
▼
┌─────────────┐
│ УСПЕХ │
│ (Атомарно) │
└─────────────┘
│
┌──────────────────┼──────────────────┐
▼ ▼ ▼
┌─────────────────┐ ┌──────────────┐ ┌──────────────┐
│ Outbox Processor│ │Reconciliation│ │ Monitoring │
│ (Async) │ │ Job │ │ & Alerts │
└─────────────────┘ └──────────────┘ └──────────────┘
│ │
▼ ▼
┌──────────┐ ┌──────────┐
│ Solana │◀─────│ Проверка │
└──────────┘ └──────────┘
Что можно улучшить: взгляд в будущее
Очередь с повторными попытками
Вместо простого outbox в БД, можно использовать полноценную очередь сообщений:
// Интеграция с Redis Streams для более надёжной доставки
use redis::AsyncCommands;
async fn publish_to_queue(
redis_client: &redis::Client,
diploma: &Diploma,
) -> Result<(), AppError> {
let mut conn = redis_client.get_async_connection().await?;
let event = serde_json::json!({
"type": "WRITE_TO_BLOCKCHAIN",
"payload": diploma,
"timestamp": Utc::now().to_rfc3339(),
"retry_count": 0,
});
// Добавляем в Redis Stream с автогенерацией ID
conn.xadd(
"diploma:outbox",
"*",
&[("event", serde_json::to_string(&event)?)],
).await?;
Ok(())
}
// Консьюмер с группой для гарантированной доставки
async fn consume_from_queue(redis_client: &redis::Client, state: Arc<AppState>) {
let mut conn = redis_client.get_async_connection().await.unwrap();
// Создаём consumer группу
let _: Result<(), _> = conn.xgroup_create_mkstream(
"diploma:outbox",
"blockchain_writers",
"$",
).await;
loop {
// Читаем события из очереди
let events: Vec<StreamReadReply> = conn.xreadgroup(
&["diploma:outbox"],
"blockchain_writers",
"worker_1",
&[">"],
Some(1),
None,
).await.unwrap();
for event in events {
// Обрабатываем и подтверждаем
process_event(event, &state).await;
conn.xack("diploma:outbox", "blockchain_writers", &[event.id]).await.unwrap();
}
}
}
Мониторинг и алертинг
Критически важно отслеживать состояние системы:
use prometheus::{register_counter_vec, register_histogram_vec, CounterVec, HistogramVec};
lazy_static! {
static ref INCONSISTENCY_COUNTER: CounterVec = register_counter_vec!(
"diploma_inconsistencies_total",
"Total number of data inconsistencies detected",
&["type"]
).unwrap();
static ref RECONCILIATION_DURATION: HistogramVec = register_histogram_vec!(
"reconciliation_duration_seconds",
"Time taken to reconcile records",
&["status"]
).unwrap();
}
// Используем метрики в коде
async fn monitor_inconsistency(inconsistency_type: &str) {
INCONSISTENCY_COUNTER
.with_label_values(&[inconsistency_type])
.inc();
// Алерт если слишком много несогласованностей
let total = INCONSISTENCY_COUNTER
.with_label_values(&[inconsistency_type])
.get();
if total > 10.0 {
send_critical_alert(
"High inconsistency rate detected",
&format!("Type: {}, Count: {}", inconsistency_type, total)
).await;
}
}
Event Sourcing для полной трассировки
Можно пойти ещё дальше и хранить все события как неизменяемый лог:
#[derive(Serialize, Deserialize)]
enum DiplomaEvent {
Created {
hash: String,
issuer_id: String,
recipient_id: String,
timestamp: DateTime<Utc>,
},
BlockchainWriteRequested {
hash: String,
timestamp: DateTime<Utc>,
},
BlockchainWriteCompleted {
hash: String,
tx_id: String,
timestamp: DateTime<Utc>,
},
BlockchainWriteFailed {
hash: String,
error: String,
retry_count: u32,
timestamp: DateTime<Utc>,
},
ReconciliationDetected {
hash: String,
source: String, // "blockchain" или "database"
timestamp: DateTime<Utc>,
},
}
// Это даёт нам полную историю каждого диплома
async fn append_event(
db_client: &Postgrest,
event: DiplomaEvent,
) -> Result<(), AppError> {
let event_data = serde_json::json!({
"event_type": event.variant_name(),
"payload": serde_json::to_value(&event)?,
"timestamp": Utc::now(),
});
db_client
.from("diploma_events")
.insert(event_data.to_string())
.execute()
.await?;
Ok(())
}
Заключение: уроки, извлечённые из транзакций
Работая с паттерном двойной записи между Solana и PostgreSQL, мы извлекли несколько важных уроков:
Никогда не полагайтесь на последовательные вызовы — если первый прошёл успешно, это не гарантирует успех второго. Особенно когда первый — это необратимая операция в блокчейне.
Проектируйте с учётом сбоев — вопрос не в том, упадёт ли система, а в том, когда это произойдёт. Паттерн Outbox и фоновая сверка — это не избыточность, а необходимость.
Итоговая согласованность — ваш друг, не пытайтесь достичь строгой согласованности между блокчейном и традиционной БД. Это дорого, сложно и часто невозможно.
Мониторинг критичен — лучше получить алерт о рассинхроне через минуту, чем узнать о проблеме от пользователя через неделю.
Идемпотентность спасает — проектируйте операции так, чтобы их можно было безопасно повторять. Это упрощает восстановление после сбоев.
Для коллег, работающих с Web3-бэкендами на Rust: блокчейн — это не серебряная пуля. Это мощный инструмент, но он требует тщательного проектирования всей системы. Двойная запись кажется простым решением, пока вы не столкнётесь с первым production‑сбоем в 3 часа ночи.
Помните: в распределённых системах всё, что может пойти не так, обязательно пойдёт не так. Проектируйте соответственно.
Полезные ссылки
Telegram — автор
Если у вас есть опыт решения подобных проблем или вопросы по реализации - давайте обсудим в комментариях. Особенно интересно услышать про альтернативные подходы к синхронизации блокчейна с традиционными БД.