Привет, Хабр!
Сегодня поговорим о паттерне Poison Pill и его реализацию в Java.
Poison Pill, или «ядовитая пилюля», — это шаблон, используемый для мягкой остановки потока или процесса. Суть его заключается в отправке специального, заранее определенного сообщения, которое сигнализирует о необходимости завершения работы. Это сообщение обрабатывается в рамках нормального потока сообщений и позволяет потоку корректно завершить работу, освободить ресурсы и закрыть все активные соединения.
Принцип работы
Архитектура паттерна Poison Pill включает несколько компонентов: Message
, Producer
, Consumer
и MessageQueue
.
Message который определяет структуру сообщений. Сообщения могут включать различные заголовки и тело сообщения. Пример реализации — SimpleMessage
:
MessageQueue объединяет точки публикации MqPublishPoint
и подписки MqSubscribePoint
. Он представляет собой очередь, через которую сообщения передаются от производителей к потребителям. Пример реализации — SimpleMessageQueue
, использующий BlockingQueue
для хранения сообщений.
Producer создает сообщения и помещает их в очередь. Т. е. когда производитель завершает свою работу, он отправляет сообщение Poison Pill, чтобы уведомить потребителей о необходимости остановки.
Consumer является потребителем и извлекает сообщения из очереди и обрабатывает их. Если потребитель получает Poison Pill, он завершает свою работу
В целом, принцип работы Poison Pill основан на циклическом взаимодействии между производителями и потребителями через очередь сообщений. Производители генерируют сообщения и помещают их в очередь. Каждое сообщение обрабатывается потребителями, которые извлекают его из очереди. Когда производитель завершает работу и отправляет Poison Pill, потребитель, получив этот сигнал, прекращает обработку и завершает выполнение.
Реализация в Java
Определим интерфейс Message и класс SimpleMessage:
public interface Message {
void addHeader(String key, String value);
String getHeader(String key);
String getBody();
void setBody(String body);
}
public class SimpleMessage implements Message {
private Map<String, String> headers = new HashMap<>();
private String body;
@Override
public void addHeader(String key, String value) {
headers.put(key, value);
}
@Override
public String getHeader(String key) {
return headers.get(key);
}
@Override
public void setBody(String body) {
this.body = body;
}
@Override
public String getBody() {
return body;
}
}
Определим MessageQueue и его реализации SimpleMessageQueue:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public interface MessageQueue extends MqPublishPoint, MqSubscribePoint {}
public class SimpleMessageQueue implements MessageQueue {
private BlockingQueue<Message> queue;
public SimpleMessageQueue(int capacity) {
this.queue = new LinkedBlockingQueue<>(capacity);
}
@Override
public void put(Message message) throws InterruptedException {
queue.put(message);
}
@Override
public Message take() throws InterruptedException {
return queue.take();
}
}
Реализация Producer и Consumer:
public class Producer implements Runnable {
private final MessageQueue queue;
private volatile boolean isStopped;
public Producer(MessageQueue queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (!isStopped) {
SimpleMessage message = new SimpleMessage();
message.setBody("Important data");
queue.put(message);
Thread.sleep(1000); // Имитация работы
}
queue.put(new PoisonPillMessage()); // Отправка Poison Pill
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public void stop() {
isStopped = true;
}
}
public class Consumer implements Runnable {
private final MessageQueue queue;
public Consumer(MessageQueue queue) {
this.queue = queue;
}
@Override
public void run() {
try {
Message message;
while (true) {
message = queue.take();
if (message instanceof PoisonPillMessage) break; // Остановка если Poison Pill
processMessage(message);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private void processMessage(Message message) {
System.out.println("Processed: " + message.getBody());
}
}
public class PoisonPillMessage extends SimpleMessage {
public PoisonPillMessage() {
setBody("POISON_PILL");
}
}
Запуск и координация потоков:
public class Main {
public static void main(String[] args) {
MessageQueue queue = new SimpleMessageQueue(10);
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue);
Thread producerThread = new Thread(producer);
Thread consumerThread = new Thread(consumer);
producerThread.start();
consumerThread.start();
try {
Thread.sleep(5000); // позволяем производить сообщения
producer.stop(); // останавливаем производителя
producerThread.join();
consumerThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Альтернативы
Manual Offset Management — один из подходов к управлению отказами заключается в ручном смещении офсета до первого сообщения после Poison Pill. Так можно получить контроль над тем, какие сообщения будут пропущены, но требует доступа к Kafka или другому брокеру сообщений.
Error‑Handling Deserializers: в случае использования Apache Kafka можно применить специальный десериализатор с обработчиком ошибок. Он пропускает некорректные сообщения, тем самым избегая зацикливания при обработке данных, и передаёт сообщения об ошибках в спец. лог.
С конфигурацией Spring Kafka можно использовать SeekToCurrentErrorHandler
для управления ошибками при десериализации сообщений. Обработчик смещает офсет в точку после неудачного сообщения, избегая блокировки обработки последующих сообщений.
Используя паттерн команды, можно создать единый интерфейс для различных типов задач и использовать общий механизм завершения их выполнения, что также можно адаптировать под конкретные сценарии.
А можно воспользоваться просто прямым завершением потоков. Это можно реализовать через управление потоком через класс Thread
:
public class Main {
public static void main(String[] args) {
Thread worker1 = new Thread(new Worker());
Thread worker2 = new Thread(new Worker());
worker1.start();
worker2.start();
try {
Thread.sleep(1000); // даем время потокам поработать
} catch (InterruptedException e) {
e.printStackTrace();
}
// прямое завершение потоков
worker1.interrupt();
worker2.interrupt();
System.out.println("Потоки были прямо завершены.");
}
}
class Worker implements Runnable {
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
System.out.println("Работа потока " + Thread.currentThread().getName());
Thread.sleep(100);
} catch (InterruptedException e) {
System.out.println("Поток " + Thread.currentThread().getName() + " прерван.");
Thread.currentThread().interrupt(); // рекомендуется восстановить статус прерванного состояния
}
}
}
}
Поток выполняет свою работу в цикле до тех пор, пока не будет прерван. Метод interrupt()
используется для отправки запроса на прерывание потоку, который проверяет свое состояние с помощью Thread.currentThread().isInterrupted()
.
В заключение напомню про открытый урок, который пройдет сегодня вечером в OTUS: «Переопределение, скрытие и передекларация в Java». Записаться можно по ссылке.
Комментарии (9)
MaskedTrench
13.05.2024 09:29Спасибо за статью, хорошее описание методов работы. Единственное что не раскрыто - это различие между State Machine и Poison pill. Понятно, что в данной ситуации, Poison pill - это решение для потока и его самоуправления, но хотелось бы увидеть различие между ними в данной статье. А в остальном прекрасная статься, спасибо ещё раз.
dididididi
13.05.2024 09:29Вопрос, а на чёрта?)
А почему просто продюсер не может перестать посылать сообщения?)
а как потом консьюмер запустить? Передеплоить?)
А в следующий раз вы расскажете как удобно есть пельмени попой, но тоже не объясните зачем?)
DenSigma
Крайне неудачное название. Сложно нагуглить, мешается бизнес-термин.
Второй вопрос. Чем это решение лучше просто переменной в объекте Consumer?
outlingo
Тем, что Publisher не должен знать о реализации Consumer, о том есть ли у него эта магическая переменная, о том существует ли вообще Consumer, сколько экземпляров Consumer, какая структура очереди (например либо это очередь классическая, либо множество реплик очереди каждая из которых к своему экземпляру Consumer'а).
Поэтому Publisher просто отправляет событие EOF (End-Of-File/ End-Of-Stream / End-Of-Data) и всё.
alexxisr
но публишер знает об очереди, в которую он складывает сообщения. Что если сделать флаг окончания работы публишера в очереди?
outlingo
И фактически этот флаг и станет тем самым "poison pill" который в этой статье и описан. Только это приведет еще к тому, что надо будет мониторить этот флаг. Добавляется также проблема с тем, что если consumer вошел в блокирующее ожидание на очереди, это ожидание нужно прервать при выставлении флага.
И вишенка на торте - когда система распределенная и publisher и consumer работают в разных процессах и на разных серверах, и тогда вы с флагом вообще задолбаетесь.
alexxisr
С другой стороны не надо проверять каждое сообщение. Если сообщение есть - просто обрабатываем его, а вот если нет, то надо проверить флаг. И ждем же мы все равно с таймаутом - вот по его окончании и проверим флаг, и прервемся если больше сообщений не ожидается.
Распределенные сервера - это и с отравленным сообщением не так просто - например, что делать, если есть несколько publisher-ов? Если один отправит сообщение eof, то все consumer-ы закончат работу? Или что будет если вдруг сообщения встанут в очередь не по порядку и poison pill сработает раньше чем нужно?
DenSigma
Но ведь задача именно в том, чтобы сторонный объект мог остановить именно этот объект Runnable. Усложнение для программиста в том, что он должен связать через левую очередь взаимодействие Runnable и контролирующий его объект. А если этих палблишеров-консумеров в программе куча? А если чайник воспользуется имеющиейся очередью для отправки сообщения runnable-объекту? А потом эта очередь, предназначавшаяся изначально для чего-то другого, будет некаккуратно переделана? А как обеспечивать пересылку киллер-сообщения конкретному runnable-объекту? Вот посмотрите, сколько сложностей вы вывалили на голову коллегам? При том, что задача решается просто переменной в runnalbe-объекте.
При том, что "publisher" все-равно должен знать о реализации "Consumer", хотя бы по формату передаваемых данных, по константам команд, которые воспринимает runnable-объект. Опять же тип очереди - появляется зависимость от типа очереди и у runnable-объекта и у его контролера. Ладно вы разрабатываете и то и другое, и сделали через интерфейс. А если разработчик потребителя не вы?
"Магическая переменная", бгг. А знать, если ли у консумера "магическая очередь", не надо? И не надо знать формат и тип сообщений и константы? И не надо знать, как ее задавать у приемника-передатчика, нужно ли ее создавать, либо надо шариться в приложении искать, есть объект очереди, связан ли он с нужным runnable? Видите, насколько больше приходится не знать?
YegorP
Я бы этим термином NaN'ы назвал в IEEE754. Оператор(число, NaN) будет NaN. Причем там предусматривается возможность дополнительную инфу закодировать, и она должна проползти через всю арифметику.