Привет, Хабр! Это первая часть обзора по изучению существенных тем Java-стека и туториала по их применению. В этой статье вы найдете важные аспекты многопоточности в Java, а также полезные шпаргалки с практическими примерами.

Меня зовут Влад, я Java-разработчик в компании SimbirSoft. Надеюсь, что моя статья будет полезна как начинающим, так и продвинутым Java-специалистам, которые заинтересованы в прокачке своих знаний для успешной подготовки к собеседованиям на боевые проекты.

Прежде чем мы приступим к разбору нашей темы, давайте вспомним, что такое многопоточность.

Многопото́чность (англ. Multithreading) — свойство платформы (например, операционной системы, виртуальной машины и т. д.) или приложения, состоящее в том, что процесс, порождённый в операционной системе, может состоять из нескольких потоков, выполняющихся «параллельно», то есть без предписанного порядка во времени. При выполнении некоторых задач такое разделение может достичь более эффективного использования ресурсов вычислительной машины.

В Java многопоточность также играет значимую роль, поэтому давайте изучим ее немного подробнее.

Зачем нужны потоки?

Многопоточное программирование является одним из ключевых аспектов разработки приложений на языке Java. Эффективное использование потоков позволяет достичь высокой производительности, улучшить отзывчивость приложения и эффективно использовать ресурсы процессора. Мы погрузимся с вами в мир многопоточности в Java, начиная с основных понятий и заканчивая практическими примерами и рекомендациями по использованию.

В первой части статьи мы рассмотрим различные способы создания потоков в Java, обсудим основные проблемы, с которыми может столкнуться Java-разработчик при работе с многопоточностью, например, дедлоки и состязания за данные. Кроме того, мы изучим преимущества и недостатки использования потоков в Java, рассмотрим различные механизмы синхронизации – synchronized и volatile, а также их роль в обеспечении безопасности работы с общими ресурсами.

Основные причины применения многопоточности

1. Повышение производительности. Разделение задач на потоки позволяет эффективнее использовать ресурсы процессора и ускоряет выполнение вычислительных задач.

2. Отзывчивость приложения. Задачи, требующие времени, такие как загрузка данных или выполнение сложных вычислений, могут быть вынесены в отдельные потоки, что не блокирует основной поток и обеспечивает отзывчивость приложения.

3. Улучшенный пользовательский опыт. Параллельное выполнение задач позволяет создавать более отзывчивые и интерактивные пользовательские интерфейсы.

Преимущества использования многопоточности в Java

1. Конкурентность. Многопоточные приложения могут эффективно конкурировать за ресурсы и выполнять задачи параллельно, что улучшает общую производительность.

2. Масштабируемость. Путем добавления дополнительных потоков можно легко масштабировать приложение для работы с более сложными задачами и большими объемами данных.

3. Улучшенная отказоустойчивость. Многопоточные приложения могут быть более устойчивыми к отказам, так как сбой в одном потоке не обязательно приведет к сбою всего приложения.

Основные понятия многопоточности

В основе многопоточности лежат два ключевых понятия — потоки и процессы, давайте их рассмотрим:

Процесс — это экземпляр выполняющейся программы. Каждый процесс обладает своим собственным адресным пространством и ресурсами, такими как файлы и сетевые соединения. По сути это изолированный контейнер, в котором выполняется код программы.

Поток (или поток выполнения) — более легковесная единица выполнения, существующая внутри процесса. В одном процессе может существовать несколько потоков, которые могут параллельно выполнять инструкции программы. В отличие от процессов, потоки внутри одного процесса совместно используют его ресурсы.

Есть и другие варианты решения проблем взаимной блокировки и состояния гонки, которые также используют несколько фундаментальных понятий. Поэтому давайте проясним эти понятия.

Синхронизация — это процесс управления одновременным выполнением нескольких потоков или процессов с целью предотвращения возможных конфликтов и проблем, связанных с одновременным доступом к общим ресурсам.

Монитор — это механизм синхронизации, связанный с каждым объектом. Он обеспечивает взаимное исключение и контроль доступа к общим ресурсам. Только один поток может находиться в мониторе объекта в любой момент времени. Мониторы используются для предотвращения состояний гонки и обеспечивают безопасное взаимодействие между потоками.

Критическая секция — участок кода, в котором выполняются операции над общими ресурсами. Внутри критической секции поток захватывает монитор объекта, чтобы гарантировать взаимное исключение.

Synchronized — это ключевое слово в Java, которое используется для синхронизации потоков. Когда метод или блок кода помечается как synchronized, это означает, что только один поток может выполнять данный метод или блок кода на объекте в определенный момент времени.

Возьмем тот же пример из блока про Race Condition (см. ниже) и добавим к методу инкремент ключевое слово synchronized, сравним результаты:

То есть, с использованием synchronized мы получаем ожидаемый результат => мы решили проблему Race Condition.

Volatile – это ключевое слово, использующееся для переменных в Java. Когда переменная объявлена как volatile, это гарантирует, что операции чтения и записи к этой переменной выполняются атомарно (в одну операцию), и также гарантирует, что изменения переменной видны всем потокам.

Например: private volatile boolean flag = false;

Если один поток изменяет значение flag, это изменение сразу же становится видимым другим потокам. Без ключевого слова volatile, изменения могли бы задерживаться из-за кэширования переменных потоками.

Жизненный цикл потоков в Java

Потоки в Java проходят пять основных стадий в течение своего существования:

1. Создание (New)

На этой стадии поток только что был создан, но еще не запущен. Чтобы его создать, необходимо сформировать объект класса Thread и, при необходимости, передать ему объект, реализующий интерфейс Runnable.

2. Готовность (Runnable)

Поток переходит в состояние готовности, когда вызывается метод start(). Но его финальную готовность определяет система планирования операционной системы — она решает, когда поток будет выделен для выполнения.

3. Выполнение (Running)

Когда поток получает выделение от планировщика операционной системы, он переходит в состояние выполнения. В этот момент код, содержащийся в методе run(), начинает выполняться.

4. Ожидание (Blocked/Waiting)

Поток может временно оказаться в состоянии ожидания по различным причинам, например, ожидание блокировки (Blocked) или ожидание события (Waiting). Это происходит, когда поток ожидает выполнения какого-то условия или ресурса.

5. Завершение (Terminated)

Когда выполнение метода run() завершается, поток переходит в состояние завершения. Это может произойти при естественном завершении выполнения кода или в случае вызова метода stop(). После завершения выполнения поток больше не может быть запущен.

Как потоки используют память

В Java каждый поток выполняется в собственном стеке вызовов, который содержит локальные переменные и данные, относящиеся к выполнению методов. Этот стек является приватной областью памяти для каждого потока. Однако все потоки в одном процессе совместно используют общую кучу (heap), где располагаются объекты и массивы.

Куча предназначена для хранения динамически создаваемых объектов, и к ней может обращаться более одного потока – это дает возможность обмена данными между потоками через общие объекты в куче. Однако для обеспечения безопасности и предотвращения гонок данных необходимо правильно синхронизировать доступ к этим общим ресурсам.

Потоки могут обмениваться данными через общие переменные, поля объектов или другие структуры данных, расположенные в куче. Ключевые слова, такие как synchronized и volatile, предоставляют средства синхронизации для обеспечения корректного доступа к общим ресурсам.

Существует также механизм Thread-Local Storage (TLS), который позволяет каждому потоку иметь свой собственный набор данных, независимо от других потоков. Это может быть полезным, когда требуется изолировать данные между потоками.

Виды потоков

В Java потоки могут быть разделены на два основных типа: демон-потоки и не-демон-потоки. Этот подход помогает управлять выполнением программы и завершением приложения в зависимости от активности потоков.

1. Демон-потоки

Демон-потоки работают в фоновом режиме и предназначены для выполнения служебных задач, не являющихся критически важными для основной логики программы. Когда все не-демон-потоки завершают свою работу, JVM автоматически завершает выполнение программы, независимо от того, завершились демон-потоки или нет.

2. Не-демон-потоки

Не-демон-потоки считаются основными потоками выполнения. Если есть хотя бы один не-демон-поток, JVM будет ждать завершения всех не-демон-потоков перед завершением выполнения программы.

Когда запускается новый поток, он по умолчанию является не-демон-потоком. Вы можете изменить его статус с помощью метода setDaemon(boolean on):

Thread myThread = new Thread(new MyRunnable());
myThread.setDaemon(true); // Установка демон-статуса
myThread.start();

Важно отметить, что управление потоками демон/не-демон имеет смысл только при запуске нового. Если поток уже запущен, изменение его демон-статуса не оказывает влияния.

Пример использования демон-потока может быть в следующих сценариях:

  • Задачи фонового мониторинга.

  • Автоматическая очистка ресурсов.

  • Поддержание актуальности данных в фоновом режиме.

Виртуальные потоки:

Начиная с 16-й версии в Java появились виртуальные потоки. Давайте сравним их с традиционными:

  • Виртуальные потоки являются более легковесными, потому что они не привязаны к нативным потокам ОС, в то время как обычные потоки тяжеловесные, потому что каждый обычный поток соответствует нативному потоку внутри процесса.

  • Виртуальные потоки управляются и планируются JVM без привязки к ОС, в то время как обычные потоки создаются и управляются ОС.

  • Создание и управление виртуальных потоков эффективнее, потому что они не требуют создания и управления нативными потоками. Обычные же потоки управляются именно через нативные, следовательно, им требуется больше системных ресурсов.

  • Виртуальные потоки обычно применяются в Project Loom, тогда как традиционные  —  в обычной Java. 

Способы создания и запусков потоков

Есть несколько способов, позволяющих создавать и запускать потоки в приложениях на языке Java. Каждый из этих подходов имеет свои особенности и преимущества, которые важно учитывать при выборе подходящего метода для конкретной задачи. Давайте познакомимся с каждым видом подробнее и рассмотрим их плюсы и минусы.

 1. Класс Thread

Класс java.lang.Thread в Java представляет собой API для работы с потоками, управляемыми Java Virtual Machine (JVM) и операционной системой. Важно понимать, что сами экземпляры класса Thread не являются фактическими потоками, а скорее предоставляют интерфейс для управления потоками.

Когда Java Virtual Machine запускается, создается главный поток с именем main, который исполняет метод main-класса. Помимо главного потока могут существовать и другие например, служебные или демон-потоки, создаваемые JVM.

Из JavaDoc класса Thread видно, что при старте JVM обычно существует единственный не-демон-поток (non-daemon thread). Не-демон-потоки не позволяют JVM завершить свою работу, пока они не завершат свою. Таким образом, главный поток по умолчанию является не-демон-потоком.

Пример создания потока с использованием класса Thread:

public class MyThread extends Thread {
    public void run() {
        // Код, который будет выполняться в новом потоке
    }
 
    public static void main(String[] args) {
        MyThread myThread = new MyThread();
        myThread.start(); // Запуск нового потока
    }
}

Здесь MyThread является наследником класса Thread и метод run содержит код, который будет выполнен в новом потоке при вызове метода start.

Рассмотрим некоторые методы класса Thread. С методами start и run вы уже знакомы, поэтому их пропустим.

Метод sleep(long millis): приостанавливает выполнение потока на указанное количество миллисекунд.

Пример:

public class SleepExample {
    public static void main(String args[]) {
        try {
            System.out.println("Thread will sleep for 3 seconds");
            Thread.sleep(3000);
            System.out.println("Thread woke up after 3 seconds");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Метод join(): блокирует выполнение, пока поток, на котором вызван метод, не завершится. 

Пример:

class MyThread extends Thread {
    public void run() {
        for (int i = 0; i < 5; i++) {
            System.out.println(Thread.currentThread().getId() + " Value " + i);
        }
    }
}
 
public class JoinExample {
    public static void main(String args[]) {
        MyThread t1 = new MyThread();
        t1.start();
 
        try {
            t1.join(); // ждем, пока поток t1 завершит свою работу
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
 
        System.out.println("Main thread finished");
    }
}


Метод isAlive(): Возвращает true, если поток еще выполняется, и false, если поток завершен.

Пример:

class MyThread extends Thread {
    public void run() {
        for (int i = 0; i < 5; i++) {
            System.out.println(Thread.currentThread().getId() + " Value " + i);
        }
    }
}
 
public class IsAliveExample {
    public static void main(String args[]) {
        MyThread t1 = new MyThread();
        t1.start();
 
        System.out.println("Thread is alive: " + t1.isAlive());
        
        try {
            t1.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
 
        System.out.println("Thread is alive: " + t1.isAlive());
    }
}

Метод setName(String name) и getName(): устанавливает или возвращает имя потока. 

Пример:

public class NameExample {
    public static void main(String args[]) {
        Thread t1 = new Thread(() -> {
            System.out.println("Thread name: " + Thread.currentThread().getName());
        });
 
        t1.setName("MyThread");
        t1.start();
    }
}

Метод setPriority(int priority) и getPriority(): устанавливает или возвращает приоритет потока. Приоритет может быть в диапазоне от Thread.MIN_PRIORITY (1) до Thread.MAX_PRIORITY (10), средний — Thread.NORM_PRIORITY (5).

Пример:

public class PriorityExample {
    public static void main(String args[]) {
        Thread t1 = new Thread(() -> {
            System.out.println("Thread priority: " + Thread.currentThread().getPriority());
        });
 
        t1.setPriority(Thread.MAX_PRIORITY);
        t1.start();
    }
}

Метод yield() позволяет текущему работающему потоку передать управление другим потокам. Текущий поток сообщает планировщику, что ему не нужно столько процессорного времени и он готов поделиться.

Пример:

class MyThread extends Thread {
    public void run() {
        for (int i = 0; i < 5; i++) {
            System.out.println(Thread.currentThread().getId() + " Value " + i);
            // Передаем управление другим потокам
            Thread.yield();
        }
    }
}
 
public class YieldExample {
    public static void main(String args[]) {
        MyThread t1 = new MyThread();
        MyThread t2 = new MyThread();
 
        t1.start();
        t2.start();
    }
}

2. Интерфейс Runnable

Второй способ создания потока — имплементирование интерфейса Runnable.
Этот метод имеет ряд преимуществ по сравнению с созданием потока через Thread:

1. Гибкость в наследовании

Классы в Java могут наследоваться только от одного. Если вы уже наследуетесь от какого-то класса, использование Runnable позволяет вам добавить многопоточность, не теряя возможности наследования от другого класса.

2. Легкая реорганизация кода

Если вам потребуется в будущем изменить структуру вашего класса, использование Runnable позволит легко интегрировать многопоточность без необходимости изменения иерархии наследования.

3. Разделение задач:

Интерфейс Runnable позволяет разделить задачу (логику потока) от средств выполнения (самого потока), что способствует четкому разграничению ответственности и улучшает читаемость и поддерживаемость кода. Это помогает придерживаться букве S из принципов SOLID.

4. Повышение многократного использования:

Классы, реализующие интерфейс Runnable, могут быть повторно использованы в различных контекстах, таких как исполнители (Executor), без привязки к специфическим деталям реализации потока.

Пример использования Runnable:

class MyRunnable implements Runnable {
    public void run() {
        // Код, который будет выполнен в новом потоке
    }
}
 
public class Main {
    public static void main(String[] args) {
        MyRunnable myRunnable = new MyRunnable();
        Thread thread = new Thread(myRunnable);
        thread.start(); // Запуск нового потока
    }
}

3. Интерфейс Callable

Как сказано в документации о Callabe:

public interface Callable<V>

A task that returns a result and may throw an exception. Implementors define a single method with no arguments called call.

The Callable interface is similar to Runnable, in that both are designed for classes whose instances are potentially executed by another thread. A Runnable, however, does not return a result and cannot throw a checked exception.

Таким образом, интерфейсы Callable и Runnable используются для выполнения кода в отдельном потоке, но есть различия в том, как они предоставляют результат выполнения и обрабатывают исключения.

4. Сравнение Runnable и Callable

Интерфейс Runnable

1. Возвращаемое значение. Runnable не предоставляет способа возвращать результат выполнения задачи.

2. Исключения. Runnable не бросает проверяемые исключения (за исключением тех, которые являются подклассами RuntimeException).

Интерфейс Callable

1. Возвращаемое значение: Callable предоставляет возможность возвращать результат выполнения задачи.

2. Исключения: Callable позволяет бросать проверяемые исключения.

3. Метод: Он содержит метод call(), который аналогичен методу run() в интерфейсе Runnable, но может возвращать значение и бросать проверяемые исключения.

3.1 Future

Благодаря этому интерфейсу можно взаимодействовать с результатами асинхронных вычислений. Future показывает результат выполнения асинхронной операции и предоставляет методы для проверки завершения операции, ожидания её завершения и извлечения результата.

В интерфейсе Future определены следующие основные методы:

  • boolean isDone(): Возвращает true, если операция завершена, то есть, готов результат.

  • boolean isCancelled(): Возвращает true, если операция была отменена до завершения.

  • boolean cancel(boolean mayInterruptIfRunning): Пытается отменить выполнение задачи. Параметр mayInterruptIfRunning указывает, может ли быть прерван поток, выполняющий задачу.

  • V get(): Возвращает результат выполнения задачи. Если задача еще не завершена, вызывающий поток блокируется до её завершения.

  • V get(long timeout, TimeUnit unit): Возвращает результат выполнения задачи, ожидая указанное количество времени. Если результат не готов в течение указанного времени, генерируется исключение TimeoutException.

Пример использования Future с Callable:

import java.util.concurrent.*;


public class FutureExample {
    public static void main(String[] args) {
        // Создаем пул потоков
        ExecutorService executor = Executors.newFixedThreadPool(1);
        // Создаем объект Callable
        Callable<Integer> callableTask = () -> {
            System.out.println("Выполняется в отдельном потоке");
            // Какая-то вычислительная работа
            return 42;
        };
        // Передаем Callable задачу в ExecutorService
        Future<Integer> future = executor.submit(callableTask);
        try {
            // Делаем что-то другое в основном потоке
            // Ждем завершения выполнения и получаем результат
            Integer result = future.get();
            System.out.println("Результат: " + result);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        // Завершаем ExecutorService
        executor.shutdown();
    }
}

Этот пример был выполнен с помощью интерфейса Callable для создания асинхронной задачи, а затем ее результат получили с применением Future, который используется для ожидания и получения итога выполнения задачи в асинхронном режиме. Метод get() блокирует вызывающий поток, пока результат не будет готов.

 3.2 CompleteableFuture

CompletableFuture — это расширение интерфейса Future, введенное в Java 8, которое предоставляет более мощные и гибкие возможности для работы с асинхронными операциями. Этот класс позволяет комбинировать, композировать и управлять несколькими асинхронными операциями, обрабатывать их результаты и управлять потоком выполнения. В отличие от базового интерфейса Future, CompletableFuture предоставляет широкий набор методов для выполнения различных операций над результатами асинхронных задач.

Главное отличие между Future и CompletableFuture заключается в их функциональности. У последнего более высокий уровень абстракции, он поддерживает сложные сценарии работы с асинхронными операциями, включая комбинацию результатов, обработку исключений и управление потоками выполнения. В то время как Future может быть достаточен для простых асинхронных операций, у CompletableFuture более гибкие возможности для сложных сценариев.

5. Executors

В Java пакет java.util.concurrent имеется удобный способ управления потоками через интерфейсы Executor, ExecutorService и фабрику Executors. Давайте рассмотрим их основные концепции и методы создания потоков:

1. Executor. Это простой интерфейс с единственным методом execute (Runnable command). Он предназначен для выполнения переданного объекта Runnable, но не предоставляет методов для управления и контроля выполнения задач.

2. ExecutorService. Это многофункциональный интерфейс, расширяющий Executor. Предоставляет более продвинутый способ управления выполнением задач и включает методы управления жизненным циклом потока. Некоторые из основных методов ExecutorService:

  • submit(Callable<T> task). Подает задачу на выполнение и возвращает Future, который предоставляет доступ к результату выполнения задачи и позволяет управлять её состоянием.

  • submit(Runnable task). Подает задачу на выполнение и возвращает Future, который связан с переданной задачей, но не возвращает результат.

  • shutdown(). Завершает выполнение задач, но дает возможность уже поставленным задачам завершить выполнение.

  • shutdownNow(). Пытается прервать выполнение всех активных задач и возвратить список ожидающих выполнения задач.

3. Executors. Это фабрика для создания различных типов исполнителей (Executor и ExecutorService), где есть методы для создания исполнителей с различными характеристиками.

  • newFixedThreadPool(int nThreads). Создает пул потоков с фиксированным числом потоков.

  • newCachedThreadPool(). Создает пул потоков, который автоматически масштабируется в зависимости от нагрузки.

  • newSingleThreadExecutor(). Создает исполнителя с единственным потоком.

Важно отметить, что ExecutorService использует механизм управления задачами через блокирующую очередь, что гарантирует эффективную и безопасную передачу задач между потоками. ExecutorService использует BlockingQueue для управления задачами.

BlockingQueue – это интерфейс, содержащий методы для добавления и удаления элементов из очереди. Он расширяет интерфейс Queue и включает в себя методы, блокирующие выполнение потока при попытке добавления или удаления элементов из пустой или полной очереди.

Пример использования ExecutorService:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
public class ExecutorServiceExample {
    public static void main(String[] args) {
        // Создаем исполнителя с фиксированным числом потоков (например, 5)
        ExecutorService executorService = Executors.newFixedThreadPool(5);
 
        // Подаем задачу на выполнение
        executorService.submit(() -> {
            System.out.println("Task executed by thread: " + Thread.currentThread().getName());
        });
 
        // Завершаем выполнение задач
        executorService.shutdown();
    }
}

6. ForkJoinPool

Это особый тип пула потоков в Java, предназначенный для выполнения рекурсивных задач и разбивающий их на более мелкие. Он введен в Java 7 и базируется на идее «разделяй и властвуй» (divide and conquer). ForkJoinPool используется в сочетании с фреймворком Fork/Join (java.util.concurrent.ForkJoinTask и java.util.concurrent.RecursiveTask).

Основные концепции ForkJoinPool:

  • Разделение (Forking). Задача разбивается на более мелкие подзадачи.

  • Выполнение (Joining). Подзадачи выполняются параллельно в пуле потоков и их результаты объединяются в результат исходной задачи.

Пример:

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;


public class Main {


    public static void main(String[] args) {
        // Создаем экземпляр ForkJoinPool с указанием параллелизма
        ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());


        // Создаем экземпляр задачи, которая будет выполняться параллельно
        MyRecursiveAction action = new MyRecursiveAction(24);


        // Запускаем задачу в пуле ForkJoinPool
        forkJoinPool.invoke(action);
    }


    // Рекурсивная задача, которая просто выводит информацию о потоке
    static class MyRecursiveAction extends RecursiveAction {
        private final int workload;


        MyRecursiveAction(int workload) {
            this.workload = workload;
        }


        @Override
        protected void compute() {
            // Если работа мала, выполнить ее
            if (workload <= 2) {
                System.out.println("Workload is small enough to be processed by thread: " + Thread.currentThread().getName());
            } else {
                // Если работа большая, разбить ее на подзадачи
                int split = workload / 2;
                MyRecursiveAction leftChild = new MyRecursiveAction(split);
                MyRecursiveAction rightChild = new MyRecursiveAction(split);


                // Запуск подзадач параллельно
                leftChild.fork();
                rightChild.fork();


                // Дождаться завершения подзадач и объединить результаты
                leftChild.join();
                rightChild.join();
            }
        }
    }
}

В методе main создается экземпляр ForkJoinPool, который будет управлять выполнением ForkJoin-задач. Ему передается количество доступных процессоров, чтобы он мог определить, сколько потоков использовать.

Создается экземпляр MyRecursiveAction, который будет выполняться параллельно в пуле потоков. В данном случае мы указываем количество работы равное 24.

С помощью метода invoke() пул запускает выполнение задачи.

Класс MyRecursiveAction представляет собой рекурсивную задачу.

В методе compute() проверяется размер работы. Если он меньше или равен 2, работа выполняется непосредственно в текущем потоке и выводится сообщение о том, какой поток выполняет эту работу.

Если работа больше, она разбивается на две подзадачи, которые запускаются параллельно методом fork(). Затем поток, запустивший подзадачи, дожидается их завершения с помощью метода join().

Результат:

7. Parallel Stream

Parallel Stream это один из механизмов параллельной обработки данных в Java, введенный в Java 8 вместе с функциональным интерфейсом java.util.stream.Stream.

Параллельные потоки (Parallel Stream) позволяют автоматически распараллеливать выполнение операций над элементами потока данных. Parallel Stream помогает использовать многопоточность без явного создания и управления потоками. Просто указывая, что вы хотите выполнить поток параллельно, библиотека сама разделит задачи между несколькими потоками.

Например:

import java.util.Arrays;
import java.util.List;

public class Main {
    public static void main(String[] args) {
        // Создаем список данных
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        // Создаем параллельный поток и применяем операцию фильтрации
        numbers.parallelStream()
            .filter(n -> n % 2 == 0) // фильтруем только четные числа
            .map(n -> n * n) // возводим в квадрат
            .forEach(System.out::println); // выводим результаты
    }
}

Результат:

36
100
64
16
4

Можно было написать вместо numbers.parallelStream()numbers.stream().parallel() и получить тот же результат, так в чем разница? 

Метод parallelStream() является частью интерфейса Collection, позволяющий вызывать параллельный поток на основе коллекции.

Метод parallel() – часть интерфейса BaseStream, который предоставляет общие операции над потоками, не ограничиваясь только коллекциями. Вызывая stream() для получения потока данных, мы можем использовать parallel() для переключения на параллельный режим обработки данных. Такой подход более общий и может быть применен к любому потоку данных, а не только к коллекциям.

8. Atomic

Атомарность, пакет java.util.concurrent.atomic.

В многопоточной программе атомарность гарантирует, что выполнение определенной операции будет происходить как единое, неделимое действие. Если операция является атомарной, то она либо выполняется полностью, либо не выполняется вообще. Атомарность важна в контексте многопоточности, чтобы избежать состояний гонки, когда несколько потоков могут одновременно изменять общие ресурсы.

java.util.concurrent.atomic в Java – небольшой инструментарий классов для реализации потокобезопасного программирования без использования блокировок на отдельных переменных. В основе этих классов лежит идея волатильных значений, полей и элементов массива, которые также предоставляют атомарную условную операцию обновления в форме:

boolean compareAndSet(expectedValue, updateValue);

Этот метод (типы аргументов могут различаться в разных классах) атомарно устанавливает переменную в значение updateValue, если она в данный момент имеет значение expectedValue, возвращая true в случае успеха. Классы в этом пакете также содержат методы для получения и безусловной установки значений, а также более слабую условную атомарную операцию обновления weakCompareAndSet.

Некоторые из основных классов в пакете java.util.concurrent.atomic включают в себя:

1. AtomicInteger. Обеспечивает атомарные операции для переменных типа int.

2. AtomicLong. Предоставляет атомарные операции для переменных типа long.

3. AtomicBoolean. Позволяет безопасно выполнять атомарные операции над переменными типа boolean.

4. AtomicReference. Позволяет проводить атомарные операции с переменными типа ссылки.

5. AtomicIntegerArray, AtomicLongArray, AtomicReferenceArray. Предоставляют атомарные операции для соответствующих массивов.

Механизм CAS

CAS (Compare-And-Swap) – это механизм, используемый в многопоточном программировании для атомарного обновления значения переменной. Он обеспечивает безопасную операцию записи, которая выполняется только в том случае, если значение переменной соответствует ожидаемому.

Основные компоненты CAS:

  • Сравнение (Compare). Сравнение текущего значения переменной с ожидаемым значением.

  • Замена (And-Swap). Если сравнение успешно, происходит замена значения переменной на новое.

Механизм CAS широко используется в алгоритмах многопоточной синхронизации и в классах из пакета java.util.concurrent.atomic. С ним можно избежать использования явных блокировок и улучшить эффективность в средах с высоким параллелизмом.

9. RxJava

RxJava – библиотека для композиции асинхронных и событийно-ориентированных программ, основанная на концепциях реактивного программирования. Это набор инструментов и типов данных для работы с потоками данных и событиями. В некотором смысле RxJava напоминает асинхронную версию Stream API. Оба API дают возможность работать с последовательностями данных (или потоками данных), применять операции к этим данным и выполнять их.

Главные отличия между RxJava и Stream API:

  1. Подход к обработке данных

    1. Стримы (Stream) в Java представляют собой pull-based последовательности данных, в то время как Observable в RxJava является push-based, означающее, что данные активно отправляются в подписчика.

    2. Стрим используют только один раз, а Observable можно подписывать несколько раз.

  2. Многопоточность

    1. Для работы со стримами в Java предусмотрен механизм параллельной обработки (parallel()), который автоматически использует фреймворк fork-join для распараллеливания операций. Однако он не позволяет явно указывать пул потоков.

    2. В RxJava для работы с многопоточностью используются Scheduler, помогающие явно контролировать на каком потоке выполнять операции (subscribeOn() и observeOn()).

  3. Операции и управление временем

    RxJava предоставляет более широкий набор операций для работы с потоками данных и событиями, а также операции, связанные со временем, например, interval() и window(), которых нет в Stream API.

  4. Управление ресурсами

    Observable в RxJava предлагает удобные средства для управления ресурсами с помощью using(), что позволяет автоматически освобождать ресурсы после завершения подписки. В Stream API подобного функционала нет, и управление ресурсами может быть более ручным.

  5. Создание и конструирование:

    Создание Observable в RxJava более гибкое и удобное, чем создание Stream в Java, особенно в отношении обработки ошибок и работы с асинхронными операциями.

Таким образом, хотя оба API предоставляют средства для работы с последовательностями данных, RxJava предлагает более мощные инструменты для работы с асинхронными операциями, управлением потоками данных и обработкой событий.

Поэтому давайте рассмотрим несколько ключевых концепций RxJava:

  1. Observable – это класс, представляющий источник данных в RxJava. Он может испускать ноль или более элементов и уведомлять подписчиков об ошибках или завершении. Кроме того, Observable содержит методы для создания, преобразования и комбинирования потоков данных.

  2. Observer – подписчик на Observable, который потребляет данные и реагирует на них. Когда Observer подписывается на Observable, последний вызывает метод onSubscribe(Disposable) для предоставления подписчику объекта Disposable, который позволяет отменить подписку. Затем Observable может вызывать методы onNext (T), onComplete() или onError(Throwable) у Observer в зависимости от событий.

  3. Operator – функция, которая принимает один Observable (исходный) и возвращает другой Observable (результат). Операторы дают возможность преобразовывать, фильтровать, комбинировать и управлять потоками данных в RxJava.

  4. Schedulers – позволяют управлять тем, на каких потоках выполняются операции Observable и Observer. В RxJava есть различные планировщики (Schedulers), такие как io(), computation(), newThread(). С их помощью можно выполнять операции на определенных потоках.

  5. Subject – объект, который одновременно является и подписчиком, и источником данных в RxJava. Он может подписываться на несколько Observable и переотправлять события от них своим подписчикам. Это удобно для создания моста между несколькими Observable и подписчиками.

Для работы с RxJava надо добавить зависимость в проект:

<dependency>
	<groupId>io.reactivex.rxjava3</groupId>
    <artifactId>rxjava</artifactId>
    <version>3.1.8</version>
<dependency>

На практике RxJava можно использовать так:

import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Observer;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.schedulers.Schedulers;


public class Main {
    public static void main(String[] args) throws InterruptedException {
        // Создаем Observable, который испускает числа от 1 до 5
        Observable<Integer> observable = Observable.range(1, 5);


        // Создаем Observer
        Observer<Integer> observer = new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe");
            }


            @Override
            public void onNext(Integer value) {
                System.out.println("onNext: " + value);
            }


            @Override
            public void onError(Throwable e) {
                System.err.println("onError: " + e.getMessage());
            }


            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        };


        // Подписываем Observer на Observable
        observable
            .subscribeOn(Schedulers.io()) // Выполнение на отдельном потоке
            .observeOn(Schedulers.single()) // Результаты обработки на одном потоке
            .subscribe(observer);
    }
}

Программа создает поток данных с помощью RxJava. Код использует метод Observable.range(1, 5), чтобы создать поток чисел от 1 до 5. Затем создается объект Observer, который будет подписан на этот поток данных. Далее выполняется подписка на Observable с помощью метода subscribe(observer). Этот метод запускает выполнение потока данных.

Давайте запустим и посмотрим на результат:

onSubscribe


Process finished with exit code 0

Получаем onSubscribe вместо ожидаемого:

onSubscribe
onNext: 1
onNext: 2
onNext: 3
onNext: 4
onNext: 5
onComplete

Почему так происходит? 

Причина в том, что программа завершилась до того, как Observable успел закончить выпуск всех элементов. RxJava работает асинхронно, поэтому выполнение главного потока не ожидает завершения работы с Observable. Когда вы вызываете observable.subscribe(observer), это создает цепочку операций, которые работают асинхронно на различных потоках (как указано с помощью subscribeOn(Schedulers.io()) и observeOn(Schedulers.single())). В результате главный поток продолжает выполняться независимо от этих асинхронных операций. После вызова subscribe() программа продолжает выполнение, и, если нет других инструкций или блокировок, главный поток завершится, не дожидаясь завершения работы Observable. Во избежание такой ситуации, вы можете использовать методы Thread.sleep() или CountDownLatch, чтобы блокировать главный поток и дать Observable достаточно времени для завершения испускания элементов. 

Давайте добавим засыпание потока на секунду и перезапустим программу:

observable
	.subscribeOn(Schedulers.io())
	.observeOn(Schedulers.single())
	.subscribe(observer);

Thread.sleep(1000);

Получим ожидаемый результат:

onSubscribe
onNext: 1
onNext: 2
onNext: 3
onNext: 4
onNext: 5
onComplete

Рекомендации по выбору способа создания потоков

Thread:

Вы можете использовать класс Thread, когда требуется базовая функциональность создания потока и нет необходимости в дополнительных возможностях управления и синхронизации.

ForkJoinPool:

Для параллельного выполнения независимых подзадач и управления пулом потоков, рекомендуется использовать ForkJoinPool. Он особенно полезен для высокоуровневого параллельного программирования.

Runnable:

Используйте интерфейс Runnable для создания потоков, если необходимо выполнить асинхронную задачу без возвращаемого значения. Этот подход обеспечивает многократное использование кода и отделение логики потока от его управления.

ParallelStream:

При обработке больших объемов данных и выполнении операций над ними, рассмотрите параллельные потоки данных (ParallelStream). Они автоматически распределят выполнение операций между доступными потоками и ускорят обработку данных.

Callable:

Если нужно получить результат выполнения задачи или обработать исключение, выбирайте интерфейс Callable. Он позволяет вернуть значение из потока и бросить исключение в основной поток.

Atomic:

Для обеспечения безопасности доступа к общим ресурсам из нескольких потоков хорошо подходят классы из пакета java.util.concurrent.atomic: AtomicInteger, AtomicLong и другие.

Executors:

Для формирования пула потоков лучше всего подходит Executors, который предоставляет удобные методы для создания и управления потоками в приложении, что позволит избежать этого вручную.

RxJava:

Если нужна реализация реактивного программирования и работы с асинхронными потоками, выбирайте RxJava. Этот подход предоставляет мощные инструменты для работы с потоками и обработки асинхронных событий.

Каждый из вышеперечисленных подходов имеет свои особенности и преимущества, поэтому выбор конкретного метода зависит от требований вашего приложения и контекста применения. Также важно учитывать требования к производительности, удобству использования и возможности многократного использования кода.

Минусы потоков

К текущему моменту мы рассмотрели достоинства потоков, как они помогают увеличить скорость работы приложения, а также различные варианты их создания в Java. Все было бы хорошо, но использование потоков также может привести к проблемам. Главной из них является физическое ограничение на уровне железа, которую стоит упомянуть при обсуждении использования потоков. Несмотря на то, что эта проблема не связана напрямую с языком программирования, но она имеет прямое отношение к результату использования многопоточности. Когда его использует приложение, оно может активно подключать ресурсы процессора, что наверняка приведет к проблемам в случае недостатка вычислительной мощности (например, при Starvation, когда потоку не хватает ресурсов для работы). Физический процессор имеет ограничения по количеству потоков, которые он может обрабатывать одновременно. Если их число в приложении превышает допустимый предел, то могут возникнуть проблемы с производительностью из-за конкуренции за ресурсы процессора. Помимо этого есть и другие общеизвестные ситуации, которых хотелось бы избежать. Ознакомимся с ними поближе.

1. Состояние гонки (Race Condition)

Состояние гонки возникает, когда два или более потока пытаются одновременно получить доступ или изменить общие данные, и результат зависит от того, какой поток завершит свою операцию быстрее. Проблема состоит в том, что порядок выполнения этих операций не определен и результат может быть непредсказуемым.

Например:

class SharedResource {
    private int counter = 0;
 
    public void increment() {
        counter++;
    }
 
    public int getCounter() {
        return counter;
    }
}
 
public class RaceConditionExample {
    public static void main(String[] args) {
        SharedResource sharedResource = new SharedResource();
 
        Runnable incrementTask = () -> {
            for (int i = 0; i < 10000; i++) {
                sharedResource.increment();
            }
        };
 
        Thread thread1 = new Thread(incrementTask);
        Thread thread2 = new Thread(incrementTask);
        thread1.start();
        thread2.start();
 
        try {
            thread1.join();
            thread2.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
 
        System.out.println("Final Counter Value: " + sharedResource.getCounter());
    }
}

Класс SharedResource содержит общий ресурс — переменную counter, к которой имеют доступ два потока. Они выполняют задачу инкрементации переменной в цикле, повторяя операцию 10 000 раз. После выполнения обоих потоков, программа выводит итоговое значение переменной counter. Из-за отсутствия синхронизации и возможности одновременного доступа нескольких потоков, результат может быть непредсказуемым.

Напишем класс Main и попытаемся запустить программу:

public class Main {
    public static void main(String[] args) {
        RaceConditionExample example = new RaceConditionExample();
        example.runExample();
    }
}

Рассмотрим результаты при нескольких запусках:

Final Counter Value: 18076
Final Counter Value: 16744
Final Counter Value: 16294

Действительно, итог выполнения меняется при каждом запуске. Это и есть состояние гонки.

2. Взаимоблокировка (Deadlock)


Взаимоблокировка возникает, когда два или более потока удерживают ресурсы и ждут друг друга для освобождения ресурсов, необходимых другим потокам. Как результат – ни один из потоков не может продолжить выполнение и программа оказывается заблокированной.

Например:

public class DeadlockExample {
    static Object lock1 = new Object();
    static Object lock2 = new Object();
 
    public static void main(String[] args) {
        Thread thread1 = new Thread(() -> {
            synchronized (lock1) {
                System.out.println("Thread 1: Holding lock 1...");
 
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
 
                System.out.println("Thread 1: Waiting for lock 2...");
                synchronized (lock2) {
                    System.out.println("Thread 1: Holding lock 1 and lock 2...");
                }
            }
        });
 
        Thread thread2 = new Thread(() -> {
            synchronized (lock2) {
                System.out.println("Thread 2: Holding lock 2...");
 
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
 
                System.out.println("Thread 2: Waiting for lock 1...");
                synchronized (lock1) {
                    System.out.println("Thread 2: Holding lock 2 and lock 1...");
                }
            }
        });
 
        thread1.start();
        thread2.start();
    }
}


Пошаговое описание кода:

1. Создаются два объекта-блокировки: lock1 и lock2.

2. Создаются два потока, thread1 и thread2, которые используют эти блокировки.

3. thread1 пытается сначала захватить lock1, затем ждет 1 секунду, после чего пытается захватить lock2.

4. thread2 пытается сначала захватить lock2, затем также ждет 1 секунду и пытается захватить lock1.

5. Если выполнение этих двух потоков происходит в определенной последовательности, то возможен deadlock. Например, если thread1 захватывает lock1, а затем thread2 захватывает lock2, оба потока будут ожидать друг друга, и программа заблокируется.

При запуске получаем:

Thread 1: Holding lock 1...
Thread 2: Holding lock 2...
Thread 2: Waiting for lock 1...
Thread 1: Waiting for lock 2...

В итоге мы получаем бесконечный цикл, где каждый поток хочет заполучить ресурсы, занимаемые другим потоком. При этом программа продолжает работать, как будто все хорошо.
Даже после тестирования не всегда удается распознать подобные блокировки, поэтому в пакете java.util.concurrent было предложено решение в виде интерфейса Lock и его методов.
Lock – это интерфейс с механизмом управления блокировкой, обеспечивающий безопасность выполнения кода в многопоточной среде. Реализации этого интерфейса (например, ReentrantLock) предоставляют конкретные реализации блокировок.

ReentrantLock – это реентерабельная взаимная исключающая блокировка с базовым поведением и семантикой, аналогичными неявной мониторной блокировке, используемой в синхронизированных методах и операторах. Эта блокировка может быть захвачена и освобождена одним и тем же потоком несколько раз, что делает ее реентерабельной.

Блокировка принадлежит потоку, который последним успешно ее захватил, но еще не освободил. Если поток вызывает метод lock и блокировка не принадлежит другому потоку, то он успешно её захватывает.

Методы isHeldByCurrentThread() и getHoldCount() позволяют проверить, принадлежит ли текущий поток блокировке, а также сколько раз блокировка была захвачена текущим потоком.

Конструктор класса принимает параметр справедливости (fairness), который опционален. Если установлен в true, то при конфликте блокировки благосклонность отдается потоку, который дольше всех ждал. В противном случае порядок доступа не гарантируется.

Обратимся к документации:

It is recommended practice to always immediately follow a call to lock with a try block, most typically in a before/after construction.

Это означает, что рекомендуется всегда сразу после вызова lock использовать блок try, например, в конструкции try-finally, чтобы гарантировать освобождение блокировки в любых обстоятельствах. Также в документации приводится следующий пример:

class X {
   private final ReentrantLock lock = new ReentrantLock();
   // ...
 
   public void m() {
     lock.lock();  // block until condition holds
     try {
       // ... method body
     } finally {
       lock.unlock()
     }
   }
 }

В дополнение к реализации интерфейса Lock этот класс определяет ряд общедоступных и защищенных методов для проверки состояния блокировки. Некоторые из этих методов полезны только для мониторинга.

Данная блокировка поддерживает максимум 2147483647 рекурсивных блокировок одним и тем же потоком. Попытки превысить этот лимит приводят к выбрасыванию ошибки из методов блокировки.

Коротко отметим, что существуют и другие менее популярные проблемы:

  • Livelock – ситуация, когда потоки продолжают работу, но не могут завершить свои задачи из-за постоянных условий, которые не могут быть выполнены. Поток часто вызывается в ответ на действие другого потока. Если оно также является ответом на действие другого потока, то может возникнуть блокировка. Как и в случае взаимоблокировки, потоки livelock не могут дальше выполняться. Однако потоки не заблокированы, они просто слишком заняты, отвечая друг другу, чтобы возобновить работу. Такую ситуацию можно сравнить с двумя людьми, пытающимися обогнать друг друга в коридоре: Альфонс отходит влево, чтобы пропустить Гастона, а Гастон отходит вправо, чтобы пропустить Альфонса. Понимая, что оба блокируют друг друга, Альфонс перемещается вправо, а Гастон — влево. Но они все еще продолжают блокировать друг друга и т.д.

  • Starvation – описывает ситуацию, когда поток не может получить регулярный доступ к общим ресурсам и не может продвигаться вперед. Так случается, когда общие ресурсы становятся недоступными в течение длительного времени из-за «жадных» потоков. Предположим, что объект предоставляет синхронизированный метод, который часто требует много времени, чтобы вернуть значение. Если один поток часто вызывает этот метод, другие потоки, которым также требуется частый синхронизированный доступ к одному и тому же объекту, часто будут блокироваться.

Заключение

Многопоточное программирование имеет мощные инструменты для повышения эффективности и производительности приложений. В этой статье мы рассмотрели ключевые аспекты работы с потоками в Java — базовые понятия и проблемы, связанные с многозадачностью, а также продвинутые концепции, такие как синхронизация и CAS-механизм.

Подведем итоги:

  • Потоки позволяют параллельно выполнять задачи, улучшая отзывчивость приложений и общую производительность.

  • Состояния гонки и дедлоки являются распространенными проблемами в многопоточном программировании, их предотвращение важно для обеспечения корректной работы программы.

  • Синхронизация и CAS предоставляют средства для обеспечения безопасности доступа к общим ресурсам в многопоточных приложениях.

Понимание этих концепций поможет вам создавать надежные и эффективные многопоточные приложения. Однако несмотря на все преимущества, важно остерегаться сложности и подходить к многопоточности ответственно, учитывая особенности своего проекта. В следующей части вас ждет набор новых шпаргалок на актуальные темы Java :) 

P.S.: А по ссылке вы найдете обещанную мини-шпаргалку по многопоточности в Java. С ее помощью вам будет гораздо легче ориентироваться в ключевых понятиях, а также отслеживать те аспекты, которые могут быть случайно упущены. Надеюсь, что она поможет нашим читателям лучше подготовиться к собеседованию :) Желаю успехов!

Использованные источники: 

https://docs.oracle.com/javase/8/docs/api/java/lang/Thread.html 

https://docs.oracle.com/javase/tutorial/essential/concurrency/ 

https://download.java.net/java/early_access/valhalla/docs/api/java.base/java/lang/Thread.html 

Спасибо за внимание!

Больше авторских материалов для backend-разработчиков читайте в соцсетях SimbirSoft – ВКонтакте и Telegram.

Комментарии (6)


  1. mrprogre
    07.05.2024 08:48
    +1

    Лично для меня это лучшая статья по этой теме. Всё в одном месте! Сохраняю :) спасибо!


  1. Scott_Leopold
    07.05.2024 08:48
    +4

    В целом, статья неплохая, но есть неточности. Например, volatile не гарантирует атомарность.



  1. hunterartur
    07.05.2024 08:48
    +1

    БАЗА


  1. tom_tt
    07.05.2024 08:48

    Спасибо за статью!
    Хотелось бы ещё добавить, что у виртуальных потоков нет приоритета


  1. SergeyMironov
    07.05.2024 08:48

    Спасибо за статью - написано грамотно и разложено по полочкам!

    Исключения. Runnable не бросает проверяемые исключения (за исключением тех, которые являются подклассами RuntimeException).

    подклассы RuntimeException не являются проверяемыми исключениями. Runnable не бросает проверяемые исключения и точка.