
Привет! С вами снова Илья Вязников, инженер по сопровождению СОФРОС. Не так давно, я писал, о том как автоматизировать повторную обработку сообщений из архива в DATAREON Platform.
Было несколько вопросов и уточнений, поэтому хочу дополнительно разобрать код C# для повторной обработки.
Для ЛЛ: Полный алгоритм с комментариями в конце поста
1. Настройка параметров обработки:
В начале задаём список типов сообщений, которые нужно повторно обрабатывать, а также лимит попыток восстановления.
var dataTypes = new List<string> { "Тип1", "Тип2" }; const int MAX_ATTEMPTS = 3; const string ATTEMPT_PROPERTY = "ArchiveRestoreAttempts";
Что здесь происходит:
dataTypes — список типов данных, для которых будет выполняться повторная обработка.
Это позволяет не восстанавливать весь архив целиком, а работать только с нужными сообщениями.MAX_ATTEMPTS — максимальное число попыток повторной отправки.
После достижения лимита сообщение больше не будет возвращаться в обработку.ATTEMPT_PROPERTY — имя свойства, в котором хранится счётчик попыток восстановления.
Такой подход защищает систему от бесконечных циклов обработки проблемных сообщений.
2. Логирование запуска и счётчики результатов
Перед началом обработки удобно записать информацию в лог и подготовить счётчики.
Logger.Info($"Запуск восстановления из архива (лимит {MAX_ATTEMPTS} попыток)"); int totalRestored = 0; int totalSkipped = 0;
Переменные используются так:
totalRestored — количество успешно возвращённых сообщений;
totalSkipped — количество сообщений, пропущенных из-за исчерпания лимита.
В конце выполнения эти значения попадут в итоговый лог.
3. Получение сообщений из архива
Теперь проходим по каждому типу данных и выбираем сообщения из архива.
foreach (var typeName in dataTypes) { var filter = new DataStorageFilter { TypeName = typeName, Count = 0, WithBody = true }; var archivedMessages = ArchiveFacade.Select(filter); }
Здесь:
создаётся фильтр DataStorageFilter;
TypeName ограничивает выборку конкретным типом сообщений;
Count = 0 означает получение всех сообщений по фильтру;
WithBody = true загружает тело сообщения вместе с метаданными.
После этого ArchiveFacade.Select(filter) возвращает коллекцию архивных сообщений по фильтру.
4. Проверка количества предыдущих попыток
Для каждого сообщения проверяем, сколько раз оно уже восстанавливалось ранее.
int attempts = 0; if (archMsg.OriginalMessage?.Properties != null) { var props = archMsg.OriginalMessage.Properties; if (props.Has(ATTEMPT_PROPERTY)) { attempts = props.GetValue<int>(ATTEMPT_PROPERTY); } }
Логика:
По умолчанию считаем, что попыток не было (0).
Проверяем наличие пользовательских свойств сообщения.
Если счётчик уже существует — читаем его.
Это позволяет хранить историю попыток прямо внутри сообщения.
5. Защита от бесконечных повторов
Если лимит достигнут — пропускаем сообщение.
if (attempts >= MAX_ATTEMPTS) { Logger.Info($"Пропущено: лимит попыток исчерпан"); totalSkipped++; сontinue; }
Что делает блок:
сравнивает текущее число попыток с лимитом;
увеличивает счётчик пропущенных сообщений;
continue переходит к следующему сообщению.
Без этого блока одно проблемное сообщение могло бы бесконечно возвращаться в обработку.
6. Подготовка новых свойств перед восстановлением
Если лимит не исчерпан, увеличиваем счётчик попыток.
var restoreProps = new PropertiesCollection(); restoreProps.AddValue(ATTEMPT_PROPERTY, attempts + 1);
Создаётся новый набор свойств, в который записывается обновлённое число попыток.
Например:
было 1
станет 2
7. Восстановление сообщения
После подготовки свойств выполняем возврат сообщения в обработку.
ArchiveFacade.Restore(archMsg.Data.Id, archMsg.TargetId, restoreProps); totalRestored++;
Метод Restore():
извлекает сообщение из архива;
возвращает его обратно в pipeline обработки;
сохраняет обновлённые свойства.
После успешного восстановления увеличивается счётчик totalRestored.
8. Итоговый лог выполнения
После завершения обработки полезно вывести итоговую статистику.
Logger.Info($"Обработка архива завершена. ИТОГО за запуск: восстановлено — {totalRestored}, пропущено (лимит исчерпан) — {totalSkipped}");
По этому сообщению можно быстро понять:
сколько сообщений удалось вернуть;
сколько было заблокировано лимитом.
Полный код алгоритма
Ниже приведён полный пример сервисного алгоритма, объединяющий все рассмотренные выше блоки.
// Восстановление из архива по списку типов с лимитом 3 попытки // Настройка списка типов данных для повторной обработки. var dataTypes = new List<string> { "Тип1", "Тип2" }; // Настройка максимального количества попыток повторной обработки и наименование свойства для их отслеживания. const int MAX_ATTEMPTS = 3; const string ATTEMPT_PROPERTY = "ArchiveRestoreAttempts"; // Обработка Logger.Info($"Запуск восстановления из архива (лимит {MAX_ATTEMPTS} попыток)"); int totalRestored = 0; int totalSkipped = 0; foreach (var typeName in dataTypes) { var filter = new DataStorageFilter { TypeName = typeName, Count = 0, WithBody = true }; var archivedMessages = ArchiveFacade.Select(filter); Logger.Info($"Тип '{typeName}': найдено {archivedMessages.Count} сообщений в архиве"); foreach (var archMsg in archivedMessages) { int attempts = 0; if (archMsg.OriginalMessage?.Properties != null) { var props = archMsg.OriginalMessage.Properties; if (props.Has(ATTEMPT_PROPERTY)) { attempts = props.GetValue<int>(ATTEMPT_PROPERTY); } } if (attempts >= MAX_ATTEMPTS) { Logger.Info($"Пропущено (лимит исчерпан): ID сообщения ={archMsg.Data.Id}, попыток={attempts}"); totalSkipped++; continue; } // Подготавливаем свойства для восстановления (только счётчик) var restoreProps = new PropertiesCollection(); restoreProps.AddValue(ATTEMPT_PROPERTY, attempts + 1); // Восстанавливаем сообщение и передаём обновлённое свойство ArchiveFacade.Restore(archMsg.Data.Id, archMsg.TargetId, restoreProps); totalRestored++; Logger.Info($"Восстановлено: ID={archMsg.Data.Id}, попытка {attempts + 1}/{MAX_ATTEMPTS}"); } } Logger.Info($"Обработка архива завершена. ИТОГО за запуск: восстановлено — {totalRestored}, пропущено (лимит исчерпан) — {totalSkipped}");
Если есть вопросы не стесняйтесь писать комментарии, а если интересно узнать больше о нашей компании, новости, вебинары, специальные предложения подписывайтесь на наш канал в Telegram