Привет, Хабр!

Многопоточность — это не просто возможность приложения выполнять несколько задач одновременно, это его способность делать это эффективно и безопасно. В Java многопоточность неотделима от синхронизации, ведь именно она помогает управлять состоянием разделяемых ресурсов между потоками.

Всё начинается с потребности в быстродействии и масштабируемости. C несколькими потоками можно обрабатывать больше операций одновременно.

В этой статье мы рассмотрим, как синхронизировать потоки в Java.

Синхронизированные блоки

В Java слово synchronized может использоваться для методов и блоков кода. Это некая "база" для корректной работы с общими ресурсами в многопоточной среде.

Пример:

public class Counter {
    private int count = 0;

    public synchronized void increment() {
        count++;
    }

    public synchronized int getCount() {
        return count;
    }
}

Методы increment() и getCount() синхронизированы, что гарантирует, что только один поток может изменять или читать значение count в один момент времени.

Пример синхронизированного блока:

public class Counter {
    private int count = 0;
    private final Object lock = new Object();

    public void increment() {
        synchronized(lock) {
            count++;
        }
    }

    public int getCount() {
        synchronized(lock) {
            return count;
        }
    }
}

Здесь используем синхронизированный блок с явным объектом блокировки lock. Так можно управлять синхронизацией, ограничивая её только нужными частями кода.

Каждый объект в Java имеет связанный с ним монитор. Когда поток входит в синхронизированный блок или метод, он захватывает монитор этого объекта. Если монитор уже занят другим потоком, текущий поток будет ждать его освобождения:

public void addToQueue(String item) {
    synchronized(queue) {
        queue.add(item);
    }
}

В примере, если один поток уже добавляет элемент в очередь queue, другой поток будет ожидать, пока первый не закончит, прежде чем сможет выполнить добавление.

Методы wait(), notify() и notifyAll()

Методы wait(), notify(), и notifyAll() — это инструменты для координации работы между потоками, которые используют общие ресурсы. Они вызываются на объекте класса, который реализует интерфейс Object, и должны использоваться только в синхронизированных блоках или методах.

  • wait() заставляет текущий поток ожидать до тех пор, пока другой поток не вызовет notify() или notifyAll() на том же объекте.

  • notify() пробуждает один случайно выбранный поток, который ожидает на этом объекте.

  • notifyAll() пробуждает все потоки, которые ожидают на этом объекте.

Рассмотрим классический пример производитель-потребитель:

public class Buffer {
    private int contents;
    private boolean available = false;

    public synchronized void put(int value) {
        while (available) {
            try {
                wait();
            } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        }
        contents = value;
        available = true;
        notifyAll();
    }

    public synchronized int get() {
        while (!available) {
            try {
                wait();
            } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        }
        available = false;
        notifyAll();
        return contents;
    }
}

ЗдесьBuffer класс хранит одно целочисленное значение. Метод put() ожидает, пока Buffer не будет доступен для записи, и затем добавляет значение. Метод get() ожидает, пока Buffer не будет доступен для чтения, затем возвращает значение и освобождает буфер. wait() вызывается, когда поток должен ждать доступности буфера, а notifyAll() пробуждает все потоки после изменения состояния буфера.

InterruptedException — это исключение, которое выбрасывается, когда другой поток прерывает текущий поток, пока тот находится в состоянии ожидания. Правильный способ обработки этого исключения — это выставить флаг прерывания обратно и завершить выполнение, если это уместно:

try {
    wait();
} catch (InterruptedException e) {
    Thread.currentThread().interrupt(); // восстановить статус прерывания
    return; // и выйти, если поток не может продолжить выполнение
}

Пару советов:

  1. Минимизируйте время блокировки: используйте синхронизированные блоки как можно более короткими.

  2. Избегайте вложенных блокировок: это может привести к взаимоблокировкам.

  3. Используйте отдельные объекты блокировки: для управления доступом к различным частям данных.

  4. Будьте внимательны с условиями ожидания: всегда используйте циклы while, а не условия if, чтобы проверять условие ожидания, потому что поток может проснуться без изменений в условиях.

Присоединение и блокировка потоков

Метод join() позволяет одному потоку ожидать завершения работы другого. Мастхев, когда нужно, чтобы данные, обрабатываемые в одном потоке, были полностью готовы перед выполнением каких-то действий в другом.

Пример использования join():

public class ThreadJoinExample {
    public static void main(String[] args) {
        Thread thread1 = new Thread(() -> {
            System.out.println("Thread 1 running");
            try {
                Thread.sleep(1000); // Подождем 1 секунду
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println("Thread 1 finished");
        });

        Thread thread2 = new Thread(() -> {
            System.out.println("Thread 2 running");
            try {
                thread1.join(); // Ожидаем завершения thread1
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println("Thread 2 finished");
        });

        thread1.start();
        thread2.start();
    }
}

Здесь thread2 ждет, пока thread1 не завершит свою работу, благодаря вызову join(). Таким образом, сообщение "Thread 2 finished" всегда будет выводиться после "Thread 1 finished".

Тайм-ауты хорошидля предотвращения того, чтобы поток оставался заблокированным вечно, если ожидаемое событие никогда не произойдет. Это важный элемент управления исполнением, который помогает избежать фризов программы.

Пример с тайм-аутом в join():

Thread thread3 = new Thread(() -> {
    try {
        thread1.join(500); // Ожидаем максимум 500 мс
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    System.out.println("Thread 3 may or may not have waited for Thread 1 to finish");
});
thread3.start();

thread3 ждет завершения thread1 не более 500 миллисекунд. Если thread1 не завершится за это время, thread3 продолжит выполнение.

Метод sleep() применяется для приостановки работы текущего потока на заданный период времени. Он используется для имитации длительных операций, управления частотой выполнения и создания задержек.

Пример использования sleep():

Thread sleeperThread = new Thread(() -> {
    try {
        System.out.println("Sleeper thread going to sleep");
        Thread.sleep(2000); // Спим 2 секунды
    } catch (InterruptedException e) {
        System.out.println("Sleeper thread was interrupted");
        Thread.currentThread().interrupt();
    }
    System.out.println("Sleeper thread woke up");
});
sleeperThread.start();

Здесь поток sleeperThread "спит" 2 секунды, что хорошо, например, для ограничения скорости выполнения цикла обработки данных.

Блокирующие объекты и конструкции синхронизации

Java имеет интерфейс Lock с его замечательной реализацией ReentrantLock. Эти механизмы предлагают большую гибкость по сравнению с традиционным подходом с использованием synchronized. Они позволяют делать попытки захвата блокировок, тайм-ауты блокировок и многие другие операции, которые не поддерживаются synchronized.

Пример использования ReentrantLock:

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Counter {
    private final Lock lock = new ReentrantLock();
    private int count = 0;

    public void increment() {
        lock.lock();
        try {
            count++;
        } finally {
            lock.unlock();
        }
    }

    public int getCount() {
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }
}

Здесь lock() вызывается для захвата блокировки перед началом критической секции, и важно всегда вызывать unlock() в блоке finally, чтобы гарантировать освобождение блокировки даже в случае возникновения исключений.

Condition увеличивает гибкость управления блокировками, позволяя одним потокам приостанавливать себя (ждать), пока другой поток не сообщит о каком-то условии. Это можно сравнить с расширенной версией Object.wait() и Object.notify(), но с возможностью создания множественных условий ожидания на одной блокировке.

Пример с Condition:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class BoundedBuffer {
    private final Lock lock = new ReentrantLock();
    private final Condition notFull  = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    private final Object[] items = new Object[100];
    private int putptr, takeptr, count;

    public void put(Object x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length) {
                notFull.await();
            }
            items[putptr] = x;
            if (++putptr == items.length) putptr = 0;
            ++count;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public Object take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) {
                notEmpty.await();
            }
            Object x = items[takeptr];
            if (++takeptr == items.length) takeptr = 0;
            --count;
            notFull.signal();
        } finally {
            lock.unlock();
        }
        return x;
    }
}

Реализуем паттерн производитель-потребитель с блокирующей очередью:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProducerConsumerExample {
    private static final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);

    static class Producer extends Thread {
        public void run() {
            try {
                for (int i = 0; i < 20; i++) {
                    queue.put(i);
                    System.out.println("Produced " + i);
                }
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        }
    }

    static class Consumer extends Thread {
        public void run() {
            try {
                while (true) {
                    Integer item = queue.take();
                    System.out.println("Consumed " + item);
                }
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        new Producer().start();
        new Consumer().start();
    }
}

В заключение приглашаем всех желающих на открытые уроки, посвященные Java-разработке:

  • 7 августа: «Reflection API» — познакомимся с механизмом рефлексии в языке Java и посмотрим, где он применяется. Записаться

  • 21 августа: «Обобщения в Java» — изучим, для чего они нужны; где они применяются в стандартной Java библиотеке; а также как их можно использовать в своем коде. Записаться

Комментарии (9)


  1. martyncev
    28.07.2024 08:09
    +6

    И ни слова ни про ReadWriteLock, StampedLock, CountDownLatch... и много чего другого полезного и вкусного.
    Ну и блокируемую очередь обычно делают через LinkedBlockingQueue.


    1. rpc1
      28.07.2024 08:09

      И про Loom ни слова


    1. ermadmi78
      28.07.2024 08:09
      +4

      ИМХО. Вполне приличная статья про основы синхронизации потоков в Java.

      Это замечательно, что вы знаете про ReadWriteLock, StampedLock, CountDownLatch. Но, если говорить про асинхронность, многопоточность и JMM, то я к вашему списку могу добавить еще несколько десятков "баззвордов". Только стоит ли их все упоминать в статье? Получится сборная солянка, которая окончательно запутает новичка, и не позволит ему усвоить основы.


    1. DenSigma
      28.07.2024 08:09

      Статья-то от Отуса. У них на курсах все это есть (полагаю). Не будут же они все выкладывать здесь.


  1. nv13
    28.07.2024 08:09
    +3

    Про быстродействие неплохо бы упомянуть, иначе зачем такой зоопарк


  1. vasyakolobok77
    28.07.2024 08:09
    +2

    В свете релиза виртуальных потоков стоило отметить, что подходов к synchronized стоит избегать и отдавать предпочтение ReentrantLock. Кроме того ReentrantLock / Condition имеют куда более гибкий и читаемый api.


    1. Shatun
      28.07.2024 08:09

      В свете релиза виртуальных потоков стоило отметить, что подходов к synchronized стоит избегать и отдавать предпочтение ReentrantLock

      Недавно пофиксили проблему с synchronized блоками, правда в 23 джаву не вошло. Надеюсь к 25 версии(лтс) увтдеть в основной ветке


    1. cpud47
      28.07.2024 08:09

      Не очень понимаю, почему. У Вас же тогда может быть активны две критических секции одновременно: на разный нитях одного потока ОС. Или я Вас неправильно понимаю?


  1. kmatveev
    28.07.2024 08:09
    +2

    Ловить в библиотечном коде InterruptedException и вызывать в обработчике interrupt() - хреновая идея. Его вообще не надо ловить. Если уж дошло до того, что блокирующую операцию хотят прервать, то её нужно прервать, и выйти из метода, внутри которого это случилось, позволив вылететь этому InterruptedException из метода. В последнем примере используется BlockingQueue, она так и делает. Не библиотечный код, а приложение должно решать, что делать в случае прерывания операции.