В предыдущих статьях мы несколько раз упоминали о такой проблеме, как перегрузка агентов. Что это такое? Чем это грозит? Как с этим бороться? Обо всем этом мы и поговорим сегодня.


Проблема перегрузки агентов возникает, когда какому-то агенту отсылается больше сообщений, чем он успевает обрабатывать. В результате очереди сообщений постоянно увеличиваются в размерах. Растущие очереди расходуют память. Расход памяти ведет к замедлению работы приложения. Из-за замедления проблемный агент начинает обрабатывать сообщения дольше, что увеличивает скорость роста очередей сообщений. Что способствует более быстрому расходу памяти. Что ведет к еще большему замедлению приложения. Что ведет к еще более медленной работе проблемного агента… Как итог, приложение медленно и печально деградирует до полной неработоспособности.


Проблема усугубляется еще и тем, что взаимодействие посредством асинхронных сообщений и использование подхода fire-and-forget прямо таки провоцирует возникновение перегрузок (fire-and-forget – это когда агент A получает входящее сообщение M1, выполняет его обработку и отсылает исходящее сообщение M2 агенту B не заботясь о последствиях).


Действительно, когда send отвечает только за постановку сообщения в очередь агента-приемника, то отправитель не блокируется на send-е, даже если получатель не справляется со своим потоком сообщений. Это и создает предпосылки для того, чтобы отправители сообщений не заботились о том, а способен ли получатель справится с нагрузкой.


К сожалению, нет универсального рецепта по борьбе с перегрузками, пригодного для всех ситуаций. Где-то можно просто терять новые сообщения, адресованные перегруженному агенту. Где-то это недопустимо, но зато можно выбрасывать самые старые сообщения, которые стоят в очереди слишком долго и уже перестали быть актуальными. Где-то вообще нельзя терять сообщения, но зато можно при перегрузке переходить к другому способу обработки сообщений. Например, если агент отвечает за изменение размера фотографий для отображения на Web-страничке, то при перегрузке агент может переключиться на более грубый алгоритм ресайзинга: качество фотографий упадет, зато очередь фотографий для обработки будет рассасываться быстрее.


Поэтому хороший инструмент для защиты от перегрузок должен быть заточен под конкретную задачу. Из-за этого долгое время в SObjectizer никаких готовых механизмов, доступных пользователю «из коробки», не было. Пользователь решал свои проблемы сам.


Одним из самых практичных способов защиты от перегрузки, по нашему опыту, оказался подход с использованием двух агентов: collector и performer. Агент-collector накапливает входящие сообщения, подлежащие обработке, в каком-то подходящем контейнере. Агент-performer обрабатывает сообщения, которые собрал агент-collector.



Фокус в том, что агенты collector и performer привязываются к разным рабочим контекстам. Поэтому, если агент-performer начинает притормаживать, то это не сказывается на работе collector-а. Как правило, агент-collector обрабатывает свои события очень быстро: обработка обычно заключается в сохранении нового сообщения в какую-то отдельную очередь. Если эта отдельная очередь переполнена, то агент-collector может сразу вернуть отрицательный ответ на новое сообщение. Или же выбросить какое-то старое сообщение из очереди. Так же агент-collector может периодически проверять время пребывания сообщений в этой отдельной очереди: если какое-то сообщение ждало обработки слишком долго и перестало быть актуальным, то агент-collector выбрасывает его из очереди (возможно, отсылая при этом какой-то отрицательный ответ инициатору данного сообщения).


Агент-performer же, как правило, обрабатывает свои входящие сообщения намного дольше, чем агент-collector, что логично, т.к. ответственность за собственно прикладную работу лежит на performer-е. Агент-performer сам запрашивает у агента-collector-а следующую порцию сообщений для обработки.


В самом простом случае агент-performer завершает обработку текущего сообщения и отсылает агенту-collector-у запрос на выдачу очередного подлежащего обработки сообщения. Получив этот запрос агент-collector выдает агенту-performer-у первое сообщение из своей очереди.


В состав SObjectizer-а включено несколько примеров, демонстрирующих различные вариации на тему collector-ов и performer-ов. С описаниями этих примеров можно ознакомиться в Wiki проекта (№1, №2, №3, №4).


По мере накопления опыта использования SObjectizer и по результатам многочисленных обсуждений мы пришли к тому, что не смотря на возможность сделать своими руками схемы защиты от перегрузок разной степени сложности и эффективности, в самом SObjectizer так же хотелось бы иметь какой-то базовый механизм. Пусть не продвинутый, но зато доступный прямо «из коробки», что особенно востребовано при быстром прототипировании.


Таким механизмом стали лимиты для сообщений (т.н. message limits). Агент может указать, сколько экземпляров сообщений конкретного типа он разрешает сохранить в очереди сообщений. И что следует делать с «лишними» экземплярами. Превышение заданного лимита – это перегрузка и SObjectizer реагирует на нее одним из следующих способов:


  • выбрасывает новое сообщение как будто его не было;
  • пересылает сообщение на другой mbox. Эта реакция выполняется в предположении, что другой получатель сможет обработать лишнее сообщение;
  • трансформировать лишнее сообщение в сообщение другого типа и отослать новое сообщение на некоторый mbox. Эта реакция может позволить, например, сразу отослать отрицательный ответ отправителю сообщения;
  • прерывает работу приложения посредством вызова std::abort(). Этот вариант подходит для случаев, когда нагрузка превышает все мыслимые пределы и шансов на восстановление работоспособности практически нет, поэтому лучше прерваться и рестартовать, чем продолжать накапливать сообщение в очередях.

Давайте посмотрим, как лимиты сообщений могут помогать справляться с перегрузками (исходный текст примера можно найти в этом репозитории).


Предположим, что нам нужно обрабатывать запросы на ресайз картинок до заданного размера. Если таких запросов скапливается больше 10, то нам нужно обрабатывать новые запросы с помощью другого алгоритма, более быстрого, но менее точного. Если же и это не помогает, то лишние запросы мы будем специальным образом логировать и отсылать инициатору пустую картинку результирующего размера.


Создадим трех агентов. Первый агент будет выполнять нормальную обработку изображений. Лишние изображения, которые он не в состоянии быстро отресайзить, посредством message_limit будут отсылаться второму агенту.


// Тип агента, который выполняет нормальную обработку картинок.
class accurate_resizer final : public agent_t {
public :
	accurate_resizer( context_t ctx, mbox_t overload_mbox )
		// Лимиты настраиваются при создании агента и не могут меняться
		// впоследствии. Поэтому конструируем лимиты сообщений как уточнение
		// контекста, в котором агенту предстоит работать дальше.
		:	agent_t( ctx
				// Ограничиваем количество ждущих соообщений resize_request
				// с автоматическим редиректом лишних экземпляров другому
				// обработчику.
				+ limit_then_redirect< resize_request >(
					// Разрешаем хранить всего 10 запросов...
					10,
					// Остальное пойдет на этот mbox.
					[overload_mbox]() { return overload_mbox; } ) )
	{...}
...
};

Второй агент выполняет более быструю, но более грубую обработку изображений. Поэтому у него лимит на количество сообщений в очереди будет повыше. Лишние сообщения редиректятся третьему агенту:


// Тип агента, который выполняет грубую обработку картинок.
class inaccurate_resizer final : public agent_t {
public :
	inaccurate_resizer( context_t ctx, mbox_t overload_mbox )
		:	agent_t( ctx
				+ limit_then_redirect< resize_request >(
					// Разрешаем хранить всего 20 запросов...
					20,
					// Остальное пойдет на этот mbox.
					[overload_mbox]() { return overload_mbox; } ) )
	{...}
...
};

Третий агент вместо ресайза выполняет генерацию пустой картинки. Т.е. когда все плохо и нагрузка оказалась ну очень высокой, то лучше уж оставлять вместо картинок пустые места, чем впадать в полный ступор. Однако, генерация пустой картинки не происходит мгновенно, поэтому можно ожидать, что и у третьего агента будет некий разумный лимит для ожидающих запросов. А вот если этот лимит превышается, то, пожалуй, лучше грохнуть все приложение через std::abort, дабы после рестарта оно могло начать работу заново. Возможно, тормоза с обработкой потока запросов вызваны каким-то неправильным поведением приложения, а рестарт позволит начать все заново «с чистого листа». Поэтому третий агент тупо заставляет вызвать std::abort при превышении лимита. Примитивно, но чрезвычайно действенно:


// Тип агента, который не выполняет никакой обработки картинки,
// а вместо этого возвращает пустую картинку заданного размера.
class empty_image_maker final : public agent_t {
public :
	empty_image_maker( context_t ctx )
		:	agent_t( ctx
				// Ограничиваем количество запросов 50-тью штуками.
				// Если их все-таки оказывается больше, значит что-то идет
				// совсем не так, как задумывалось и лучше прервать работу
				// приложения.
				+ limit_then_abort< resize_request >( 50 ) )
	{...}
...
};

А все вместе агенты могут создаваться, например, таким образом:


// Вспомогательная функция для создания агентов для рейсайзинга картинок.
// Возвращается mbox, на который следует отсылать запросы для ресайзинга.
mbox_t make_resizers( environment_t & env ) {
	mbox_t resizer_mbox;
	// Все агенты будут иметь собственный контекст исполнения (т.е. каждый агент
	// будет активным объектом), для чего используется приватный диспетчер
	// active_obj.
	env.introduce_coop(
		disp::active_obj::create_private_disp( env )->binder(),
		[&resizer_mbox]( coop_t & coop ) {
			// Создаем агентов в обратном порядке, т.к. нам нужны будут
			// mbox-ы на которые следует перенаправлять "лишние" сообщения.
			auto third = coop.make_agent< empty_image_maker >();
			auto second = coop.make_agent< inaccurate_resizer >( third->so_direct_mbox() );
			auto first = coop.make_agent< accurate_resizer >( second->so_direct_mbox() );

			// Последним создан агент, который будет первым в цепочке агентов
			// для ресайзинга картинок. Его почтовый ящик и будет ящиком для
			// отправки запросов.
			resizer_mbox = first->so_direct_mbox();
		} );

	return resizer_mbox;
}

В общем, механизм message limits позволяет в простых случаях обходиться без разработки кастомных агентов collector-ов и performer-ов. Хотя полностью заменить их и не может (так, message limits не позволяет автоматически выбрасывать из очереди самые старые сообщения – это связано с организацией очередей заявок у диспетчеров).


Итак, попробуем кратко резюмировать. Если ваше приложение состоит из агентов, взаимодействующих исключительно через асинхронные сообщения, и вы любите использовать подход fire-and-forget, то перегрузка агентов вам практически гарантирована. В идеале для защиты своих агентов вам бы следовало использовать что-то вроде пар из collector-performer-агентов, логика поведения которых заточена под вашу задачу. Но, если вам не нужно идеальное решение, а достаточно «дешево и сердито», то на помощь придут лимиты для сообщений, которые SObjectizer предоставляет «из коробки».


PS. Возникновение перегрузок возможно не только для акторов/агентов в рамках Модели Акторов, но и при использовании CSP-шных каналов. Поэтому в SObjectizer аналоги CSP-шных каналов, message chains, так же содержат средства защиты от перегрузки, в чем-то похожие на message limits.

Поделиться с друзьями
-->

Комментарии (6)


  1. laphroaig
    23.09.2016 17:02

    В подобных очередях, в нашей системе, помимо ограничения размера очереди, при достижении которого сообщения начинают теряться, мы устанавливаем лимит для срабатываний предупреждений, при достижении которого идет запись в лог или уведомляется система мониторинга. Лимит для предупреждений выбираем на порядок или два меньше ограничения размера очереди, но он должен срабатывать относительно редко. Например ограничение очереди 100000 элементов, а предупреждения уже начинают идти при ее разрастании более 1000. Лимит на размер очереди выбирается так, чтобы при полном ее заполнении система смогла бы ее «разгрести» за разумное время при возвращении системы в рабочий режим.


    1. eao197
      23.09.2016 17:13

      Это как раз то, о чем речь шла в статье: хороший механизм overload control затачивается под определенные условия. Тут пространство для вариантов весьма большое. Например, можно настроить несколько уведомлений для очереди: когда ее объем превышает 1000, 10000 и 100000 экземпляров. Причем уведомления могут быть как о превышении (т.е. когда объем превышает 10000), так и о падении ниже порога (т.е. когда объем опускается ниже 1000). Можно много чего придумать.

      Поинт был несколько в другом: если фреймворк претендует на универсальность, то он вряд ли сможет предложить большое количество готовых вариантов, доступных из коробки. Посему хорошее решение придется программировать пользователю фреймворка. А если такой возможности нет, то придется пользоваться чем-то готовым, но не факт, что это будет удобно и оптимально.


  1. gridem
    23.09.2016 17:58

    К сожалению, нет универсального рецепта по борьбе с перегрузками

    Универсального нет, но есть такой, который может подойти большинству и на мой взгляд должен использоваться по умолчанию: backpressure. Т.е. это когда сущность, которая перегружает другую сущность, останавливается и не выполняет работу до тех пор, пока очередь не разгребется до определенного значения.


    В связи с этим вопрос: есть ли такая возможность у SObjectizer?


    1. eao197
      23.09.2016 18:06

      Обычный send в SObjectizer асинхронный — он не блокируется. Сделано это для того, чтобы не заблокировать самого себя, если и получатель, и отправитель работают на одном и том же рабочем контексте. Это запросто возможно, т.к. агенты могут быть привязаны к одному и тому же диспетчеру.

      Приостанавливает отправителя синхронный запрос (т.е. request_value вместо send). Но он приостанавливает в любом случае. Даже если нагрузка на получателя находится в разумных пределах.

      А вот если send выполняется для message chain с ограничением на размер, то send может быть приостановлен на некоторый тайм-аут. Если в течении этого времени места для сообщения не нашлось, то SO-5 предпринимает одно из следующих действий: бросает исключение (т.е. игнорирует новое сообщение, но с большим треском), тихо выбрасывает новое сообщение, тихо выбрасывает самое старое сообщение или же вызывает std::abort. Так что на message chain можно делать backpressure, но здесь будут свои сложности. В том числе и в случае, когда отправитель и получатель работают на одном и том же контексте.


  1. Deosis
    26.09.2016 08:01

    Есть ли в SObjectizer способ заменять сообщения в очереди?
    Т.е. отправили запрос на ресайз картинки, потом второй запрос на ресайз этой же картинки.
    Если обработчик еще не приступил к выполнению первого запроса, то можно исключить его из очереди, т.к. его результат уже не нужен.


    1. eao197
      26.09.2016 10:06

      Нет, в штатных очередях такой возможности нет.
      В этом случае отлично работает схема collector-performer. Collector сначала складирует запросы у себе. Поскольку он знает структуру запроса, он знает, по какому критерию проверять сообщения на уникальность (т.е. что в сообщении является уникальным ключом). Плюс, Collector может использовать структуры данных, которые поддерживают возможность удаления элементов по ключу (например, это может быть Boost.Multindex, первым ключом может быть имя картинки, вторым ключом — порядковый номер).

      Мы несколько раз использовали такую схему. И одной из задач Collector-а была как раз отбрасывание запросов-дубликатов.