Есть у Майкрософта такая не очень известная вещь, как Service Bus for Windows Server. И так случилось, что в нескольких проектах подряд довелось с ней поработать. В итоге получилось собрать небольшой набор подводных камней, встречавшихся в проектах чаще других. Чем и делюсь.

Краткое описание о том, что такое Service Bus for Windows Server вообще
Это реализация Service Bus от Майкрософт, которая очень близка к Windows Azure Service Bus on Windows, но не требующая самого Azure. То есть этакая довольно удобная и продвинутая шина. Умеет предоставлять как стандартные очереди (queue), так и их продвинутый вариант топики (topic), который умеет отдавать одно и то же сообщение на несколько разных своих подписок (subscription). Так как сталкиваться на практике мне удалось только с топиками/подписками, то дальше речь пойдет только о них.
image
То есть потребители публикуют свои сообщения в топик. Топик передает их во все свои подписки. Подписки, в свою очередь, проверяют сообщения на предмет того, нужны ли они им или нет, сопоставляя их со своим списком правил (фильтр). Все пригодные сообщения далее передаются тем клиентам, кто подписан на эти самые подписки. Причем если на одну и ту же подписку подписано несколько клиентов, то сообщение получит только один из них. Все довольно стандартно.

Первые шаги и первые же грабли


С чего начинается использование этой штуки? С попытки отправить и получить сообщение, конечно.
Внимание, тут и далее приведен не продакшн-код, приложенный код лишь призван служить функционирующей иллюстрацией текста
var messageFactory = MessagingFactory.CreateFromConnectionString(connectionString);
var publisher = messageFactory.CreateTopicClient(topicName);
var listener = messageFactory.CreateSubscriptionClient(topicName, subscriptionName);

listener.OnMessage(message => Console.WriteLine($"Message received: {message.GetBody<string>()}"));

var brokeredMessage = new BrokeredMessage("some test message");
publisher.Send(brokeredMessage);

Все просто, сообщение в консоли появляется. Попробуем опубликовать много сообщений и очень грубо оценить, сколько займет времени из отправка и получение:

var stopwatch = new Stopwatch();
int messagesToSent = 200;
int messagesProccessed = 0;

listener.OnMessage(message => 
{
    Console.WriteLine($"Message received: {message.GetBody<string>()}");
    messagesProccessed++;

    if (messagesProccessed == messagesToSent)
    {
        stopwatch.Stop();
        Console.WriteLine($"Time passed: {stopwatch.Elapsed}");
    }
});

stopwatch.Start();
for (var i = 0; i < messagesToSent; i ++)
{
    var brokeredMessage = new BrokeredMessage($"Message №{i}");
    publisher.Send(brokeredMessage);
}

Если запустить этот код, то получается, что на моем стареньком компьютере-ветеране процесс идет около шести секунд.

А вот следующий шаг часто приводит на первые грабли. Дело в том, что подписчик может получать сообщения в одном из двух режимов:

  • PeekLock — сообщение получается, но не удаляется из подписки, а лишь получает лок. Чтобы оно удалилось, клиент должен явно подтвердить его успешную обработку вызвав Commit(). Иначе же по истечению лока или по вызову Abandon() от самого клиента будет произведена попытка доставить это сообщение снова.

  • ReceiveAndDelete — сообщение получается и сразу удаляется из подписки. Если обработка пошла не так, повторно это сообщение уже не получить. Зато работает чуть быстрее PeekLock, так как не вешает локи.

По умолчанию messageFactory.CreateSubscriptionClient создает PeekLock вариант. Но ввиду неочевидности я практически не видел, чтобы клиент создавали без явного указания режима работы. И, если верить документации, при указанном PeekLock надо вызывать .Complete() для каждого сообщения. Попробуем это сделать:

listener.OnMessage(message =>
{
    Console.WriteLine($"Message received: {message.GetBody<string>()}");
    messagesProccessed++;

    if (messagesProccessed == messagesToSent)
    {
        stopwatch.Stop();
        Console.WriteLine($"Time passed: {stopwatch.Elapsed}");
    }

    message.Complete(); // Это все, что поменялось от прошлого примера с замером
});

И тут случается неожиданное. Несмотря на то, что никаких эксепшнов не кидается, строчки с «Message №X» бегут, происходит все ОЧЕНЬ медленно. Эти 200 сообщений вместо шести секунд потребовали целых четыре минуты и девять секунд! Это не оправдать старым железом. А ведь данную проблему я разок нашел в коде живого проекта, просто за малым числом сообщений просадка производительности не бросалась в глаза.

Почему так происходит? Ведь если что-то было бы не так, можно было бы ожидать эксепшна? На самом деле, эксепшн есть. Просто по какой-то не совсем понятной причине, Майкрософт сделали крайне неочевидный способ получения информации об этих самых исключениях.

Метод подписки на сообщения OnMessage принимает необязательный параметр OnMessageOptions, который позволяет подписаться на событие ExceptionReceived. То есть те самые «скрытые исключения».

var onMessageOptions = new OnMessageOptions();
onMessageOptions.ExceptionReceived += (sender, args) => 
    Console.WriteLine($"Exception received: {args.Exception}");

listener.OnMessage(message => 
{
    Console.WriteLine($"Message received: {message.GetBody<string>()}");
    messagesProccessed++;

    if (messagesProccessed == messagesToSent)
    {
        stopwatch.Stop();
        Console.WriteLine($"Time passed: {stopwatch.Elapsed}");
    }

    message.Complete();
}, onMessageOptions); // не забываем передавать onMessageOptions

Запустив такой код, мы увидим, что на каждое сообщение кидается эксепшн Microsoft.ServiceBus.Messaging.MessageLockLostException:
Предоставленная блокировка недействительна. У нее истек срок действия, либо сообщение уже удалено из очереди..TrackingId:54630ae4-6e4f-4979-8fc8-b66e5314079c_GAPC_BAPC,TimeStamp:24.08.2016 21:20:08

Почему так происходит? Потому что у onMessageOptions есть еще один параметр: AutoCommit. И он по умолчанию выставляется в true. Таким образом, для корректной работы в случае, если вы хотите самостоятельно управлять циклом жизни сообщения, это поле требуется выставлять в false. Попробуем так сделать:

var stopwatch = new Stopwatch();
int messagesToSent = 200;
int messagesProccessed = 0;

var onMessageOptions = new OnMessageOptions
{
    AutoComplete = false // неочевидный параметр
};
onMessageOptions.ExceptionReceived += (sender, args) => 
    Console.WriteLine($"Exception received: {args.Exception}");

listener.OnMessage(message => 
{
    Console.WriteLine($"Message received: {message.GetBody<string>()}");
    messagesProccessed++;

    if (messagesProccessed == messagesToSent)
    {
        stopwatch.Stop();
        Console.WriteLine($"Time passed: {stopwatch.Elapsed}");
    }

    message.Complete();
}, onMessageOptions);

stopwatch.Start();
for (var i = 0; i < messagesToSent; i ++)
{
    var brokeredMessage = new BrokeredMessage($"Message №{i}");
    publisher.Send(brokeredMessage);
}

И вуаля: нет эксепшнов, обработка сообщений занимает всего две с половиной секунды. Похоже на нормальный режим работы.

Резюмируя:

  • Практически всегда надо подписываться на onMessageOptions.ExceptionReceived, иначе можно не заметить ряд проблем в работе кода
  • Помнить, что по умолчанию сервисбас пытается коммитить сообщения за вас. Чаще всего это поведение стоит отключать

Абстрации и грабли два


Второй, не менее часто встречающийся момент, тоже замеченный в продакшн коде, это неправильное создание обертки для подписчика. Создание класса, который бы скрывал внутри себя работу с сервисбасом, дело вообще хорошее. Но есть нюансы. Вот иллюстрация к тому, как не надо делать, но подобный код не раз был замечен в реальности.

Создается вот такого вида класс:

class Listener : IListener
{
    private readonly MessagingFactory _messageFactory;
    private readonly SubscriptionClient _client;

    public event Func<string, Task> OnReceivedAsync;

    public Listener(string connectionString, string topicName, string subscriptionName, ReceiveMode receiveMode)
    {
        _messageFactory = MessagingFactory.CreateFromConnectionString(connectionString);
        _client = _messageFactory.CreateSubscriptionClient(topicName, subscriptionName, receiveMode);

        var onMessageOptions = new OnMessageOptions
        {
            AutoComplete = false
        };

        onMessageOptions.ExceptionReceived += (sender, args) =>
            Console.WriteLine($"Exception received: {args.Exception}");

        _client.OnMessageAsync(bm => OnReceivedAsync?.Invoke(bm.GetBody<string>()), onMessageOptions);
    }
}

Который далее используется примерно так:

var messageFactory = MessagingFactory.CreateFromConnectionString(connectionString);
var publisher = messageFactory.CreateTopicClient(topicName);
            
int messagesToSent = 20;
for (var i = 0; i < messagesToSent; i++)
{
    var brokeredMessage = new BrokeredMessage($"Message №{i}");
    publisher.Send(brokeredMessage);
}

var listener = new Listener(connectionString, topicName, subscriptionName, ReceiveMode.ReceiveAndDelete);
listener.OnReceivedAsync += x =>
{
    Console.WriteLine($"Message received: {x}");
    return Task.FromResult(true);
};

Если запустить этот код, то все вроде бы работает, но вместо первого сообщения будет ошибка "NullReferenceException: Object reference not set to an instance of an object."

Причем ошибка будет выловлена только в случае подписки на onMessageOptions.ExceptionReceived, если этого не сделать (а не делают это почему-то очень часто), то о наличии проблемы можно узнать только по косвенным и иногда очень трудноуловимым багам в поведении кода.

Что тут не так? Ну, ответ довольно очевиден, и если бы не встречался так часто, я бы о нем, наверное, и не упоминал. Когда в конструкторе абстракции Listener вызывается _client.OnMessageAsync — подписчик уже начинает принимать сообщения. Поэтому ряд из них (в зависимости от того, как далеко разнесены конструктор и подписка на listener.OnReceivedAsync будет пропущен и попадет на пустой OnReceivedAsync?.Invoke, логично возвращая null. Отсюда и NullReferenceException.

Что с этим делать? Самое простое — разнести создание экземпляра и подписку, например так:

class Listener : IListener
{
    private readonly MessagingFactory _messageFactory;
    private readonly SubscriptionClient _client;

    public Listener(string connectionString, string topicName, string subscriptionName, ReceiveMode receiveMode)
    {
        _messageFactory = MessagingFactory.CreateFromConnectionString(connectionString);
        _client = _messageFactory.CreateSubscriptionClient(topicName, subscriptionName, receiveMode);
    }

    public void Subscribe(Func<string, Task> handleMessage)
    {
        var onMessageOptions = new OnMessageOptions
        {
            AutoComplete = false
        };

        onMessageOptions.ExceptionReceived += (sender, args) =>
            Console.WriteLine($"Exception received: {args.Exception}");

        _client.OnMessageAsync(bm => handleMessage(bm.GetBody<string>()), onMessageOptions);
    }
}

И подписываться примерно так:

var listener = new Listener(connectionString, topicName, subscriptionName, ReceiveMode.ReceiveAndDelete);
listener.Subscribe(x =>
{
    Console.WriteLine($"Message received: {x}");
    return Task.FromResult(true);
});


Теперь потери сообщений при создании класса не происходит.

Резюмируя:

  • Подписка на onMessageOptions.ExceptionReceived актуальней актуального
  • Помнить, что получение сообщений начинается сразу после вызова client.OnMessageAsync и учитывать это в проектировании абстраций

Грабли номер три


Есть у подписчика замечательный метод Close(). Но его поведение не совсем предсказуемо умозрительно. Попробуем выполнить вот такой вот код, который после отправки первой половины сообщений вызывает этот самый Close() и получает вторую половину собщений уже через другой экземпляр подписчика.

var messageFactory = MessagingFactory.CreateFromConnectionString(connectionString);
var publisher = messageFactory.CreateTopicClient(topicName);
var listener1 = messageFactory.CreateSubscriptionClient(topicName, subscriptionName, ReceiveMode.ReceiveAndDelete);
var listener2 = messageFactory.CreateSubscriptionClient(topicName, subscriptionName, ReceiveMode.ReceiveAndDelete);
            
int messagesToSent = 10;
int messagesProccessed = 0;

var onMessageOptions = new OnMessageOptions
{
    AutoComplete = false
};
onMessageOptions.ExceptionReceived += (sender, args) =>
    Console.WriteLine($"Exception received: {args.Exception}");

listener1.OnMessage(message =>
{
    Console.WriteLine($"listener1: message received: {message.GetBody<string>()}, listener1 is closed: {listener1.IsClosed}");
    messagesProccessed++;
}, onMessageOptions);
            
for (var i = 0; i < messagesToSent; i++)
{
    var brokeredMessage = new BrokeredMessage($"Message №{i}");
    publisher.Send(brokeredMessage);
    Thread.Sleep(50);
                
    if (i == 4)
    {
        Console.WriteLine("Closing listener1");
        listener1.Close();
    }
}
            
listener2.OnMessage(message =>
{
    Console.WriteLine($"listener2: message received : {message.GetBody<string>()}, listener2 is closed: {listener2.IsClosed}");
    messagesProccessed++;
}, onMessageOptions);

Но результат в консоли будет вот такой:
listener1: message received: Message №0, listener1 is closed: False
listener1: message received: Message №1, listener1 is closed: False
listener1: message received: Message №2, listener1 is closed: False
listener1: message received: Message №3, listener1 is closed: False
listener1: message received: Message №4, listener1 is closed: False
Closing listener1
listener1: message received: Message №5, listener1 is closed: True
listener2: message received: Message №6, listener2 is closed: False
listener2: message received: Message №7, listener2 is closed: False
listener2: message received: Message №8, listener2 is closed: False
listener2: message received: Message №9, listener2 is closed: False

Неочевидно, правда? Если сделать то же самое, но для режима работы PeekLock вместо ReceiveAndDelete, то результат будет схож, разве что .Complete() выкинет эксепшн System.OperationCanceledException: This messaging entity has already been closed, aborted, or disposed. Причем если ловить ошибки в обработчике сообщений, чтобы делать Abandon() руками, то и сам Abandon() выкинет ошибку. Причем оба эти эксепшна обычные, не прячущиеся внутри OnMessageOptions.

А само пропущенное сообщение, в отличие от ReceiveAndDelete, все таки будет обработано позже, когда произойдет повторная отправка.

Код с Complete и вывод в консоль
var messageFactory = MessagingFactory.CreateFromConnectionString(connectionString);
var messageFactory1 = MessagingFactory.CreateFromConnectionString(connectionString);
var messageFactory2 = MessagingFactory.CreateFromConnectionString(connectionString);
var publisher = messageFactory.CreateTopicClient(topicName);
var listener1 = messageFactory1.CreateSubscriptionClient(topicName, subscriptionName, ReceiveMode.PeekLock);
var listener2 = messageFactory2.CreateSubscriptionClient(topicName, subscriptionName, ReceiveMode.PeekLock);
            
int messagesToSent = 10;
int messagesProccessed = 0;

var onMessageOptions = new OnMessageOptions
{
    AutoComplete = false
};
onMessageOptions.ExceptionReceived += (sender, args) =>
    Console.WriteLine($"Exception received: {args.Exception}");

listener1.OnMessage(message =>
{
    try
    {
        Console.WriteLine($"listener1: message received: {message.GetBody<string>()}, listener1 is closed: {listener1.IsClosed}");
        messagesProccessed++;
        message.Complete();
    }
    catch (Exception ex1)
    {
        Console.WriteLine($"listener1 Complete() exception: {ex1.Message}");
        try
        {
            message.Abandon();
        }
        catch (Exception ex2)
        {
            Console.WriteLine($"listener1 Abandon() exception: {ex2.Message}");
        }
    }
}, onMessageOptions);
            
for (var i = 0; i < messagesToSent; i++)
{
    var brokeredMessage = new BrokeredMessage($"Message №{i}");
    publisher.Send(brokeredMessage);
    Thread.Sleep(50);
                
    if (i == 4)
    {
        Console.WriteLine("Closing listener1");
        listener1.Close();
    }
}
            
listener2.OnMessage(message =>
{
    Console.WriteLine($"listener2: message received : {message.GetBody<string>()}, listener2 is closed: {listener2.IsClosed}");
    messagesProccessed++;
    message.Complete();
}, onMessageOptions);


listener1: message received: Message №0, listener1 is closed: False
listener1: message received: Message №1, listener1 is closed: False
listener1: message received: Message №2, listener1 is closed: False
listener1: message received: Message №3, listener1 is closed: False
listener1: message received: Message №4, listener1 is closed: False
Closing listener1
listener1: message received: Message №5, listener1 is closed: True
listener1 Complete() exception: This messaging entity has already been closed, aborted, or disposed.
listener1 Abandon() exception: This messaging entity has already been closed, aborted, or disposed.

listener2: message received: Message №6, listener2 is closed: False
listener2: message received: Message №7, listener2 is closed: False
listener2: message received: Message №8, listener2 is closed: False
listener2: message received: Message №9, listener2 is closed: False
listener2: message received: Message №5, listener2 is closed: False


Что с этим делать и как с этим жить? Ну, про это надо просто помнить и учитывать это в коде. Вариантов бороться с подобным поведением всезнающий stackoverflow предлагает достаточно. Например, там, где это уместно, можно вызывать messageFactory.Close() совместно с закрытием подписчика. Либо проверять в обработчике, не закрыт ли сейчас подписчик чем-то типа if(listener.IsClosed) { /***/ }, и т.д.

Резюмируя:

  • Не все экспешны от сервисбаса прилетают только в onMessageOptions.ExceptionReceived
  • При завершение работы с подписчиком надо учитывать особенности работы метода Close()

Заключение


В целом, Service Bus for Windows Server довольно неплохая штука и вполне справляется со своими задачами, но некоторые мелочи на старте могут несколько попить крови. Надеюсь, изложенные в статье пункты окажутся кому-нибудь полезными и уберегут от набивания на них собственных шишек.
Поделиться с друзьями
-->

Комментарии (1)


  1. toxicdream
    13.09.2016 06:57
    +1

    Спасибо. Помогло.