Введение: два источника правды - одна большая проблема

Представьте: вы строите систему верификации дипломов. Требования простые - данные должны быть неизменяемыми (привет, блокчейн) и при этом быстро доступными для запросов (привет, PostgreSQL). Казалось бы, идеальное решение - писать в оба хранилища. Но дьявол, как всегда, кроется в деталях.

Наш проект использует паттерн двойной записи (Dual-Write):

  • Solana — гарантирует неизменность и прозрачность данных о выданных дипломах

  • PostgreSQL (Supabase) — обеспечивает быстрые выборки и сложные запросы

Звучит красиво на архитектурных диаграммах, но в production всё не так радужно. Главная проблема — частичные сбои. Транзакция в Solana прошла успешно, диплом записан в блокчейн навечно, а вот запись в PostgreSQL упала. Пользователь получил подтверждение, но половина системы о его дипломе не знает.

Сегодня я покажу, как мы столкнулись с этой проблемой лицом к лицу и какие паттерны применили для её решения.

Анатомия сбоя: где именно всё ломается?

Давайте посмотрим на реальный код из нашего internal/api/handlers.rs. Функция issue_diploma - это то место, где происходит магия... и где всё может пойти не так:

pub async fn issue_diploma(
    State(state): State<Arc<AppState>>,
    mut multipart: Multipart,
) -> Result<Json<IssueResponse>, AppError> {
    // ... парсинг multipart данных ...
    
    // Генерируем хеш диплома
    let hash = hashing::generate_hash(
        &file_bytes,
        &req.issuer_id,
        &req.recipient_id,
        issued_at,
        req.serial.as_deref(),
    );
    
    // Подписываем хеш приватным ключом
    let signature = hashing::sign_hash(&hash, &state.issuer_keypair)?;
    
    let diploma = Diploma {
        hash: hash.clone(),
        issuer_id: req.issuer_id.clone(),
        recipient_id: req.recipient_id.clone(),
        signature: Some(signature.clone()),
        issued_at,
        serial: req.serial.clone(),
        ipfs_cid: None,
    };
    
    // КРИТИЧЕСКАЯ ТОЧКА №1: Запись в блокчейн Solana
    // Эта операция может занять 1-3 секунды и стоит денег (газ)
    let chain_record = state.chain_client.write_hash(&hash, &diploma).await?;
    
    // Подготавливаем данные для базы
    let credential_data = serde_json::json!({
        "hash": &diploma.hash,
        "issuer_id": &diploma.issuer_id,
        "recipient_id": &diploma.recipient_id,
        "solana_tx_id": &chain_record.tx_id,
        "issued_at": diploma.issued_at.to_rfc3339(),
    });
    
    // КРИТИЧЕСКАЯ ТОЧКА №2: Запись в PostgreSQL
    // А вот здесь может произойти катастрофа
    let db_response = state
        .db_client
        .from("credentials")
        .insert(credential_data.to_string())
        .execute()
        .await;
    
    // Обработка ошибки базы данных
    let db_response = match db_response {
        Ok(response) => response,
        Err(e) => {
            tracing::error!("Database request failed: {}", e);
            return Err(AppError::Database(format!("Database request failed: {}", e)));
        }
    };
    
    if !db_response.status().is_success() {
        let status = db_response.status();
        let error_body = db_response.text().await.unwrap_or_default();
        
        // ВОТ ОНА - КРИТИЧЕСКАЯ НЕСОГЛАСОВАННОСТЬ!
        tracing::error!(
            "CRITICAL INCONSISTENCY: Failed to save to Supabase after successful Solana transaction. \
             tx_id: {}, hash: {}. Status: {}. Body: {}",
            chain_record.tx_id,
            hash,
            status,
            error_body
        );
        
        // Мы уже записали в блокчейн, откатить нельзя!
        return Err(AppError::Internal(
            "Failed to save credential record after blockchain confirmation.".to_string(),
        ));
    }
    
    // Если всё хорошо - возвращаем успешный ответ
    Ok(Json(IssueResponse {
        hash,
        tx_id: chain_record.tx_id,
        signature: Some(signature),
        issued_at,
    }))
}

Вот как выглядит последовательность операций и точка отказа:

┌──────────┐      ┌────────────┐     ┌─────────┐      ┌────────────┐
│  Клиент  │────▶│ Rust API    │───▶│ Solana  │────▶│   УСПЕХ    │
└──────────┘      └────────────┘     └─────────┘      └────────────┘
                        │                                    │
                        │                                    ▼
                        │                             ┌────────────┐
                        └───────────────────────────▶│ PostgreSQL  │
                                                      └────────────┘
                                                            │
                                                            ▼
                                                     ┌────────────┐
                                                     │   СБОЙ!    │
                                                     └────────────┘
                                                            │
                                                            ▼
                                                ┌──────────────────────┐
                                                │ РАССИНХРОН:          │
                                                │ • Блокчейн: ✓ есть   |
                                                │ • База: ✗ нет        |
                                                └──────────────────────┘

Последствия рассинхрона

Что происходит после такого сбоя? У нас есть несколько неприятных сценариев:

  1. Пользователь не может найти свой диплом через API запросы к базе данных

  2. Невозможность построить аналитику — данные в базе неполные

  3. Проблемы с аудитом — в блокчейне есть запись, в отчётах её нет

  4. Дублирование при повторной попытке — пользователь может попробовать выпустить диплом ещё раз

Теория на практике: паттерны для спасения данных

Проблема согласованности: выбираем стратегию

В распределённых системах есть два основных подхода к согласованности:

1. Строгая согласованность (Strong Consistency) - все узлы видят одинаковые данные в один момент времени. Это дорого и сложно, особенно когда один из узлов - публичный блокчейн.

2. Итоговая согласованность (Eventual Consistency) - данные могут временно различаться, но в конечном итоге придут к согласованному состоянию.

Мы выбрали итоговую согласованность. Почему? Откатить транзакцию в Solana после подтверждения - невозможно. Значит, нужно гарантировать, что PostgreSQL рано или поздно получит эти данные.

Паттерн Saga: длинные транзакции с компенсацией

Паттерн Saga разбивает длинную распределённую транзакцию на последовательность локальных транзакций. Каждый шаг может иметь компенсирующую транзакцию для отката.

Как это могло бы выглядеть в нашем случае:

// Псевдокод Saga для выпуска диплома
enum SagaStep {
    SaveToDatabase,      // Шаг 1
    WriteToBlockchain,   // Шаг 2
    UpdateDatabaseStatus // Шаг 3
}

async fn issue_diploma_saga(diploma: Diploma) -> Result<(), SagaError> {
    // Шаг 1: Сохраняем в БД со статусом "pending"
    let db_record = match save_to_database_with_status(&diploma, "pending").await {
        Ok(record) => record,
        Err(e) => {
            // Ничего откатывать не нужно, просто выходим
            return Err(SagaError::DatabaseFailed(e));
        }
    };
    
    // Шаг 2: Пишем в блокчейн
    let tx_id = match write_to_blockchain(&diploma).await {
        Ok(tx) => tx,
        Err(e) => {
            // Компенсирующая транзакция: помечаем запись как неудачную
            mark_database_record_failed(&db_record.id).await?;
            return Err(SagaError::BlockchainFailed(e));
        }
    };
    
    // Шаг 3: Обновляем статус в БД на "confirmed"
    match update_database_status(&db_record.id, "confirmed", &tx_id).await {
        Ok(_) => Ok(()),
        Err(e) => {
            // Здесь компенсация сложная: в блокчейне уже есть запись
            // Можно только пометить в БД для ручного вмешательства
            mark_for_manual_reconciliation(&db_record.id, &tx_id).await?;
            Err(SagaError::InconsistentState(e))
        }
    }
}

Проблема с Saga в блокчейне: Компенсирующие транзакции в Solana стоят денег (газ) и не отменяют предыдущие записи, а добавляют новые. Это делает паттерн дорогим и сложным.

Идемпотентность и повторные попытки

Идемпотентность - это свойство операции давать один и тот же результат при повторных вызовах. В нашем контексте это критически важно.

Вот как мы могли бы добавить механизм повторных попыток:

use tokio::time::{sleep, Duration};

async fn write_to_database_with_retry(
    db_client: &Postgrest,
    data: serde_json::Value,
    max_retries: u32,
) -> Result<(), AppError> {
    let mut retries = 0;
    let mut backoff = Duration::from_millis(100);
    
    loop {
        match db_client
            .from("credentials")
            .insert(data.to_string())
            .execute()
            .await 
        {
            Ok(response) if response.status().is_success() => {
                return Ok(());
            }
            Ok(response) if response.status() == 409 => {
                // Конфликт - запись уже существует (идемпотентность!)
                tracing::info!("Record already exists, considering it success");
                return Ok(());
            }
            Ok(_) | Err(_) if retries < max_retries => {
                retries += 1;
                tracing::warn!(
                    "Database write failed, retry {}/{} after {:?}",
                    retries, max_retries, backoff
                );
                sleep(backoff).await;
                backoff *= 2; // Экспоненциальная задержка
            }
            _ => {
                return Err(AppError::Database(
                    "Failed after maximum retries".to_string()
                ));
            }
        }
    }
}

Недостаток: Если база недоступна долго (например, плановое обслуживание), пользователь будет ждать. А блокчейн-транзакция уже выполнена!

Наше решение: паттерн Outbox и фоновый Reconciliation Job

После анализа различных подходов, мы остановились на комбинации двух паттернов:

Паттерн Transactional Outbox

Суть паттерна Outbox - вместо записи напрямую в две системы, мы делаем одну атомарную транзакцию в первичное хранилище, включая событие в таблицу outbox.

Вот как изменится наша архитектура:

// Новая структура для outbox
#[derive(Serialize, Deserialize)]
struct OutboxEvent {
    id: Uuid,
    event_type: String,
    payload: serde_json::Value,
    status: String, // "pending", "processing", "completed", "failed"
    created_at: DateTime<Utc>,
    processed_at: Option<DateTime<Utc>>,
    retry_count: u32,
    error_message: Option<String>,
}

// Изменённая функция issue_diploma
pub async fn issue_diploma_with_outbox(
    State(state): State<Arc<AppState>>,
    mut multipart: Multipart,
) -> Result<Json<IssueResponse>, AppError> {
    // ... парсинг и генерация хеша как раньше ...
    
    // ВАЖНО: Сначала пишем в БД в одной транзакции
    let mut transaction = state.db_client.begin_transaction().await?;
    
    // Сохраняем диплом со статусом "pending_blockchain"
    let credential_data = serde_json::json!({
        "hash": &diploma.hash,
        "issuer_id": &diploma.issuer_id,
        "recipient_id": &diploma.recipient_id,
        "status": "pending_blockchain",
        "issued_at": diploma.issued_at.to_rfc3339(),
    });
    
    transaction
        .from("credentials")
        .insert(credential_data.to_string())
        .execute()
        .await?;
    
    // Добавляем событие в outbox
    let outbox_event = serde_json::json!({
        "id": Uuid::new_v4(),
        "event_type": "WRITE_TO_BLOCKCHAIN",
        "payload": serde_json::to_value(&diploma)?,
        "status": "pending",
        "created_at": Utc::now(),
        "retry_count": 0,
    });
    
    transaction
        .from("outbox_events")
        .insert(outbox_event.to_string())
        .execute()
        .await?;
    
    // Коммитим транзакцию - либо всё, либо ничего!
    transaction.commit().await?;
    
    // Возвращаем ответ пользователю
    Ok(Json(IssueResponse {
        hash: diploma.hash,
        tx_id: "pending".to_string(), // Будет обновлён асинхронно
        signature: Some(signature),
        issued_at: diploma.issued_at,
    }))
}

Теперь нужен фоновый процесс для обработки событий из outbox:

// Фоновый воркер для обработки outbox
async fn outbox_processor(state: Arc<AppState>) {
    loop {
        // Получаем необработанные события
        let events = fetch_pending_outbox_events(&state.db_client).await;
        
        for event in events {
            match event.event_type.as_str() {
                "WRITE_TO_BLOCKCHAIN" => {
                    process_blockchain_write(event, &state).await;
                }
                _ => {
                    tracing::warn!("Unknown event type: {}", event.event_type);
                }
            }
        }
        
        // Спим перед следующей итерацией
        tokio::time::sleep(Duration::from_secs(5)).await;
    }
}

async fn process_blockchain_write(
    event: OutboxEvent, 
    state: &Arc<AppState>
) {
    let diploma: Diploma = serde_json::from_value(event.payload.clone())
        .expect("Failed to deserialize diploma");
    
    // Пытаемся записать в блокчейн
    match state.chain_client.write_hash(&diploma.hash, &diploma).await {
        Ok(chain_record) => {
            // Успех! Обновляем статусы
            let mut transaction = state.db_client.begin_transaction().await.unwrap();
            
            // Обновляем credential
            transaction
                .from("credentials")
                .update(serde_json::json!({
                    "status": "confirmed",
                    "solana_tx_id": chain_record.tx_id,
                }).to_string())
                .eq("hash", &diploma.hash)
                .execute()
                .await
                .unwrap();
            
            // Помечаем событие как обработанное
            transaction
                .from("outbox_events")
                .update(serde_json::json!({
                    "status": "completed",
                    "processed_at": Utc::now(),
                }).to_string())
                .eq("id", event.id.to_string())
                .execute()
                .await
                .unwrap();
            
            transaction.commit().await.unwrap();
        }
        Err(e) => {
            // Ошибка - увеличиваем счётчик попыток
            update_outbox_event_retry(&state.db_client, event.id, e.to_string()).await;
        }
    }
}

Reconciliation Job: сверочный процесс

Даже с Outbox паттерном что-то может пойти не так. Поэтому мы добавили фоновый процесс сверки:

// Периодическая сверка данных между Solana и PostgreSQL
async fn reconciliation_job(state: Arc<AppState>) {
    loop {
        tracing::info!("Starting reconciliation check...");
        
        // Получаем последние транзакции из Solana за последний час
        let cutoff_time = Utc::now() - Duration::from_secs(3600);
        let blockchain_records = fetch_recent_blockchain_transactions(
            &state.chain_client,
            cutoff_time
        ).await;
        
        for record in blockchain_records {
            // Проверяем, есть ли запись в PostgreSQL
            let db_result = state
                .db_client
                .from("credentials")
                .select("hash")
                .eq("hash", &record.hash)
                .single()
                .execute()
                .await;
            
            if db_result.is_err() || !db_result.unwrap().status().is_success() {
                // Запись отсутствует в БД - добавляем
                tracing::warn!(
                    "Found orphaned blockchain record: hash={}, tx_id={}", 
                    record.hash, 
                    record.tx_id
                );
                
                // Восстанавливаем запись из блокчейна
                let recovery_data = serde_json::json!({
                    "hash": record.hash,
                    "solana_tx_id": record.tx_id,
                    "status": "recovered_from_blockchain",
                    "recovered_at": Utc::now(),
                    // Остальные поля берём из метаданных транзакции
                });
                
                match state
                    .db_client
                    .from("credentials")
                    .insert(recovery_data.to_string())
                    .execute()
                    .await 
                {
                    Ok(_) => {
                        tracing::info!("Successfully recovered record: {}", record.hash);
                        
                        // Отправляем алерт команде
                        send_alert(
                            "Data inconsistency detected and fixed",
                            &format!("Recovered hash {} from blockchain", record.hash)
                        ).await;
                    }
                    Err(e) => {
                        tracing::error!("Failed to recover record: {}", e);
                    }
                }
            }
        }
        
        // Запускаем сверку каждые 5 минут
        tokio::time::sleep(Duration::from_secs(300)).await;
    }
}

Визуализация нового подхода:

┌──────────┐     ┌────────────┐      ┌─────────────┐
│  Клиент  │────▶│ Rust API   │────▶│ PostgreSQL  │
└──────────┘     └────────────┘      │ + Outbox    │
                                     └─────────────┘
                                            │
                                            ▼
                                     ┌─────────────┐
                                     │   УСПЕХ     │
                                     │ (Атомарно)  │
                                     └─────────────┘
                                            │
                         ┌──────────────────┼──────────────────┐
                         ▼                  ▼                  ▼
                ┌─────────────────┐ ┌──────────────┐ ┌──────────────┐
                │ Outbox Processor│ │Reconciliation│ │  Monitoring  │
                │   (Async)       │ │     Job      │ │   & Alerts   │
                └─────────────────┘ └──────────────┘ └──────────────┘
                         │                  │
                         ▼                  ▼
                   ┌──────────┐       ┌──────────┐
                   │  Solana  │◀─────│  Проверка │
                   └──────────┘       └──────────┘

Что можно улучшить: взгляд в будущее

Очередь с повторными попытками

Вместо простого outbox в БД, можно использовать полноценную очередь сообщений:

// Интеграция с Redis Streams для более надёжной доставки
use redis::AsyncCommands;

async fn publish_to_queue(
    redis_client: &redis::Client,
    diploma: &Diploma,
) -> Result<(), AppError> {
    let mut conn = redis_client.get_async_connection().await?;
    
    let event = serde_json::json!({
        "type": "WRITE_TO_BLOCKCHAIN",
        "payload": diploma,
        "timestamp": Utc::now().to_rfc3339(),
        "retry_count": 0,
    });
    
    // Добавляем в Redis Stream с автогенерацией ID
    conn.xadd(
        "diploma:outbox",
        "*",
        &[("event", serde_json::to_string(&event)?)],
    ).await?;
    
    Ok(())
}

// Консьюмер с группой для гарантированной доставки
async fn consume_from_queue(redis_client: &redis::Client, state: Arc<AppState>) {
    let mut conn = redis_client.get_async_connection().await.unwrap();
    
    // Создаём consumer группу
    let _: Result<(), _> = conn.xgroup_create_mkstream(
        "diploma:outbox",
        "blockchain_writers",
        "$",
    ).await;
    
    loop {
        // Читаем события из очереди
        let events: Vec<StreamReadReply> = conn.xreadgroup(
            &["diploma:outbox"],
            "blockchain_writers",
            "worker_1",
            &[">"],
            Some(1),
            None,
        ).await.unwrap();
        
        for event in events {
            // Обрабатываем и подтверждаем
            process_event(event, &state).await;
            conn.xack("diploma:outbox", "blockchain_writers", &[event.id]).await.unwrap();
        }
    }
}

Мониторинг и алертинг

Критически важно отслеживать состояние системы:

use prometheus::{register_counter_vec, register_histogram_vec, CounterVec, HistogramVec};

lazy_static! {
    static ref INCONSISTENCY_COUNTER: CounterVec = register_counter_vec!(
        "diploma_inconsistencies_total",
        "Total number of data inconsistencies detected",
        &["type"]
    ).unwrap();
    
    static ref RECONCILIATION_DURATION: HistogramVec = register_histogram_vec!(
        "reconciliation_duration_seconds",
        "Time taken to reconcile records",
        &["status"]
    ).unwrap();
}

// Используем метрики в коде
async fn monitor_inconsistency(inconsistency_type: &str) {
    INCONSISTENCY_COUNTER
        .with_label_values(&[inconsistency_type])
        .inc();
    
    // Алерт если слишком много несогласованностей
    let total = INCONSISTENCY_COUNTER
        .with_label_values(&[inconsistency_type])
        .get();
    
    if total > 10.0 {
        send_critical_alert(
            "High inconsistency rate detected",
            &format!("Type: {}, Count: {}", inconsistency_type, total)
        ).await;
    }
}

Event Sourcing для полной трассировки

Можно пойти ещё дальше и хранить все события как неизменяемый лог:

#[derive(Serialize, Deserialize)]
enum DiplomaEvent {
    Created {
        hash: String,
        issuer_id: String,
        recipient_id: String,
        timestamp: DateTime<Utc>,
    },
    BlockchainWriteRequested {
        hash: String,
        timestamp: DateTime<Utc>,
    },
    BlockchainWriteCompleted {
        hash: String,
        tx_id: String,
        timestamp: DateTime<Utc>,
    },
    BlockchainWriteFailed {
        hash: String,
        error: String,
        retry_count: u32,
        timestamp: DateTime<Utc>,
    },
    ReconciliationDetected {
        hash: String,
        source: String, // "blockchain" или "database"
        timestamp: DateTime<Utc>,
    },
}

// Это даёт нам полную историю каждого диплома
async fn append_event(
    db_client: &Postgrest,
    event: DiplomaEvent,
) -> Result<(), AppError> {
    let event_data = serde_json::json!({
        "event_type": event.variant_name(),
        "payload": serde_json::to_value(&event)?,
        "timestamp": Utc::now(),
    });
    
    db_client
        .from("diploma_events")
        .insert(event_data.to_string())
        .execute()
        .await?;
    
    Ok(())
}

Заключение: уроки, извлечённые из транзакций

Работая с паттерном двойной записи между Solana и PostgreSQL, мы извлекли несколько важных уроков:

  1. Никогда не полагайтесь на последовательные вызовы — если первый прошёл успешно, это не гарантирует успех второго. Особенно когда первый — это необратимая операция в блокчейне.

  2. Проектируйте с учётом сбоев — вопрос не в том, упадёт ли система, а в том, когда это произойдёт. Паттерн Outbox и фоновая сверка — это не избыточность, а необходимость.

  3. Итоговая согласованность — ваш друг, не пытайтесь достичь строгой согласованности между блокчейном и традиционной БД. Это дорого, сложно и часто невозможно.

  4. Мониторинг критичен — лучше получить алерт о рассинхроне через минуту, чем узнать о проблеме от пользователя через неделю.

  5. Идемпотентность спасает — проектируйте операции так, чтобы их можно было безопасно повторять. Это упрощает восстановление после сбоев.

Для коллег, работающих с Web3-бэкендами на Rust: блокчейн — это не серебряная пуля. Это мощный инструмент, но он требует тщательного проектирования всей системы. Двойная запись кажется простым решением, пока вы не столкнётесь с первым production‑сбоем в 3 часа ночи.

Помните: в распределённых системах всё, что может пойти не так, обязательно пойдёт не так. Проектируйте соответственно.


Полезные ссылки


Если у вас есть опыт решения подобных проблем или вопросы по реализации - давайте обсудим в комментариях. Особенно интересно услышать про альтернативные подходы к синхронизации блокчейна с традиционными БД.

Комментарии (0)