Потоки Java соответствуют потокам ядра и поэтому обладают всеми присущими им недостатками. В сети можно найти проекты, целью которых является отказ от потоков Java и реализация пользовательских потоков. Самые известные перечислены ниже.
Kilim — один из первых «рабочих» проектов, реализующих легковесные потоки. Библиотека предоставляет средства для создания приложений, основанных на обмене сообщениями. Из-за соответствующего API данную библиотеку можно рассматривать скорее как реализующую модель акторов, чем потоковую модель.
Quasar — другой проект, реализующий прикладные потоки, называемые нитями (fibers). Кроме легковесных потоков библиотека предоставляет построенную на нитях реализацию модели акторов. Хотя API нитей похож на API потоков Java, чтобы воспользоваться средствами библиотеки, потребуется переписать код приложения.
В данной статье рассматривается проект Zephyr. Его отличие от первых двух проектов заключается в том, что средства библиотеки позволяют «превратить» обычные потоки в легковесные, не изменяя кода приложения. В действительности библиотека позволяет использовать любую реализацию потоков, и легковесные потоки являются одной из возможных реализаций.
Пример
Рассмотрим классическую задачу для оценки производительности потоков. По условиям задачи создаются 503 потока, объединенные в кольцо. Некоторое сообщение передается первому потоку и далее от потока к потоку N раз. Последний поток, получивший сообщение, печатает свой номер.
Ниже приведен исходный код задачи и POM-файл с плагином для запуска.
src\main\java\org\example\threadring\ThreadRing.java
package org.example.threadring;
import java.util.concurrent.locks.LockSupport;
public class ThreadRing {
public static void main(String[] args) throws InterruptedException {
int threadCount = Integer.parseInt(args[0]);
int n = Integer.parseInt(args[1]);
WorkerThread[] threads = new WorkerThread[threadCount];
WorkerThread first = new WorkerThread(1);
WorkerThread next = first;
for (int i = threadCount - 1; i > 0; i--) {
WorkerThread thread = new WorkerThread(i + 1);
threads[i] = thread;
thread.next = next;
thread.waiting = true;
thread.start();
next = thread;
}
threads[0] = first;
first.next = next;
first.message = n;
first.waiting = false;
first.start();
for (WorkerThread thread : threads) {
thread.join();
}
}
private static final class WorkerThread extends Thread {
private final int id;
WorkerThread next;
int message;
volatile boolean waiting;
WorkerThread(int id) {
this.id = id;
}
@Override
public void run() {
int m;
do {
while (waiting) {
LockSupport.park();
}
m = message;
waiting = true;
next.message = m - 1;
next.waiting = false;
LockSupport.unpark(next);
} while (m > 0);
if (m == 0) {
System.out.println(id);
}
}
}
}
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example.threadring</groupId>
<artifactId>threadring</artifactId>
<version>1.0-SNAPSHOT</version>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.4.0</version>
<configuration>
<mainClass>org.example.threadring.ThreadRing</mainClass>
<arguments>
<argument>503</argument>
<argument>50000000</argument>
</arguments>
</configuration>
<executions>
<execution>
<goals>
<goal>java</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Собираем проект и запускаем.
mvn exec:java [INFO] Scanning for projects... [INFO] [INFO] ------------------------------------------------------------------------ [INFO] Building threadring 1.0-SNAPSHOT [INFO] ------------------------------------------------------------------------ [INFO] [INFO] --- exec-maven-plugin:1.4.0:java (default-cli) @ threadring --- 292 [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 1:45.475s
Теперь изменим POM-файл, добавив поддержку легковесных потоков.
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example.threadring</groupId>
<artifactId>threadring</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<zephyr.version>1.0-SNAPSHOT</zephyr.version>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.jvnet.zephyr.maven</groupId>
<artifactId>remapping-maven-plugin</artifactId>
<version>1.0-SNAPSHOT</version>
<configuration>
<mappingEntries>
<mappingEntry>
<oldName>java/lang/Thread</oldName>
<newName>org/jvnet/zephyr/jcl/java/lang/Thread</newName>
</mappingEntry>
<mappingEntry>
<oldName>java/util/concurrent/locks/LockSupport</oldName>
<newName>org/jvnet/zephyr/jcl/java/util/concurrent/locks/LockSupport</newName>
</mappingEntry>
</mappingEntries>
</configuration>
<executions>
<execution>
<goals>
<goal>remapping</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.jvnet.zephyr.maven</groupId>
<artifactId>javaflow-maven-plugin</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.jvnet.zephyr.thread</groupId>
<artifactId>thread-api</artifactId>
<version>${zephyr.version}</version>
</dependency>
<dependency>
<groupId>org.jvnet.zephyr.jcl</groupId>
<artifactId>jcl-jdk7</artifactId>
<version>${zephyr.version}</version>
</dependency>
</dependencies>
<configuration>
<classesDirectory>${project.build.directory}/remapping-classes</classesDirectory>
</configuration>
<executions>
<execution>
<goals>
<goal>javaflow</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<version>2.5</version>
<executions>
<execution>
<goals>
<goal>jar</goal>
</goals>
<configuration>
<classifier>javaflow</classifier>
<classesDirectory>${project.build.directory}/javaflow-classes</classesDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.4.0</version>
<dependencies>
<dependency>
<groupId>org.example.threadring</groupId>
<artifactId>threadring</artifactId>
<version>${project.version}</version>
<classifier>javaflow</classifier>
</dependency>
<dependency>
<groupId>org.jvnet.zephyr.continuation</groupId>
<artifactId>continuation-javaflow</artifactId>
<version>${zephyr.version}</version>
</dependency>
<dependency>
<groupId>org.jvnet.zephyr.thread</groupId>
<artifactId>thread-api</artifactId>
<version>${zephyr.version}</version>
<classifier>javaflow</classifier>
</dependency>
<dependency>
<groupId>org.jvnet.zephyr.thread</groupId>
<artifactId>thread-continuation</artifactId>
<version>${zephyr.version}</version>
<classifier>javaflow</classifier>
</dependency>
<dependency>
<groupId>org.jvnet.zephyr.jcl</groupId>
<artifactId>jcl-jdk7</artifactId>
<version>${zephyr.version}</version>
<classifier>javaflow</classifier>
</dependency>
</dependencies>
<configuration>
<cleanupDaemonThreads>false</cleanupDaemonThreads>
<includeProjectDependencies>false</includeProjectDependencies>
<includePluginDependencies>true</includePluginDependencies>
<mainClass>org.example.threadring.ThreadRing</mainClass>
<arguments>
<argument>503</argument>
<argument>50000000</argument>
</arguments>
<systemProperties>
<systemProperty>
<key>org.jvnet.zephyr.thread.continuation.DefaultForkJoinPoolExecutor.parallelism</key>
<value>4</value>
</systemProperty>
</systemProperties>
</configuration>
<executions>
<execution>
<goals>
<goal>java</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Плагин remapping-maven-plugin обрабатывает class-файлы, переназначая типы. В данном примере все ссылки на Thread и LockSupport заменяются на классы, реализующие поддержку легковесных потоков. Поскольку новые классы имеют такой же интерфейс как и оригинальные, менять код приложение не требуется. По умолчанию преобразованные файлы копируются в директорию target\remapping-classes.
Плагин javaflow-maven-plugin трансформирует class-файлы, добавляя в методы поддержку продолжений (continuations), реализуемых библиотекой Commons Javaflow. Есть возможность исключить из обработки как целые классы (excludes), так и отдельные методы (excludedMethods). Файлы, обработанные плагином remapping-maven-plugin, являются исходными для javaflow-maven-plugin. Так как remapping-maven-plugin и javaflow-maven-plugin по умолчанию имеют один и тот же жизненный цикл (process-classes), то плагины запускаются в том порядке, в котором они указаны в POM-файле. В связи с тем что javaflow-maven-plugin в процессе работы загружает классы, на которые ссылаются трансформируемые class-файлы, необходимо указать соответствующие зависимости плагина. По умолчанию файлы после трансформации помещаются в директорию target\javaflow-classes.
Плагин maven-jar-plugin собирает jar-файл, использую файлы, полученные после обработки плагином javaflow-maven-plugin, и помечает его классификатором javaflow.
Новая версия приложения зависит от классов, реализующих поддержку легковесных потоков, но поскольку данные классы не нужны на стадии компиляции, соответствующие зависимости добавлены в плагин exec-maven-plugin.
Собираем и запускаем.
mvn exec:java [INFO] Scanning for projects... [INFO] [INFO] ------------------------------------------------------------------------ [INFO] Building threadring 1.0-SNAPSHOT [INFO] ------------------------------------------------------------------------ [INFO] [INFO] --- exec-maven-plugin:1.4.0:java (default-cli) @ threadring --- 292 [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 24.061s
Как видим, новая версия приложения работает значительно быстрее оригинальной.
Реализация
Суть метода, который позволяет выполнить переход от обычных потоков к легковесным не меняя исходного кода приложения, заключается в переназначении на уровне байт кода стандартных классов на классы с таким же интерфейсом, но с другой реализацией. В примере, приведенном выше, переназначаются два класса: Thread и LockSupport. Новые классы делегируют вызовы своих методов некоторой реализации ThreadImpl. При конструировании экземпляра класса потока создание конкретной реализации запрашивается у провайдера ThreadImplProvider. Класс ThreadImplProvider абстрактный, и конкретный провайдер загружается через средство загрузки сервисов ServiceLoader. Все это позволяет присоединять к потокам произвольную реализацию, осуществляя идею подключаемых потоков.
На текущий момент есть две реализации потоков: JavaThreadImpl и ContinuationThreadImpl. Класс JavaThreadImpl делегирует вызовы своих методов обратно методам класса java.lang.Thread. Поток с этой реализацией доступен по умолчанию при вызове Thread.currentThread. Это необходимо, для того чтобы классы, которые зависят от новых Thread и LockSupport, могли работать вне подключаемых потоков (например, в методе main). Класс ContinuationThreadImpl реализует легковесные потоки через продолжения, и в качестве планировщика по умолчанию использует ForkJoinPool. Абстрактный класс Continuation, конкретная реализация которого создается при помощи провайдера ContinuationProvider, который в свою очередь загружается через ServiceLoader, содержит методы приостановки выполнения (suspend) и возобновления выполнения продолжения (resume). Класс ContinuationThreadImpl возобновляет работу легковесного потока, отправляя на исполнение в пул потоков задачу, в которой возобновляется выполнение приостановленного продолжения. Это происходит при вызове методов unpark и yield и после завершения ожидания, которое инициируют методы parkNanos, parkUntil, sleep и join. При вызове методов park, sleep и join продолжение приостанавливается, задача завершается, и поток переводится в состояние ожидания.
Пока доступна только одна реализация продолжении, основанная на слегка измененной версии библиотеки Commons Javaflow. Javaflow позволяет включить поддержку продолжений во все методы, кроме статических конструкторов и конструкторов экземпляра, также не поддерживаются встроенные блокировки. Это означает, что приложение будет работать некорректно в том случае, когда приостанавливается метод, вызванный из конструктора или блока синхронизации. В проекте предусмотрена возможность обойти данное ограничение. С помощью методов managed и manage класса ThreadUtils можно сделать методы, вызываемые из определенного места в коде, неприостанавливаемым. Другими словами, эти методы начинают работать так, как если бы они выполнялись в обычном потоке. При таком подходе приложение будет работать корректно, хотя могут возникнуть нежелательные блокировки потоков.
Помимо классов Thread и LockSupport поддержка подключаемых (и соответственно легковесных) потоков реализована в ThreadLocal, во многих классах из пакетов java.util.concurrent и java.util.concurrent.lock (параллельные коллекции, блокировки, барьеры, семафоры и др.). Классы ограниченных (bounded) параллельных коллекций с поддержкой подключаемых потоков в сочетании с легковесными потоками позволяют создавать асинхронные приложения, в которых нет такой проблемы, когда несколько поставщиков (producers), передающих данные через ограниченный буфер, и медленных потребителей (consumers), исполняющихся на пуле потоков, замедляют или блокируют работу всей системы. Также реализован сервис-провайдер SelectorProvider каналов NIO с поддержкой подключаемых потоков. Синхронный API каналов в контексте легковесных потоков позволяет писать понятный код без селекторов и обратных вызовов.
Тест производительности
Ниже приведены результаты теста thread ring для сравнения производительности потоков Java, акторов Akka, нитей Quasar и легковесных потоков Zephyr. Для тестирования потоков Zephyr используется ForkJoinPool из Quasar, так как он отличается от стандартного ForkJoinPool, который по умолчанию используется в Zephyr.
Командная строка для запуска теста:
java -jar zephyr-benchmark.jar -jvmArgsAppend "-DworkerCount=503 -DringSize=1000000 -Dparallelism=4 -javaagent:quasar-core-0.6.2.jar" -wi 5 -i 10 -bm avgt -tu ms -f 5 ".*RingBenchmark.*"
Результаты теста для N = 1000000 и parallelism = 4:
Benchmark Mode Cnt Score Error Units AkkaActorRingBenchmark.benchmark avgt 50 262.676 ± 4.840 ms/op JavaThreadRingBenchmark.benchmark avgt 50 2146.026 ± 11.087 ms/op QuasarFiberRingBenchmark.benchmark avgt 50 343.258 ± 5.362 ms/op ZephyrThreadRingBenchmark.benchmark avgt 50 251.916 ± 3.474 ms/op
Исходный код теста доступен по ссылке.
Комментарии (52)
mmMike
08.04.2015 13:06+6Первое что вызывает сомнение — это тест на производительность. Который как специально сделан, что бы протестировать не производительность потоков, как таковых, а производительность обмена данными между ними.
Понятно, что модели нитей, которая стандартно встроена в Javа и выполняющейся на JVM с соответствующей аппаратной поддержкой, такой тест покажет заведомо низкий результат, чем в реализации, изначально ориентированной на обмен сообщениями в без переключения контекста.
А то, что, кто то вынужден фактически избавляться от потоков (на уровне ядра), не меняя исходного кода говорит только об ободном: этот «кто то» допустил ошибку при разработке архитектуры своей программы (увлекся понятием потока, не понимая как это фактически работает) и теперь мучительно ищет пути для повышения производительности без существенной переделки исходников.
На мой взгляд — порочный путь.xhumanoid
08.04.2015 17:25+4по поводу теста, можно добавить еще немного.
Почему в каждой итерации присутствует полностью поднятие всех потоков/системы акторов, так как следом напрашиваются следующие уточняющие вопросы:
1) akka использует ForkJoinPool (да, я знаю что своя реализация, а не из java), в итоге если он прогрет, то все сводится к вопросу создания акторов
2) почему тесты для ForkJoinPool для Java и Scala не приведены в статье, хотя и написаны
3) автор точно уверен, что он мерял именно вопросы передачи сообщения по кольцу, а не что-то ещё (например скорость создания 500+ потоков в java)
это первые вопросы что возникли, еще до запуска тестовyngui Автор
09.04.2015 09:27Валидные замечания. Поправил тесты. Теперь время поднятия потоков и акторов не учитывается в общем времени. Правда цифры не очень изменились после этого.
Benchmark Mode Cnt Score Error Units AkkaActorRingBenchmark.benchmark avgt 50 262.676 ± 4.840 ms/op ForkJoinTaskRingBenchmark.benchmark avgt 50 96.272 ± 2.235 ms/op JavaThreadRingBenchmark.benchmark avgt 50 2146.026 ± 11.087 ms/op Jsr166ForkJoinTaskRingBenchmark.benchmark avgt 50 72.973 ± 1.808 ms/op QuasarActorRingBenchmark.benchmark avgt 50 682.840 ± 4.296 ms/op QuasarFiberRingBenchmark.benchmark avgt 50 343.258 ± 5.362 ms/op ScalaForkJoinTaskRingBenchmark.benchmark avgt 50 72.996 ± 1.531 ms/op ZephyrThreadRingBenchmark.benchmark avgt 50 251.916 ± 3.474 ms/op
Видимо все таки потоки Java создаются достаточно быстро. Интересно также, что первый прогревающий запуск теста Akka занимает секунды, тогда как последующие запуски отрабатывают значительно быстрее. Скорее всего тут дело в большом количестве классов (Scala же), которые загружаются при первом создании ActorSystem.
Результаты тестов ForkJoinPool не привел в статье по двум причинам. Во-первых, посчитал, что в этом нет смысла, потому что и так понятно, что «голый» ForkJoinPool быстрее чего-то, что написано с его использованием. Вопрос только, на сколько быстрее. Вторая причина такая, что хотелось сравнить что-то более менее одинаковое по степени «высокоуровневости». Все таки ForkJoinPool — достаточно низкоуровневая штука, которая подходит для узкого круга задач.
На самом деле Akka так же не имело смысла сравнивать, так как это не легковесные потоки. Стоило бы сравнить Kilim, хотя бы потому, что он указан в обзоре, но, к сожалению, в репозиториях нет соответствующего артефакта, а добавлять в тесты код целого проекта как-то не очень хотелось.
yngui Автор
09.04.2015 09:54Ну, фактически там нет обмена сообщениями, если не считать тест Akka. В тестах как раз проверяется, насколько быстро происходит планирование потоков. Да, нативные потоки планирует операционная система, а в случае с легковесных этим занимается библиотека. Но в этом собственно и смысл теста, показать, насколько легковесные потоки планируются быстрее нативных.
Что касается повышения производительности без переделки исходников, может быть кто-то и хочет этим заниматься, но я бы не стал этого делать, по крайней мере без ревизии существующей архитектуры. С легковесными потоками есть другие преимущества: знакомый API, потоковая модель, отсутствие проблемы back pressure. С чего бы тогда все кинулись программировать на Go и Erlang, если бы все это было не существенно.
Rogaven
08.04.2015 13:23Помнится изначально в Java как раз и были легковесные потоки, от которых в итоге отказались в пользу потоков ядра. Какой толк их возвращать.
FractalizeR
08.04.2015 14:16А какова была причина? Мне кажется, сейчас какая-то тенденция среди современных языков наметилась как раз в пользу легковесных потоков (go-routines, например)
TimReset
08.04.2015 14:18Вы про green thread ( en.wikipedia.org/wiki/Green_threads )? Вообще похоже.
yngui, не подскажите, а какая разница между легковесными потоками и green thread?yngui Автор
08.04.2015 14:22+1Насколько я понимаю, зеленые потоки соответствуют модели N:1, а легковесные модели M:N (http://en.wikipedia.org/wiki/Thread_(computing)#Models)
konsoletyper
08.04.2015 14:12+3Для TeaVM я реализовал легковесные потоки, правда, выполняется эта «Java» в JavaScript, поэтому возможны только легковесные потоки (WebWorkers не умеют выполняться в едином адресном пространстве). Вот простейший пример. А вот более сложный (карты скачиваются и заливаются в IndexedDB каждая в своём потоке). Я как раз собираюсь в скором времени написать статью по поводу внутренней механики своих легковесных процессов, и в некоторой степени там есть заимствования из Javaflow. Проблема в том, что Javaflow может просто так взять и вставить goto в нужную точку метода, а вот с декомпилированными из байткода JavaScript такой трюк не прокатит.
TimReset
08.04.2015 14:21Да, было бы интересно прочитать про TeaVM. А то давно от вас статей по ней не было. Я, как активно использующий GWT, интересуюсь его аналогами. :)
ababo
08.04.2015 14:15-10Вот почему я выбираю Go.
FractalizeR
08.04.2015 14:17+2Почему? И для чего конкретно? В Java нет нативных легковесных потоков. Как и в куче других языков. Разве это делает язык однозначно и для всего плохим и неподходящим?
ababo
08.04.2015 14:19-15Хорошо, что я не рядом. Вы бы, наверное, высказав всё в лицо, туда же и ударили.
FractalizeR
08.04.2015 14:27+5Странно, что вы так думаете. Разве я дал вам повод считать меня агрессивным?
Я отрицательно отношусь к неаргументированной критике — это правда. Однако насилие — не мой метод. Если Ява кажется вам плохим языком только по указанной вами причине — пожалуйста, продолжайте так думать. Мне просто было интересно.ababo
08.04.2015 14:33Просто напряг ваш напор. Я не считаю Java плохим языком. Да, он не оригинален (много заимствований из Оберона), не красив (избыточен, тяжеловесный синтаксис), но всё равно оказался очень полезным и востребованным. Java — это современный Cobol, язык, на который опирается бизнес, в частности, почти весь финансовый сектор. А в контексте данного поста я выбираю Go из-за лёгких потоков из коробки, позволяющих избавляться от спагетти асинхронных вызовов, а также из-за его простоты и красоты.
FractalizeR
08.04.2015 15:31Кстати, честно говоря, я уж и не знаю, какой из серьезно популярных языков несет действительно много новых собственных идей, которые просто никак не назвать переосмыслением старых парадигм.
PHmaster
08.04.2015 23:28Нормальный эволюционный процесс. Лишь небольшое количество совершенно новых «мутаций», и подавляющее большинство обкатанных на практике решений, доставшихся от предшественников в результате «скрещивания» и отбора. По-другому, наверное, и быть не может.
nehaev
08.04.2015 16:45-1Вот почему я выбираю Scala+Akka и совершенно не парюсь по поводу каких-то там «легковесных потоков».
PHmaster
08.04.2015 23:35+1Надо, видимо, все же научиться готовить этот ваш Akka. Попробовал акторов — и как-то плавно и незаметно для себя перешел на Scala+Futures/Promises. Может, посоветуете, что можно почитать по грамотному проектированию actor-based систем?
nehaev
08.04.2015 23:54Scala+Futures/Promises — это весьма удобно для простых задач, но с разрастанием функционала обычно приводит к «спагетти асинхронных вызовов».
Единственное, что точно поможет именно с грамотным проектированием в акке — это доки по ней. Может быть, это прозвучало несколько занудно, но доки там действительно весьма толковые.
Ну и если еще не проходили, однозначно рекомендую записаться на курс Reactive Programming (https://www.coursera.org/course/reactive). Один из ведущих Martin Odersky — автор Scala, другой Roland Kuhn — Akka Tech Lead.PHmaster
09.04.2015 00:45это весьма удобно для простых задач, но с разрастанием функционала обычно приводит к «спагетти асинхронных вызовов».
У меня как раз произошел обратный процесс: с разрастанием функционала акторы мутировали в каких-то монстров, интерфейс общения с каждым из которых состоял из целой батареи кейс-классов, и я начал во всем этом путаться. Поначалу я стал повсеместно применять ask/pipeTo, чтобы хоть как-то облегчить себе жизнь и избавиться от промежуточных сообщений, но в результате пришел к тому, что акторы мне оказались вообще не нужны, так как ask возвращает Future, и зачем мне городить актора и батарею case-class-ов для каждого сообщения, чтобы получить Future, если можно сразу написать нормальный класс, методы которого безо всяких сообщений и акторов будут возвращать мне эти самые Future.
Сейчас-то я приблизительно понимаю, где я просчитался и что спроектировал изначально неправильно, но проект, на котором я тренировался, уже окончательно и бесповоротно переведен на Future. В следующем проекте буду пробовать еще раз) Ну и, видимо, стоит взглянуть в сторону типизированных акторов, чтобы избавиться от кейс-классов для общения, а интерфейс актора выглядел извне как обычный класс.
Единственное, что точно поможет именно с грамотным проектированием в акке — это доки по ней
Доки по акке я и читал, в основном, но там как-то больше о способах применения, а не о целесообразности этих способов в моих конкретных ситуациях.
А вот за курс спасибо, запишусь обязательно. Недавно попробовал онлайн-обучение — мне понравилось, теперь еще хочу :))nehaev
09.04.2015 02:06+1Ага, теперь понял. Доки тут действительно не помогут. Тут нужен опыт и, желательно, чужой. Где-то с год назад я видел выступление, в котором чувак из какой-то компании рассказывал про грабли, по которым они щедро потоптались прежде чем научились готовить акку. Что характерно, я в свое время прошелся точно по таким же граблям. Сейчас это видео, к сожалению, не нагуглилось сходу. Но вообще выступления про то, как правильно готовить акку, бывают практически на любой конференции по scala. Наверняка в том же Киеве есть собирающееся с определенной периодичностью scala-сообщество, куда можно прийти и задать вопросы.
Несколько комментариев по тем вещам которые меня насторожили:
> акторы мутировали в каких-то монстров
Хоть автор исходной статьи и пытается доказать обратное, акторы — это чертовски легковесная вещь. Нет ничего страшного в том, чтобы плодить акторы на каждый чих. Монстров надо дробить на мелкие кусочки и разносить по разным акторам.
> интерфейс общения с каждым из которых состоял из целой батареи кейс-классов, и я начал во всем этом путаться
Иногда (достаточно редко при правильной декомпозиции) бывает нужно сделать актор со сложным внешним поведением и внутренней логикой. Здесь приходит на помощь FSM.
> я стал повсеместно применять ask/pipeTo
Акторы великолепно работают по принципу fire-and-forget. Надо дизайнить их исходя из этого. Ask — только в самом крайнем случае, желательно — никогда.
> зачем мне городить актора и батарею case-class-ов для каждого сообщения, чтобы получить Future,
если можно сразу написать нормальный класс
Обмен сообщениями это конечно важно, но акка не только про это. Например, любой актор является элементом иерархии супервизоров, благодаря чему можно делать такой fault-tolerance, который не снился никаким фьючам.
> стоит взглянуть в сторону типизированных акторов
Если акка под скалой а не под джавой, то имхо не стоит. Уже были типизированные акторы, типизированные каналы (а-ля Go), сейчас обещают какие-то новые типизированные акторы… Но тут проблема скорее всего в другом. Если у актора загроможденный интерфейс — нужна декомпозиция. Если хочется типобезопасных декларативных пайплайнов — можно посмотреть на Akka Streaming.PHmaster
09.04.2015 03:28Спасибо за столь развернутый ответ :)
Тут нужен опыт и, желательно, чужой.
Да, в этом-то и проблема. Я впоследствии нашел книгу (автора уже сейчас не вспомню, а под рукой сейчас нет) с конкретными практическими советами, но она оказалась очень короткой и быстро закончилась :))
Но вообще выступления про то, как правильно готовить акку, бывают практически на любой конференции по scala. Наверняка в том же Киеве есть собирающееся с определенной периодичностью scala-сообщество, куда можно прийти и задать вопросы.
Вот в какое-то местное scala-сообщество с удовольствием бы влился и послушал умных советов. Наверное, пора заняться поиском: вопросов и тем для обсуждения накопилось уже достаточно.
Хоть автор исходной статьи и пытается доказать обратное, акторы — это чертовски легковесная вещь. Нет ничего страшного в том, чтобы плодить акторы на каждый чих. Монстров надо дробить на мелкие кусочки и разносить по разным акторам.
Это было моей основной ошибкой, как я потом понял (после прочтения той самой книги). Это при том, что в доках по акке часто упоминается, что акторы легковесны, и что актор в ответ на сообщение может создавать новых акторов — пока не прочитал об этом в книге с конкретным примером — самому в голову как-то не пришло воспользоваться этой информацией. Поэтому я городил в одном акторе очень много логики, а новых акторов создавал очень редко.
Иногда (достаточно редко при правильной декомпозиции) бывает нужно сделать актор со сложным внешним поведением и внутренней логикой. Здесь приходит на помощь FSM.
Вот вместо разделения логики по нескольким акторам я начал всех чрезмерно перегруженных и усложнившихся акторов оформлять в виде FSM. И так мне они (FSM) понравились, что даже статью на хабре про них написал :)
Акторы великолепно работают по принципу fire-and-forget. Надо дизайнить их исходя из этого. Ask — только в самом крайнем случае, желательно — никогда.
Тоже никак не смог перестроить мозги на этот fire-and-forget. Может, со второй попытки получится. Есть, скажем, в БД таблица с задачами, и есть актор-исполнитель задач. Задача с пометкой pending вытаскивается из таблицы, помечается как processing и отправляется актору-исполнителю (fire-and-forget). Актор-исполнитель падает, супервизор его рестартует, и он просит новую задачу. А та, на которой он упал, так и остается forget, и в БД висит как processing, хотя про нее все успешно забыли и она никогда уже не выполнится. На тот же фьючерс можно повесить onFailure (или recover), и поменять в БД статус задачи с processing на failed. А с fire-and-forget — так и не разобрался, как нужно правильно поступить.
Обмен сообщениями это конечно важно, но акка не только про это.
Да, с акторами еще очень просто решается проблема сериализации доступа к изменяемому состоянию. То есть, внутри актора всегда есть гарантия, что код исполняется только в одном потоке, и можно смело изменять состояния безо всяких мьютексов-локов. А вот с использованием Future мне порой приходится заворачивать кусочки кода в synchronized. Не часто, но бывает. Это одна из причин, почему я хочу попробовать осилить акторов еще раз.
Например, любой актор является элементом иерархии супервизоров, благодаря чему можно делать такой fault-tolerance, который не снился никаким фьючам.
Fault-tolerance я тоже готовить толком не научился. Пара попыток была — но закончились чем-то невнятным. Зато у Future помимо map/flatMap есть recover/recoverWith — которые ох как мне помогают в этом плане.
Если хочется типобезопасных декларативных пайплайнов — можно посмотреть на Akka Streaming.
Жду стабильного релиза Reactive Streams, если разговор о них :) Обещают много интересного, но бету пока не пробовал.
Если у актора загроможденный интерфейс — нужна декомпозиция.
Ну вот был у меня актор, отвечающий за загрузку данных с БД (reactive-mongo). Типов данных (коллекций в монго, или таблиц в sql) много. Запросы к этому актору и ответы для каждого типа данных выглядели как набор кейс-классов:
кодsealed trait Request; sealed trait Response; case class FindCustomerByName(name: String) extends Request; case class FindCustomerById(id: ID) extends Request; case class FindCustomersByRegistrationDates(from: Date, to: Date) extends Request case class FoundCustomer(customer: Option[Customer]) extends Response; case class FoundCustomers(customers: List[Customer]) extends Response;
nehaev
09.04.2015 13:11> Актор-исполнитель падает, супервизор его рестартует, и он просит новую задачу. А та, на которой он упал, так и остается forget, и в БД висит как processing
Ну по идее как раз супервизор должен понять, что задача которую он поручил исполнителю зафейлилась и стартовать новый исполнитель, которые запишет статус failed.
> Типов данных много, кейс-классов — на несколько экранов. Ну и все их нужно в акторе ловить, обрабатываь и кидать назад нужный ответ.
В скале если не знаешь что делать — пиши свой dsl.
object Select {
trait SelectRequest {
def query: String // формирует sql-запрос
}
object Customer {
case class ByName(name: String) extends SelectRequest {
def query = "..."
}
case class ById(...) {....}
}
}
dbActor! Select.Customer.ByName(«Vasya»)
В акторе матчится просто на SelectRequest и вызывать query. Пример упрощенный, в реальной жизни одним query конечно не отделаешься. Но идея в том, что акка тут не при чем. Просто можно использовать богатые возможности Scala по абстрагированию и унификации.
PS. Прошу прощения за уродское оформление кода, теги для меня отключены.PHmaster
09.04.2015 22:37Ну по идее как раз супервизор должен понять, что задача которую он поручил исполнителю зафейлилась и стартовать новый исполнитель, которые запишет статус failed.
Пробовал и такое. Получается уже не fire-and-forget, а для каждого исполнителя супервизор должен запоминать в каком-то Map/HashMap ту задачу, которую тот сейчас выполняет в данный момент. А если падает сам супервизор — то все задачи, выполняющиеся в данный момент его исполнителями, он забывает, значит, над ним должен стоять еще один супервизор, который должен их запоминать. Ответсвенность как-то рассредотачивается по целой цепочке следящих друг за другом акторов, и все равно нет никаких гарантий, что не зафейлится вся цепочка. Правда, сейчас понимаю, что можно было бы сделать одного супервизора на одного исполнителя, избавиться от HashMap в супервизоре, упростить таким образом конструкцию и уменьшить концентрацию ответственности в каждой возможной точке отказа. Но все равно нет гарантий, что супервизор не упадет вместе с исполнителем и не забудет о задании, пускай даже об одном.
В скале если не знаешь что делать — пиши свой dsl.
С DSL тоже не сложилось, так как я решил вообще абстрагироваться от технологии храниния данных, чтобы при необходимости можно было бы легко переходить от SQL к различным NoSQL-решениям и назад. Для начала все было реализовано на reactive-mongo, на потом запланирована реализация на reactive-postgresql, соответственно, писать совместимые реализации DSL под каждую систему храниния мне показалось технически сложным и накладным по времени. Поэтому я решил, что старый добрый интерфейс — это мой выбор, его реализовывать для каждой конкретной СУБД будет проще, чем DSL.
yngui Автор
09.04.2015 10:40Представляю, как кто-то, например, в дойче банке подойдет к своему менеждеру и предложит писать на Go или Erlang. Куда он будет послан.
ababo
09.04.2015 11:10-1Ну, Erlang, может, несколько и экзотичен в силу функциональности, а вот Go, IMHO, будущий приемник эстафеты Java. Очень простой и синшный язык, учится за пару приседаний.
yngui Автор
09.04.2015 11:29+2Вы просто не представляете себе масштабы бедствия. Есть тонны кода, который придется переписать, есть армия девелоперов, которую надо будет переучивать. Никто не будет выкладывать миллиард каждый раз, когда на рынке появляется еще один язык программирования.
ababo
09.04.2015 11:31-1Согласен, это не простой и не дешёвый процесс. И не факт, что преимущества Go окажутся настолько серьёзными на практике, что заставит вложить этот миллиард. С другой стороны, где сейчас Cobol?
nehaev
09.04.2015 11:56+1Java — это уже давно не столько язык, сколько платформа. Быстрая кросплатформенная JVM, зрелые средства разработки, тонны библиотек на все случаи жизни. Go пока в этом плане похвастаться особо нечем.
На платформе Java язык Java — не единственный, можно выбирать из достаточно широкого набора, причем Go в этот набор, насколько я знаю, не входит. Поэтому я не очень представляю, в каком смысле он может стать приемником Java.konsoletyper
09.04.2015 13:23Java — это ещё байт-код, который можно изменять или генерировать, тем самым добавляя в язык новые фичи. Например, автор статьи показал, как в Java, где из коробки отсутствуют green threads, можно спокойно их добавить. Я тот же javaflow использую, чтобы «замораживать» бизнес-процессы и складывать их БД, а потом, когда пользователь нажмёт нужную кнопку, «размораживать» их и продолжать выполнение. EclipseLink ковыряет байткод, чтобы добиться ленивой загрузки полей. Примеров множество. Можно ли подобные вещи сделать в Go, не модифицируя сам компилятор языка?
ragesteel
08.04.2015 14:45Плагин maven-jar-plugin собирает jar-файл, использую файлы, полученные после обработки плагином javaflow-maven-plugin, и помечает его классификатором javaflow.
Так используется scope runtime для этих зависимостей!
gurinderu
08.04.2015 16:47+1Возникает логичный вопрос, а зачем вам столько потоков?) И почему бы не использовать ту самую модель акторов и свести переключение контекстов к минимуму?
konsoletyper
08.04.2015 20:00+1Наверное, потому что на акторах не всегда удобно писать?
PHmaster
09.04.2015 00:19Во! Видимо, не я один такой, кому неудобно) Выше я уже написал, что с акторов плавно «скатился» до использования scala.concurrent.Future, за исполнение которого отвечает scala.concurrent.ExecutionContext (Akka его использует «под капотом»). По сути, CompletableFuture в Java 8 точно так же может облегчить жизнь и отказаться от собственноручного создания тредов, переложив эту ответсвенность на плечи java.util.concurrent.Executor, который можно создавать и настраивать под свои нужны, или довольствоваться ForkJoinPool.commonPool() по умолчанию.
yngui Автор
09.04.2015 00:49Я думаю, это зависит от задачи. Если в требованиях нужно смоделировать миллион активных бизнес сущностей, то вероятно понадобится создать столько же вычислительных сущностей, будь то акторы Akka, процессы Erlang, горутины или легковесные потоки.
Отвечая на второй вопрос. Акторы акторам рознь. Например, акторы Akka принципиально отличаются от акторов Quasar. Первые основываются на событийной модели, тогда как вторые на потоковой. Фактически, актор в Quasar — это поток с очередью сообщений. Актор сам решает, когда ему забрать сообщение из очереди, попутно он может выполнять какую-то другую работу, не связанную с обменом сообщениями при условии, что это не блокирует поток. В Akka же актор не управляет своей очередью сообщений. По сути, актор в Akka — это callback, его задача — обработать (как можно быстрее) сообщение и выйти из метода-обработчика. То есть в акторах Akka нельзя делать ничего, что не связано с обменом сообщениями. К тому же написание кода исключительно на обратных вызовах тоже имеет свои особенности. Также акторы Akka не решают проблему с back pressure, тогда как в Quasar это доступно «из коробки».nehaev
09.04.2015 03:18Про разные задачи согласен. В частности, Akka плохо подходит для последовательной обработки (pipeline), но для этого на ее основе уже много чего сделали: например, Akka Streaming, Spark Streaming, и проблему с back pressure решили. Естественно, все это без обязательного instumentation'а и SuspendExecution'а.
> То есть в акторах Akka нельзя делать ничего, что не связано с обменом сообщениями.
Во-первых, можно делать любые асинхронные вызовы, без разницы, связаны ли они с обменом сообщениями. Во -вторых, можно делать и «подвисающие» вызовы, главное — не забить ими весь пул потоков. Если есть такая опасность — можно пересадить «подвисающие» акторы на отдельный пул.
> акторы Akka не решают проблему с back pressure, тогда как в Quasar это доступно «из коробки».
Что именно доступно из коробки в Quasar? Просто посмотреть размер своей очереди? Или планировщик умеет как-то замедлять быстрых продюсеров?yngui Автор
09.04.2015 10:03Akka плохо подходит для последовательной обработки (pipeline), но для этого на ее основе уже много чего сделали: например, Akka Streaming, Spark Streaming, и проблему с back pressure решили.
Да, решили, но это все те же обратные вызовы. И reactive streams предназначены для узкого круга задач. Я с трудом представляю, как проект, в котором 1,5 миллиона строк кода, можно переписать на реактивных потоках. Даже не переписать, а переосмыслить архитектуру.
Что именно доступно из коробки в Quasar? Просто посмотреть размер своей очереди? Или планировщик умеет как-то замедлять быстрых продюсеров?
Продьюсеры просто саспендятся, если буфер заполнен.nehaev
09.04.2015 13:37> Да, решили, но это все те же обратные вызовы. И reactive streams предназначены для узкого круга задач.
Ну не знаю… По-моему streams отлично дополняют акку в части последовательной обработки, в чем как раз традиционно сильны всякие там рутины. Причем акка приносит с собой в эту сферу все свои многочисленные плюшки, которые собственно и заставляют предпочитать ее рутинам.
> Я с трудом представляю, как проект, в котором 1,5 миллиона строк кода, можно переписать на реактивных потоках. Даже не переписать, а переосмыслить архитектуру.
Могу себе представить как переосмыслить и даже как переписать. Но вот где найти столько спецов по reactive streams и столько денег, чтобы они взялись за это…
Но вообще да. Reactive архитектура не совместима с традиционной. Если где-то появилось event-driven, рано или поздно оно расползется по всему проекту.
> Продьюсеры просто саспендятся, если буфер заполнен.
Это круто. А можно ли настраивать другое поведение? Если например я просто хочу, чтобы лишние сообщения удалялись (throttle, debounce)?yngui Автор
09.04.2015 15:39Насчет throttle. Набросал такой код по описанию, взятому отсюда http://doc.akka.io/docs/akka/snapshot/contrib/throttle.html
Консьюмер — забирает сообщение из очереди и печатает его.
class Consumer extends Thread { private final BlockingQueue<Integer> queue; Consumer(BlockingQueue<Integer> queue) { this.queue = queue; } @Override public void run() { try { while (true) { System.out.println(queue.take()); } } catch (InterruptedException e) { e.printStackTrace(); } } }
Продьюсер — тротлит сообщения перед отправкой в очередь, 3 сообщения в секунду.
class Producer extends Thread { private static final int NUM = 3; private static final long PERIOD = 1000; private final BlockingQueue<Integer> queue; private long startTime; private int numLeft; Producer(BlockingQueue<Integer> queue) { this.queue = queue; } @Override public void run() { try { // These three messages will be sent to the target immediately throttle(1); throttle(2); throttle(3); // These two will wait until a second has passed throttle(4); throttle(5); } catch (InterruptedException e) { e.printStackTrace(); } } private void throttle(int message) throws InterruptedException { long time = System.currentTimeMillis(); if ((time - startTime) / PERIOD == 0) { if (numLeft > 0) { queue.put(message); numLeft--; return; } long millis = PERIOD - (time - startTime); if (millis > 0) { Thread.sleep(millis); } } startTime = time; queue.put(message); numLeft = NUM - 1; } }
Код для запуска.
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(); new Consumer(queue).start(); Producer producer = new Producer(queue); producer.start(); producer.join();
Осталось только натравит на все это Zephyr (кроме Thread надо еще переназначить LinkedBlockingQueue), и получим тротлинг на легковесных потоках.
Думаю, debounce тоже несложно сделать.
gurinderu
09.04.2015 13:43Никто и не говорит про задачи.Мне просто не понятно зачем вам нужно плодить кучу потоков?
В условиях ОС все равно диспетчеризация будет идти через нее.
Можно же взять и использовать нативный поток и с той самой событийной моделью (как в Akka) избежать переключения контекстов.
По сути легковесный поток это тоже самое, внутри некой абстракной машины, который все равно работает на нативном)
Собственно я думаю поэтому в Java так и сделано, чтобы предоставить больше гибкости для разработчика.yngui Автор
09.04.2015 14:12Мне просто не понятно зачем вам нужно плодить кучу потоков?
Наверное, для того чтобы распараллеливать задачи?
Можно же взять и использовать нативный поток и с той самой событийной моделью
Так вот не хочется использовать событийную модель, а хочется потоковую.
Собственно я думаю поэтому в Java так и сделано, чтобы предоставить больше гибкости для разработчика.
В Java нет легковесных потоков, потому что есть нативные методы, которые работают в том же потоке, могут вызывать блокировки и не умеют саспендиться. В Erlang, например, эта проблема решена через порты.
konsoletyper
09.04.2015 14:21Ну вообще, штуку, похожую на ту, которая описана в данной статье, в своё время делали ребята из mail.ru при разработке серверной части SkyForge. Они это мотивировали тем, что объекты игровой механики реализованы в «потоках», что вполне логично, но таких «потоков» очень много, ни один сервер не потянул бы. Они закономерно пробовали колбэки, чтобы обращаться к сервисам, но утонули в получившихся спагетти. Тогда и сделали свой аналог javaflow, воодрузили на него fibers и легковесный RPC. Вот даже на Хабре статью нашёл: habrahabr.ru/company/mailru/blog/220359/
Throwable
13.04.2015 12:47Вопрос, а IO с легковесными потоками будет работать? То есть если я сделаю простейший тредовый сервер, который для каждого клиента делает новый тред и висит на .read(), будет ли это работать с лекговесными потоками?
yngui Автор
14.04.2015 00:05Да, работать будет. Специально для этого написан SelectorProvider, который умеет открывать каналы с поддержкой легковесных потоков, и который автоматически загружается, если в classpath присутствует jcl-jdk7-<version>-javaflow.jar.
На самом деле потоки не будут висеть на read. Все блокирующие методы канала приостанавливают поток и запускают асинхронную операцию. Работа потока возобновляется по завершении асинхронной операции.
Здесь можно найти примеры, взятые из Netty и переделанные на потоки.
Командная строка для запуска DiscardServer:
java -classpath "target\javaflow-classes;%USERPROFILE%\.m2\repository\org\jvnet\zephyr\continuation\continuation-api\1.0-SNAPSHOT\continuation-api-1.0-SNAPSHOT.jar;%USERPROFILE%\.m2\repository\org\jvnet\zephyr\continuation\continuation-javaflow\1.0-SNAPSHOT\continuation-javaflow-1.0-SNAPSHOT.jar;%USERPROFILE%\.m2\repository\org\jvnet\zephyr\javaflow\javaflow-runtime\1.0-SNAPSHOT\javaflow-runtime-1.0-SNAPSHOT.jar;%USERPROFILE%\.m2\repository\commons-logging\commons-logging\1.2\commons-logging-1.2.jar;%USERPROFILE%\.m2\repository\org\jvnet\zephyr\thread\thread-api\1.0-SNAPSHOT\thread-api-1.0-SNAPSHOT-javaflow.jar;%USERPROFILE%\.m2\repository\org\jvnet\zephyr\thread\thread-continuation\1.0-SNAPSHOT\thread-continuation-1.0-SNAPSHOT-javaflow.jar;%USERPROFILE%\.m2\repository\org\jvnet\zephyr\jcl\jcl-jdk7\1.0-SNAPSHOT\jcl-jdk7-1.0-SNAPSHOT-javaflow.jar" org.jvnet.zephyr.example.io.discard.DiscardServer
Командная строка для DiscardClient (без легковесных потоков, для проверки работы DiscardServer)
java -classpath "target\classes" org.jvnet.zephyr.example.io.discard.DiscardClient
XlebNick
результаты впечатляют! Спасибо!
yngui Автор
Пожалуйста. Пользуйтесь!