Привет, Хабр!

Сегодня поговорим о паттерне Poison Pill и его реализацию в Java.

Poison Pill, или «ядовитая пилюля», — это шаблон, используемый для мягкой остановки потока или процесса. Суть его заключается в отправке специального, заранее определенного сообщения, которое сигнализирует о необходимости завершения работы. Это сообщение обрабатывается в рамках нормального потока сообщений и позволяет потоку корректно завершить работу, освободить ресурсы и закрыть все активные соединения.

Принцип работы

Архитектура паттерна Poison Pill включает несколько компонентов: Message, Producer, Consumer и MessageQueue.

Message который определяет структуру сообщений. Сообщения могут включать различные заголовки и тело сообщения. Пример реализации — SimpleMessage:

MessageQueue объединяет точки публикации MqPublishPoint и подписки MqSubscribePoint. Он представляет собой очередь, через которую сообщения передаются от производителей к потребителям. Пример реализации — SimpleMessageQueue, использующий BlockingQueue для хранения сообщений.

Producer создает сообщения и помещает их в очередь. Т. е. когда производитель завершает свою работу, он отправляет сообщение Poison Pill, чтобы уведомить потребителей о необходимости остановки.

Consumer является потребителем и извлекает сообщения из очереди и обрабатывает их. Если потребитель получает Poison Pill, он завершает свою работу

В целом, принцип работы Poison Pill основан на циклическом взаимодействии между производителями и потребителями через очередь сообщений. Производители генерируют сообщения и помещают их в очередь. Каждое сообщение обрабатывается потребителями, которые извлекают его из очереди. Когда производитель завершает работу и отправляет Poison Pill, потребитель, получив этот сигнал, прекращает обработку и завершает выполнение.

Реализация в Java

Определим интерфейс Message и класс SimpleMessage:

public interface Message {
    void addHeader(String key, String value);
    String getHeader(String key);
    String getBody();
    void setBody(String body);
}

public class SimpleMessage implements Message {
    private Map<String, String> headers = new HashMap<>();
    private String body;

    @Override
    public void addHeader(String key, String value) {
        headers.put(key, value);
    }

    @Override
    public String getHeader(String key) {
        return headers.get(key);
    }

    @Override
    public void setBody(String body) {
        this.body = body;
    }

    @Override
    public String getBody() {
        return body;
    }
}

Определим MessageQueue и его реализации SimpleMessageQueue:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public interface MessageQueue extends MqPublishPoint, MqSubscribePoint {}

public class SimpleMessageQueue implements MessageQueue {
    private BlockingQueue<Message> queue;

    public SimpleMessageQueue(int capacity) {
        this.queue = new LinkedBlockingQueue<>(capacity);
    }

    @Override
    public void put(Message message) throws InterruptedException {
        queue.put(message);
    }

    @Override
    public Message take() throws InterruptedException {
        return queue.take();
    }
}

Реализация Producer и Consumer:

public class Producer implements Runnable {
    private final MessageQueue queue;
    private volatile boolean isStopped;

    public Producer(MessageQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (!isStopped) {
                SimpleMessage message = new SimpleMessage();
                message.setBody("Important data");
                queue.put(message);
                Thread.sleep(1000);  // Имитация работы
            }
            queue.put(new PoisonPillMessage());  // Отправка Poison Pill
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public void stop() {
        isStopped = true;
    }
}

public class Consumer implements Runnable {
    private final MessageQueue queue;

    public Consumer(MessageQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            Message message;
            while (true) {
                message = queue.take();
                if (message instanceof PoisonPillMessage) break;  // Остановка если Poison Pill
                processMessage(message);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void processMessage(Message message) {
        System.out.println("Processed: " + message.getBody());
    }
}

public class PoisonPillMessage extends SimpleMessage {
    public PoisonPillMessage() {
        setBody("POISON_PILL");
    }
}

Запуск и координация потоков:

public class Main {
    public static void main(String[] args) {
        MessageQueue queue = new SimpleMessageQueue(10);

        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue);

        Thread producerThread = new Thread(producer);
        Thread consumerThread = new Thread(consumer);

        producerThread.start();
        consumerThread.start();

        try {
            Thread.sleep(5000);  // позволяем производить сообщения
            producer.stop();     // останавливаем производителя
            producerThread.join();
            consumerThread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Альтернативы

Manual Offset Management — один из подходов к управлению отказами заключается в ручном смещении офсета до первого сообщения после Poison Pill. Так можно получить контроль над тем, какие сообщения будут пропущены, но требует доступа к Kafka или другому брокеру сообщений.

Error‑Handling Deserializers: в случае использования Apache Kafka можно применить специальный десериализатор с обработчиком ошибок. Он пропускает некорректные сообщения, тем самым избегая зацикливания при обработке данных, и передаёт сообщения об ошибках в спец. лог.

С конфигурацией Spring Kafka можно использовать SeekToCurrentErrorHandler для управления ошибками при десериализации сообщений. Обработчик смещает офсет в точку после неудачного сообщения, избегая блокировки обработки последующих сообщений.

Используя паттерн команды, можно создать единый интерфейс для различных типов задач и использовать общий механизм завершения их выполнения, что также можно адаптировать под конкретные сценарии.

А можно воспользоваться просто прямым завершением потоков. Это можно реализовать через управление потоком через класс Thread:

public class Main {
    public static void main(String[] args) {
        Thread worker1 = new Thread(new Worker());
        Thread worker2 = new Thread(new Worker());
        worker1.start();
        worker2.start();

        try {
            Thread.sleep(1000); // даем время потокам поработать
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // прямое завершение потоков
        worker1.interrupt();
        worker2.interrupt();

        System.out.println("Потоки были прямо завершены.");
    }
}

class Worker implements Runnable {
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                System.out.println("Работа потока " + Thread.currentThread().getName());
                Thread.sleep(100);
            } catch (InterruptedException e) {
                System.out.println("Поток " + Thread.currentThread().getName() + " прерван.");
                Thread.currentThread().interrupt(); // рекомендуется восстановить статус прерванного состояния
            }
        }
    }
}

Поток выполняет свою работу в цикле до тех пор, пока не будет прерван. Метод interrupt() используется для отправки запроса на прерывание потоку, который проверяет свое состояние с помощью Thread.currentThread().isInterrupted() .


В заключение напомню про открытый урок, который пройдет сегодня вечером в OTUS: «Переопределение, скрытие и передекларация в Java». Записаться можно по ссылке.

Комментарии (9)


  1. DenSigma
    13.05.2024 09:29
    +2

    Крайне неудачное название. Сложно нагуглить, мешается бизнес-термин.

    Второй вопрос. Чем это решение лучше просто переменной в объекте Consumer?


    1. outlingo
      13.05.2024 09:29

      Чем это решение лучше просто переменной в объекте Consumer?

      Тем, что Publisher не должен знать о реализации Consumer, о том есть ли у него эта магическая переменная, о том существует ли вообще Consumer, сколько экземпляров Consumer, какая структура очереди (например либо это очередь классическая, либо множество реплик очереди каждая из которых к своему экземпляру Consumer'а).

      Поэтому Publisher просто отправляет событие EOF (End-Of-File/ End-Of-Stream / End-Of-Data) и всё.


      1. alexxisr
        13.05.2024 09:29

        но публишер знает об очереди, в которую он складывает сообщения. Что если сделать флаг окончания работы публишера в очереди?


        1. outlingo
          13.05.2024 09:29

          И фактически этот флаг и станет тем самым "poison pill" который в этой статье и описан. Только это приведет еще к тому, что надо будет мониторить этот флаг. Добавляется также проблема с тем, что если consumer вошел в блокирующее ожидание на очереди, это ожидание нужно прервать при выставлении флага.

          И вишенка на торте - когда система распределенная и publisher и consumer работают в разных процессах и на разных серверах, и тогда вы с флагом вообще задолбаетесь.


          1. alexxisr
            13.05.2024 09:29

            С другой стороны не надо проверять каждое сообщение. Если сообщение есть - просто обрабатываем его, а вот если нет, то надо проверить флаг. И ждем же мы все равно с таймаутом - вот по его окончании и проверим флаг, и прервемся если больше сообщений не ожидается.

            Распределенные сервера - это и с отравленным сообщением не так просто - например, что делать, если есть несколько publisher-ов? Если один отправит сообщение eof, то все consumer-ы закончат работу? Или что будет если вдруг сообщения встанут в очередь не по порядку и poison pill сработает раньше чем нужно?


      1. DenSigma
        13.05.2024 09:29

        Но ведь задача именно в том, чтобы сторонный объект мог остановить именно этот объект Runnable. Усложнение для программиста в том, что он должен связать через левую очередь взаимодействие Runnable и контролирующий его объект. А если этих палблишеров-консумеров в программе куча? А если чайник воспользуется имеющиейся очередью для отправки сообщения runnable-объекту? А потом эта очередь, предназначавшаяся изначально для чего-то другого, будет некаккуратно переделана? А как обеспечивать пересылку киллер-сообщения конкретному runnable-объекту? Вот посмотрите, сколько сложностей вы вывалили на голову коллегам? При том, что задача решается просто переменной в runnalbe-объекте.

        При том, что "publisher" все-равно должен знать о реализации "Consumer", хотя бы по формату передаваемых данных, по константам команд, которые воспринимает runnable-объект. Опять же тип очереди - появляется зависимость от типа очереди и у runnable-объекта и у его контролера. Ладно вы разрабатываете и то и другое, и сделали через интерфейс. А если разработчик потребителя не вы?

        "Магическая переменная", бгг. А знать, если ли у консумера "магическая очередь", не надо? И не надо знать формат и тип сообщений и константы? И не надо знать, как ее задавать у приемника-передатчика, нужно ли ее создавать, либо надо шариться в приложении искать, есть объект очереди, связан ли он с нужным runnable? Видите, насколько больше приходится не знать?


    1. YegorP
      13.05.2024 09:29

      Я бы этим термином NaN'ы назвал в IEEE754. Оператор(число, NaN) будет NaN. Причем там предусматривается возможность дополнительную инфу закодировать, и она должна проползти через всю арифметику.


  1. MaskedTrench
    13.05.2024 09:29

    Спасибо за статью, хорошее описание методов работы. Единственное что не раскрыто - это различие между State Machine и Poison pill. Понятно, что в данной ситуации, Poison pill - это решение для потока и его самоуправления, но хотелось бы увидеть различие между ними в данной статье. А в остальном прекрасная статься, спасибо ещё раз.


  1. dididididi
    13.05.2024 09:29

    Вопрос, а на чёрта?)

    А почему просто продюсер не может перестать посылать сообщения?)

    а как потом консьюмер запустить? Передеплоить?)

    А в следующий раз вы расскажете как удобно есть пельмени попой, но тоже не объясните зачем?)