Возможности конкурентной обработки появляются в программе по разным причинам: некоторые связаны с расширением возможностей платформы, другие вводятся вместе с новым API, поступающим в стандартную библиотеку, некоторые связаны со сменой парадигмы и переменами в наших представлениях. В этой статье будет рассмотрено три способа решения одной и той же задачи, но с применением отличающихся инструментов и парадигм.

Задача производителя-потребителя

Это классическая задача на синхронизацию множества процессов, где две группы потоков используют общий буфер. Одна из этих групп называется «производители», они постоянно закидывают данные в буфер, а а другая называется «потребители», они постоянно вытягивают данные из буфера. Наша цель – написать код, который бы гарантировал следующее:

  • Все произведенные элементы будут потреблены.

  • Потребители не будут пытаться тянуть данные из пустого буфера.

  • Потоки не окажутся в состоянии взаимной блокировки.

  • Пропускная способность будет доведена до максимума.

Мы воспользуемся неограниченным буфером, но тот же самый подход позволит вам решить эту задачу и с ограниченным буфером.

Решение студента

Первое решение могло бы быть предложено студентом с факультета информатики. Он только что сдал экзамен по предмету “Вводный курс по конкурентности в Java” и кое-что знает о ключевом слове  synchronized, а также о методах ожидания и уведомления. Он намерен решить задачу самостоятельно. В конце концов, что же может пойти не так? Студенту много не надо, есть старый добрый List плюс два синхронизированных метода – вот и решение готово:

package concurency;

import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class Student {
    List<integer> buffer = new LinkedList<>();

    private Runnable producer = () -> {
        Random r = new Random();
        while (true) {
            try {
                int item = r.nextInt();
                System.out.println("Produced: " + item);
                produce(item);
                Thread.sleep(1000 + r.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    };

    private Runnable consumer = () -> {
        while(true) {
            try {
                Integer consumed = consume();
                System.out.println("Consumed: " + consumed);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    };


    private void produce(Integer item) {
        synchronized (buffer) {
            buffer.add(item);
            buffer.notifyAll();
        }
    }

    private Integer consume() throws InterruptedException {
        synchronized (buffer) {
            while (buffer.isEmpty()) {
                buffer.wait();
            }
            Integer removed = buffer.remove(0);
            buffer.notifyAll();
            return removed;
        }
    }

    public void play() {
        List<thread> threads = IntStream.range(0, 10).boxed()
                .map(i -> new Thread(producer))
                .collect(Collectors.toList());
        IntStream.range(0, 10).boxed()
                .map(i -> new Thread(consumer))
                .forEach(threads::add);
        threads.forEach(Thread::start);
        threads.forEach(thread -> {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }

    public static void main(String[] args) {
        new Student().play();
    }
}

Код прямолинеен и банален. Все критически важные фрагменты защищены при помощи блока synchronized, потребители постоянно проверяют, есть ли в буфере какие-либо элементы, которые можно взять. Потоки получают уведомления, как только будет произведен новый элемент, поэтому взаимной блокировки возникнуть не должно. Да, этот код корректен, но его можно было бы сделать проще. Давайте рассмотрим еще одно решение.

Решение кодера

Следующее решение предлагает кодер, который считает себя «инженером-программистом на Java». Некоторый опыт у него уже есть, он слышал о новых пакетах, появившихся в Java 1.5 (да-да!) и чувствует, что справился бы с задачей лучше. Прослушав краткую лекцию на тему «Конкурентность Java на практике» и быстро порывшись на StackOverflow, он может выдать следующий код:

package concurency;

import java.util.Random;
import java.util.concurrent.*;

import java.util.stream.IntStream;

public class Engineer {
    BlockingQueue<integer> buffer = new LinkedBlockingQueue<>();

    private Runnable producer = () -> {
        Random r = new Random();
        while (true) {
            try {
                int item = r.nextInt();
                System.out.println("Produced: " + item);
                buffer.put(item);
                Thread.sleep(1000 + r.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    };

    private Runnable consumer = () -> {
        while(true) {
            try {
                Integer consumed = buffer.take();
                System.out.println("Consumed: " + consumed);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    };

    public void play() {
        ExecutorService executor = Executors.newCachedThreadPool();

        IntStream.range(0, 10).boxed()
                .map(i -> producer)
                .forEach(executor::submit);
        IntStream.range(0, 10).boxed()
                .map(i -> consumer)
                .forEach(executor::submit);
        try {
            executor.awaitTermination(Long.MAX_VALUE,TimeUnit.DAYS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        new Engineer().play();
    }
}

Вышеприведенный код определенно лучше, чем первое решение. Он существенно короче, и в нем нет блоков synchronized. Вместо него введена блокирующая очередь, которая и обеспечивает синхронизацию между потоками. Поскольку синхронизированные блоки, сопровождаемые методами уведомления и ожидания – это огромное поле для ошибок, можем считать, что такая перемена – хороший шаг вперед. Более того, Кодер решает, что лучше не заниматься ручным управлением потоками, а воспользоваться ExecutorService – это удобная в обращении абстракция, и работать с ней гораздо лучше, чем с сырыми потоками, которые мы видели у студента. Кроме того, ExecutorService позволяет нам предоставить интерфейс Callable, а не Runnable, что может нам пригодиться при решении сложных реалистичных задач.

Но самая большая проблема тут по-прежнему не решена: в коде все еще используются блокирующие операторы. Элементы производятся медленнее, чем потребляются, поэтому потребляющие потоки большую часть времени остаются заблокированными. Чтобы с этим справиться, нам нужно третье, еще более качественное решение.

Решение гика

И вот за дело берется гик. Ему нравится неблокирующий подход к решению задач, связанных с конкурентностью. Поэтому он попытается решить задачу производителя и потребителя строго асинхронным образом. Вот его код:

package concurency;

import java.util.Random;
import java.util.concurrent.*;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.IntStream;

public class Geek {
    private Random r = new Random();

    private Supplier<integer> producer = () -> {
        int item = r.nextInt();
        System.out.println("Produced: " + item + " by " + Thread.currentThread().getName());
        return item;
    };

    private Consumer<integer> consumer = i -> {
        System.out.println("Consumed: " + i + " by " + Thread.currentThread().getName());
    };

    public void play() {
        while (true) {
            IntStream.range(0, 10).boxed().forEach(i ->
            CompletableFuture.supplyAsync(producer)
                    .thenAcceptAsync(consumer));
            try {
                Thread.sleep(500 + r.nextInt(500));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        new Geek().play();
    }
}

На этот раз код не только короче и проще – но в нем и просматривается полная смена парадигмы. Вместо того, чтобы прибегать к реализациям Runnable, мы используем интерфейсы Supplier и Consumer – совершенно не связанные с конкурентностью. Вместо того, чтобы создавать ExecutionService, которому поставляются долгоиграющие задачи, мы используем CompletableFuture для мелких задач, каждая из которых решается одним махом.

Также можем обойтись без буфера – результат выполнения задачи производителя передается напрямую потребителю и выполняется асинхронно, а производитель тем временем справляется со своей работой. И все это удается сделать, не написав ни единой строки для блокирующих операций.

Вышеприведенное решение лишь слегка затрагивает возможности, предоставляемые CompletableFuture. Имея дело со сложными реалистичными проблемами, можно запрограммировать их решение поэтапно, выражая такие этапы как FunctionConsumer или Runnable (в зависимости от того, требуют ли функции аргументов и/или производят результаты). Этапы выполнения можно сцеплять и запускать, ориентируясь на результаты выполнения других этапов (или этапа). Асинхронным образом можно обрабатывать даже исключения, как будто эти исключения – обычные результаты вычислений.

Заключение

Современная модель программирования конкурентности в Java изменилась и продолжает меняться.  CompletableFuture с поддержкой лямбда-выражений значительно приблизили язык к нашему времени; теперь разработчики могут писать лаконичный асинхронный код, ориентированный на использование функций, и не особенно задумываться о низкоуровневых механизмах синхронизации и ручном управлении потоками. Потоки в Java использовались с самого начала существования языка, все, что можно о них сказать – нет однозначного ответа на вопрос, можно ли без них обойтись. Единственное универсальное и безопасное решение - System.exit(), но применить его удается редко. Правда, это уже тема для другой статьи.

Комментарии (5)


  1. ris58h
    11.03.2022 11:45
    +2

    Отличный пример того как не надо писать producer-consumer. Речь про "гик" решение. Вместо producer-consumer у автора получилось producer + async-callback.


    1. Dmitry2019
      12.03.2022 19:35

      Приведите пожалуйста пример правильной реализации


      1. ris58h
        13.03.2022 13:20

        Не знаю что значит "правильная" реализация, но "гик" решение - это не producer-consumer. Тут стоит определиться с терминами, конечно, а то окажется что последовательный вызов двух методов consume(produce()) - это тоже producer-consumer в чьём-то понимании.

        В producer-consumer есть три независимые сущности: producer, buffer, consumer. Автор о них упоминал в требованиях, кстати. Причём количество продюсеров и консюмеров может быть произвольным и даже меняться. К сожалению, в "гик" решении нет возможности регулировать число консюмеров - они задаются сразу же вместе с продюсерами. К тому же, автор решил "обойтись без буфера".

        В итоге, автор настроил pipeline обработки данных и запустил его. Да быстро, да неблокирующе, но изначальная задача не решена.


  1. Plesser
    11.03.2022 13:47

    На правах оффтопа, а у Вас планируется издание второй редакции книги по Kotlin от BigNerdRanch, если да то когда?


  1. Filex
    11.03.2022 18:35

    В книге редакции 2022 года издания есть отличия от предыдущего выпуска?