Привет! С вами снова Илья Вязников, инженер по сопровождению СОФРОС. Не так давно, я писал, о том как автоматизировать повторную обработку сообщений из архива в DATAREON Platform.

Было несколько вопросов и уточнений, поэтому хочу дополнительно разобрать код C# для повторной обработки.

Для ЛЛ: Полный алгоритм с комментариями в конце поста

1. Настройка параметров обработки:

В начале задаём список типов сообщений, которые нужно повторно обрабатывать, а также лимит попыток восстановления.

var dataTypes = new List<string>
{
    "Тип1",
    "Тип2"
};
const int MAX_ATTEMPTS = 3;
const string ATTEMPT_PROPERTY = "ArchiveRestoreAttempts";

Что здесь происходит:

  • dataTypes — список типов данных, для которых будет выполняться повторная обработка.
    Это позволяет не восстанавливать весь архив целиком, а работать только с нужными сообщениями.

  • MAX_ATTEMPTS — максимальное число попыток повторной отправки.
    После достижения лимита сообщение больше не будет возвращаться в обработку.

  • ATTEMPT_PROPERTY — имя свойства, в котором хранится счётчик попыток восстановления.

Такой подход защищает систему от бесконечных циклов обработки проблемных сообщений.

2. Логирование запуска и счётчики результатов

Перед началом обработки удобно записать информацию в лог и подготовить счётчики.

Logger.Info($"Запуск восстановления из архива (лимит {MAX_ATTEMPTS} попыток)");

int totalRestored = 0;
int totalSkipped = 0;

Переменные используются так:

  • totalRestored — количество успешно возвращённых сообщений;

  • totalSkipped — количество сообщений, пропущенных из-за исчерпания лимита.

В конце выполнения эти значения попадут в итоговый лог.

3. Получение сообщений из архива

Теперь проходим по каждому типу данных и выбираем сообщения из архива.

foreach (var typeName in dataTypes)
{
   var filter = new DataStorageFilter
   {
       TypeName = typeName,
       Count = 0,
       WithBody = true
   };

   var archivedMessages = ArchiveFacade.Select(filter);
}

Здесь:

  • создаётся фильтр DataStorageFilter;

  • TypeName ограничивает выборку конкретным типом сообщений;

  • Count = 0 означает получение всех сообщений по фильтру;

  • WithBody = true загружает тело сообщения вместе с метаданными.

После этого ArchiveFacade.Select(filter) возвращает коллекцию архивных сообщений по фильтру.

4. Проверка количества предыдущих попыток

Для каждого сообщения проверяем, сколько раз оно уже восстанавливалось ранее.

int attempts = 0;

if (archMsg.OriginalMessage?.Properties != null)
{
   var props = archMsg.OriginalMessage.Properties;

   if (props.Has(ATTEMPT_PROPERTY))
   {
       attempts = props.GetValue<int>(ATTEMPT_PROPERTY);
   }
}

Логика:

  1. По умолчанию считаем, что попыток не было (0).

  2. Проверяем наличие пользовательских свойств сообщения.

  3. Если счётчик уже существует — читаем его.

Это позволяет хранить историю попыток прямо внутри сообщения.

5. Защита от бесконечных повторов

Если лимит достигнут — пропускаем сообщение.

if (attempts >= MAX_ATTEMPTS)
{
   Logger.Info($"Пропущено: лимит попыток исчерпан");

   totalSkipped++;
   сontinue;
}

Что делает блок:

  • сравнивает текущее число попыток с лимитом;

  • увеличивает счётчик пропущенных сообщений;

  • continue переходит к следующему сообщению.

Без этого блока одно проблемное сообщение могло бы бесконечно возвращаться в обработку.

6. Подготовка новых свойств перед восстановлением

Если лимит не исчерпан, увеличиваем счётчик попыток.

var restoreProps = new PropertiesCollection();
restoreProps.AddValue(ATTEMPT_PROPERTY, attempts + 1);

Создаётся новый набор свойств, в который записывается обновлённое число попыток.

Например:

  • было 1

  • станет 2

7. Восстановление сообщения

После подготовки свойств выполняем возврат сообщения в обработку.

ArchiveFacade.Restore(archMsg.Data.Id, archMsg.TargetId, restoreProps);
totalRestored++;

Метод Restore():

  • извлекает сообщение из архива;

  • возвращает его обратно в pipeline обработки;

  • сохраняет обновлённые свойства.

После успешного восстановления увеличивается счётчик totalRestored.

8. Итоговый лог выполнения

После завершения обработки полезно вывести итоговую статистику.

Logger.Info($"Обработка архива завершена. ИТОГО за запуск: восстановлено — {totalRestored}, пропущено (лимит исчерпан) — {totalSkipped}");

По этому сообщению можно быстро понять:

  • сколько сообщений удалось вернуть;

  • сколько было заблокировано лимитом.

Полный код алгоритма

Ниже приведён полный пример сервисного алгоритма, объединяющий все рассмотренные выше блоки.

// Восстановление из архива по списку типов с лимитом 3 попытки

// Настройка списка типов данных для повторной обработки.
var dataTypes = new List<string>
{
    "Тип1",
    "Тип2"
};

// Настройка максимального количества попыток повторной обработки и наименование свойства для их отслеживания.
const int MAX_ATTEMPTS = 3;
const string ATTEMPT_PROPERTY = "ArchiveRestoreAttempts";

// Обработка
Logger.Info($"Запуск восстановления из архива (лимит {MAX_ATTEMPTS} попыток)");

int totalRestored = 0;
int totalSkipped = 0;

foreach (var typeName in dataTypes)
{
    var filter = new DataStorageFilter
    {
        TypeName = typeName,
        Count = 0,
        WithBody = true
    };

    var archivedMessages = ArchiveFacade.Select(filter);

    Logger.Info($"Тип '{typeName}': найдено {archivedMessages.Count} сообщений в архиве");

    foreach (var archMsg in archivedMessages)
    {
        int attempts = 0;

        if (archMsg.OriginalMessage?.Properties != null)
        {
            var props = archMsg.OriginalMessage.Properties;
            if (props.Has(ATTEMPT_PROPERTY))
            {
                attempts = props.GetValue<int>(ATTEMPT_PROPERTY);
            }
        }

        if (attempts >= MAX_ATTEMPTS)
        {
            Logger.Info($"Пропущено (лимит исчерпан): ID сообщения ={archMsg.Data.Id}, попыток={attempts}");
            totalSkipped++;
            continue;
        }

        // Подготавливаем свойства для восстановления (только счётчик)
        var restoreProps = new PropertiesCollection();
        restoreProps.AddValue(ATTEMPT_PROPERTY, attempts + 1);

        // Восстанавливаем сообщение и передаём обновлённое свойство
        ArchiveFacade.Restore(archMsg.Data.Id, archMsg.TargetId, restoreProps);

        totalRestored++;
        Logger.Info($"Восстановлено: ID={archMsg.Data.Id}, попытка {attempts + 1}/{MAX_ATTEMPTS}");
    }
}

Logger.Info($"Обработка архива завершена. ИТОГО за запуск: восстановлено — {totalRestored}, пропущено (лимит исчерпан) — {totalSkipped}");

Если есть вопросы не стесняйтесь писать комментарии, а если интересно узнать больше о нашей компании, новости, вебинары, специальные предложения подписывайтесь на наш канал в Telegram

Комментарии (0)