Задача производитель/потребитель


Статья рассчитана для новичков, которые недавно начали свое знакомство с миром многопоточноcти на JAVA.

Не так давно я проходил стажировку в компании EPAM и среди списка заданий, которые необходимо выполнить юному падавану, было задание с многопоточностью. Справедливости ради, стоит отметить, что никто не принуждал делать это низкоуровневыми инструментами такими как wait() и notify(), когда можно без великих хитростей использовать ArrayBlockingQueue, Semophore и в целом мощный API(java.util.concurrent). Тем более это считается плохой практикой, когда вместо готового для любого сценария многопоточтности изобретать свой велосипед, в то время как API скрывает ошеломляющую сложность параллелизма. Я реализовал оба варианта native и при помощи API. Сегодня покажу 1 вариант.

Для начала, предлагаю ознакомиться со следующей статьей.
Ссылка на данный проект.

Задание было следующее:

  1. Есть транспортные корабли, которые подплывают к туннели и далее плывут к причалам для погрузки разного рода товара.
  2. Они проходят через узкий туннель где одновременно могут находиться только 5 кораблей. Под словом “подплывают к туннели” имеется ввиду то, что корабли должны откуда-то появляться. Их может быть ограниченное количество, то есть 10 или 100, а может быть бесконечное множество. Слово “Подплывают” назовем генератором кораблей.
  3. Вид кораблей и их вместительность могут быть разными в зависимости от типа товаров, которые нужно загрузить на корабль. То есть для ТЗ я придумал 3 Типа кораблей (Хлеб, Банан и Одежда) и три вида вместительности 10, 50, 100 шт. товаров. 3 типа кораблей * 3 вида вместительности = 9 разных видов кораблей.
  4. Далее есть 3 вида причалов для погрузки кораблей — Хлеб, Банан и Одежда. Каждый причал берет или подзывает к себе необходимый ему корабль и начинает его загружать. За одну секунду причал загружает на корабль 10 ед. товара. То есть если у корабля вместительность 50 шт., то причал загрузит его за 5 секунд своей работы.

Требование было следующее:

  • Правильно разбить задачу на параллельность.
  • Синхронизировать потоки, сохранить целостность данных. Ведь ограничить доступ потоков к общему ресурсу дело не сложное, а заставить их работать согласованно уже намного сложнее.
  • Работа генератора кораблей не должна зависеть от работы причалов и наоборот.
  • Общий ресурс должен быть Thread Safe (Если таковой есть в реализации)
  • Потоки не должны быть активными если нет задач.
  • Потоки не должны держать mutex если нет задач.

И так поехали!

Для начала, я нарисовал диаграмму чтобы было понятнее что к чему.



Перед тем как решать какую-либо задачу хорошей практикой считается визуализировать ее на поверхность бумаги. Особенно если речь заходит о многопоточном приложении.
Давайте разобьём наше приложение на компоненты, в нашем случае на классы. Структура проекта.



Класс — Ship Сам класс никакой логики не содержит. POJO.

Исходник
public class Ship {
    private int count;
    private Size size;
    private Type type;

    public Ship(Size size, Type type) {
        this.size = size;
        this.type = type;
    }

    public void add(int count) {
        this.count += count;
    }

    public boolean countCheck() {
        if (count >= size.getValue()) {
            return false;
        }
        return true;
    }

    public int getCount() {
        return count;
    }

    public Type getType() {
        return type;
    }

    public Size getSize() {
        return size;
    }
}


Так как корабли могут отличаться друг от друга размерами и типами было решено создать Enum классы для определения свойств корабля. Size и Type. Size обладает заранее обозначенными свойствами. (код)

Так же класс Ship обладает счетчиком int count, сколько товаров в него уже погружено, если больше чем обозначено в свойстве корабля то метод boolean countCheck() этого же класса возвращает false, иначе говоря корабль загружен.

Класс – Tunnel

Исходник
public class Tunnel {

    private List<Ship> store;
    private static final int maxShipsInTunel = 5;
    private static final int minShipsInTunel = 0;
    private int shipsCounter = 0;

     public Tunnel() {
        store = new ArrayList<>();
    }

    public synchronized boolean add(Ship element) {

        try {
            if (shipsCounter < maxShipsInTunel) {
                notifyAll();
                store.add(element);
                String info = String.format("%s + The ship arrived in the tunnel: %s %s %s", store.size(), element.getType(), element.getSize(), Thread.currentThread().getName());
                System.out.println(info);
                shipsCounter++;

            } else {
                System.out.println(store.size() + "> There is no place for a ship in the tunnel: " + Thread.currentThread().getName());
                wait();
                return false;
            }

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return true;
    }

    public synchronized Ship get(Type shipType) {

        try {
            if (shipsCounter > minShipsInTunel) {
                notifyAll();
                for (Ship ship : store) {
                    if (ship.getType() == shipType) {
                        shipsCounter--;
                        System.out.println(store.size() + "- The ship is taken from the tunnel: " + Thread.currentThread().getName());
                        store.remove(ship);
                        return ship;
                    }
                }
            }

            System.out.println("0 < There are no ships in the tunnel");
            wait();

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }


Туннель хранит в себе кораблики, делается это при помощи List. Вместительность туннели 5 корабликов. В туннель каким то образом нужно добавлять кораблики и доставать. Этим занимаются 2 метода add() и get(). Метод get() достает и удаляет из списка кораблик по необходимому ему типу. Как видите в методе add() и get() стоит проверка на количество корабликов в списке. Если ship>5, то добавить кораблик не получиться и наоборот если ship<0, то взять из списка тоже не получиться. Tunnel является общим ресурсом в контексте многопоточности. Общие ресурсы в многопоточности являются злом. Все проблемы и сложность многопоточного программирования исходит именно из-за общих ресурсов. Так что их стоит избегать.

В чем проблема конкретно с нашим общим ресурсом.

Во-первых, добавление и извлечение корабликов должно быть согласованно между потоками. Если нет согласованности есть большая вероятность возникновения Race Condition и потери данных. Это мы решаем при помощи ключевого слова synchronized.

Во-вторых, в ТЗ упоминалось:

— Потоки не должны быть активными если нет задач.
— Потоки не должны держать mutex если нет задач.

Для этого тоже есть решение в виде wait() и notifyAll().

Для метода add() и потока, который ее исполняет словосочетание “нет задач” значит, что ship>5. Когда ship> 5 поток должен остановить свою деятельность и подождать. Для того, чтобы остановить поток и снять mutex мы вызываем wait(). Для метода get() аналогичные правила. Только для него ship<0. Подождать то они подождут, а как их пробудить и сказать, чтобы они вновь шли работать? Тут то и приходит к нам на помощь метод notifyAll(). Ее задача переключить поток из состояния WAITING в RUNNABLE. Когда срабатывает метод add() и в то же время ship<5, тогда он пробуждает поток, который работает с методом get(). И наоборот, когда срабатывает метод get() и ship>0 то он пробуждает поток работающий с методом add(). Некая рекурсия… Но будьте осторожны! Есть вероятность поймать DEADLOCK и уйди в бесконечное ожидание. (http://www.quizful.net/interview/java/Deadlock)

Едем дальше…

Класс – ShipGenerator.

Теперь нам нужно что-то, что будет генерировать кораблики, и будет делать это в независимом потоке. Класс ShipGenerator.

Исходник
public class ShipGenerator implements Runnable {

    private Tunnel tunnel;
    private int shipCount;

    public ShipGenerator(Tunnel tunnel, int shipCount) {
        this.tunnel = tunnel;
        this.shipCount = shipCount;
    }

    @Override
    public void run() {
        int count = 0;
        while (count < shipCount) {
            Thread.currentThread().setName(" Generator ship");
            count++;
            tunnel.add(new Ship(getRandomSize(), getRandomType()));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
              }
    }

    private Type getRandomType() {
        Random random = new Random();
        return Type.values()[random.nextInt(Type.values().length)];
    }

    private Size getRandomSize() {
        Random random = new Random();
        return Size.values()[random.nextInt(Size.values().length)];
    }
}


Для того, чтобы добавить задачу по генерации корабликов в поток необходимо реализовать интерфейс Runnable. На вход 2 параметра это Tunnel и количество кораблей, необходимых для генераций. Далее переопределить метод run(). Метод run() — это тот метод, который непосредственно будет исполняться потоком. Туда мы и поместим логику для генерации кораблей. Логика проста. Мы генерируем кораблики случайным способом, при помощи Random и помещаем в общий ресурс Tunnel.

Небольшое отступление. Я сказал, что необходимо добавить задачу для потока, так как многие ошибочно полагают, что реализовав интерфейс Runnable они создают поток. На самом деле они создают задачу для потока. Другими словами, создав 1000 классов, реализующих интерфейс Runnable не значит создать 1000 потоков. Это значит создать 1000 задач. А количество потоков, которые будут исполнять эти задачи будет зависеть от количества ядер на вашей машине.

Класс – PierLoader

Исходник
public class PierLoader implements Runnable {
    private Tunnel tunnel;
    private Type shipType;

    public PierLoader(Tunnel tunnel, Type shipType) {
        this.tunnel = tunnel;
        this.shipType =shipType;
    }

    @Override
    public void run() {

        while (true) {
            try {
                Thread.currentThread().setName("Loader "+shipType);

                //Time to load the goods
                Thread.sleep(500);
                Ship ship = tunnel.get(shipType);
                if(ship!=null)
                while (ship.countCheck()){
                    Thread.sleep(100);
                    ship.add(10);
                    System.out.println(ship.getCount() + " Loaded ship. " + Thread.currentThread().getName());
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}


Корабль сгенерировали, добавили в туннель теперь необходимо извлечь из туннеля и загрузить ее товаром. Для этого мы создадим класс PierLoader иначе говоря причал. Как известно у нас 3 вида кораблей, значит нам необходимо создать 3 вида причалов работающих независимо друг от друга. По аналогии ShipGenerator реализуем интерфейс Runnable. На вход 2 параметра это Tunnel и Type (тип кораблей, который принимает данные причал). Вместо add() вызываем метод get() и конкретный тип корабликов.

Заметьте, что я использую sleep () только для эмуляций работы погрузчиков и генераций кораблей (якобы нужно время для загрузки товарами и корабли должны приплыть), а не для задержки их с целью подогнать под другие потоки. Никогда этим не занимаетесь, на это есть множество причин, самое очевидное: что будет если нагрузка на поток А увеличиться и будет отрабатывать намного дольше (не 1 сек, а 3 сек как вы рассчитывали) чем вы усыпили поток В (на 1 сек) чтобы дождаться потока А? Даже в зависимости от OS JVM может вести себя по-разному. Как известно sleep не освобождает ресурс, а держит ее даже во время сна, ежели wait (), который сообщает потоку чтобы он прекратил свою работу и отпустил блокировку.

Класс – Main

Исходник
public class Main {

    public static void main(String[] args) {
        System.out.println("Available number of cores: " + Runtime.getRuntime().availableProcessors());

        Tunnel tunnel = new Tunnel();

        ShipGenerator shipGenerator = new ShipGenerator(tunnel, 10);

        PierLoader pierLoader1 = new PierLoader(tunnel, Type.DRESS);
        PierLoader pierLoader2 = new PierLoader(tunnel, Type.BANANA);
        PierLoader pierLoader3 = new PierLoader(tunnel, Type.MEAL);

        ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

        service.execute(shipGenerator);
        service.execute(pierLoader1);
        service.execute(pierLoader2);
        service.execute(pierLoader3);

        service.shutdown();
   }
}


Он самый простой. Инициализируем созданный нами класс Tunnel. Далее инициализируем ShipGenerator, в конструктор передаем объект Tunnel и количество корабликов, необходимое для генераций потоком.

Таким же образом создаем 3 объекта PierLoader для погрузки 3 типов корабля. Передаем в конструктор наш общий ресурс для потоков под названием Tunnel. И тип корабля, который должен отработать PierLoader.

Далее все это добро мы отдаем классу ExecutorService. Сначала создаем пул потоков для запуска задач. Количество потоков вы определяем при помощи команды Runtime.getRuntime().availableProcessors(). Оно возвращает количество доступных нам потоков в формате int. Не имеет смысла создавать 1000 потоков если у вас доступных ядер только 8. Так как одновременно будут отрабатывать только 8.

Ну вот и все! Многопоточное приложение готово. Можно поиграться с количеством задач для потоков, временем генераций корабликов и загрузки товаром. Результат будет всегда разным.

Заключение


В заключение я хочу посоветовать прочесть книги “Философия Java” Брюса Эккеля. Глава “Параллельное выполнение” и “JAVA. Методы программирования” Н. Блинова. Глава “Потоковое выполнение”. Там описываются основные практики работы с многопоточность на Java и их сложности.

Перед тем как писать многопоточное приложение необходимо нарисовать его на листочке, сделать эскиз своего проекта. Избежать общих ресурсов. Общие ресурсы — это зло. Лучше, что бы потоки не знали и не слышали друг о друге вообще ничего. Пользоваться готовым API, чем делать все своими ручками. Это сократит время разработки и увеличит надежность вашего приложения. Тестировать и профилировать приложение. Возможно Вы найдете еще какой-нибудь bottleneck, который тормозит ваше приложение и не дает раскрыть весь потенциал вашего многопоточного приложения.

Комментарии (7)


  1. Szer
    29.03.2018 20:17

    Работа генератора кораблей не должна зависеть от работы причалов и наоборот

    Оно таки может привести к проблеме fast producer — slow consumer.
    Генератор не должен генерить новые корабли если причалы ещё загружают, а "тунель" полон.


    Есть разные стратегии обработки таких сценариев:
    1) Pull. Туннель будет буфером на 5 кораблей, когда буфер неполон, он сам просит нагенерить ему ещё кораблей, а в это время причалы грузят нонстоп, все довольны.
    Т.е. генератор выступает тут итератором, у которого есть метод — CreateNext()


    2) Drop. Если генератор ну очень хочет генерить корабли постоянно, а тунель не справляется, то можно дропать все корабли сверх лимита. Под словом дропать можно понимать разное: именно что дропать и забывать навсегда, или же поднимать новый "тунель" и засылать в него.


    1. chaknuris Автор
      30.03.2018 05:59

      Оно таки может привести к проблеме fast producer — slow consumer.
      Генератор не должен генерить новые корабли если причалы ещё загружают, а «тунель» полон.

      Генератор в данной реализаций так и реализован, он не генерирует новые кораблики пока место в «туннели» не освободиться. Он засыпает.
      2) Drop. Если генератор ну очень хочет генерить корабли постоянно, а тунель не справляется, то можно дропать все корабли сверх лимита. Под словом дропать можно понимать разное: именно что дропать и забывать навсегда, или же поднимать новый «тунель» и засылать в него.

      Я думаю такой подход не очень эффективен, если поток будет постоянно находиться в состояний RUNNABLE )


      1. Szer
        30.03.2018 09:28

        Генератор в данной реализаций так и реализован, он не генерирует новые кораблики пока место в «туннели» не освободиться. Он засыпает.

        Тогда ваше требование о независимости генератора от туннеля не выполняется.
        И вместо поллинга со стороны генератора лучше сделать pull данных со стороны туннеля.


        Я вообще сам дотнетчик, но знаю что в Scala есть Akka Streams, хз юзабельно оно или нет из Java


        Полезное чтиво:
        https://doc.akka.io/docs/akka/2.5/stream/stream-quickstart.html#back-pressure-in-action


        A typical problem applications (not using Akka Streams) like this often face is that they are unable to process the incoming data fast enough, either temporarily or by design, and will start buffering incoming data until there’s no more space to buffer, resulting in either OutOfMemoryError s or other severe degradations of service responsiveness.


  1. daniillnull
    30.03.2018 06:03

    Вместо написания класса Tunnel можно было использовать любую BlockingQueue из пакета java.util.concurrent (например ArrayBlockingQueue), которая выполняет почти те же задачи.


    1. chaknuris Автор
      30.03.2018 06:06

      Из статьи которая выше:

      Справедливости ради, стоит отметить, что никто не принуждал делать это низкоуровневыми инструментами такими как wait() и notify(), когда можно без великих хитростей использовать ArrayBlockingQueue, Semophore и в целом мощный API(java.util.concurrent).

      Как раз таки ArrayBlockingQueue я использовал в другой моей реализаций.


  1. GritskevichDS
    30.03.2018 11:28
    +1

    Для периодичности исполнения задач (как например в ShipGenerator) лучше использовать ScheduledExecutorService с задержкой вызова в 1 секунду и избавиться от Thread.sleep(1000). Так и код чище и работать будет более грамотно.


    1. chaknuris Автор
      30.03.2018 11:30

      Солидарен с Вами.