История
Механизм удаленных вызовов процедур (Remote Procedure Call, RPC) был предложен сотрудником Xerox PARC Брюсом Нельсоном в 1981 году. Принцип сейчас кажется очевидным, но для того времени был в новинку. С полным текстом статьи можно ознакомиться тут.
Я специально выбрал самую простую схему из статьи, на которой видно, что на машине C процедура вызывается, а выполняется на другой машине S. Все общение происходит по сети. Выглядит как что-то до боли знакомое, не так ли?
За более сорокалетнюю историю RPC то становился более популярней, то менее. В java-мире 20 лет назад использовали SOAP, расширение XML-RPC, а сейчас популярен gRPC, который гораздо более похож на SOAP, чем кажется.
Сокеты
Сперва хотелось бы акцентировать внимание на сетевое взаимодействие между сервисами. Понять, как именно будут вызывать процедуры (функции, методы - я буду использовать разные термины для одного и того же) на удаленном сервере.
Нам нужно передать сообщение от клиента серверу. И для этого нам нужно использовать какой-то сетевой протокол. Программисты чаще всего работают с http, его выбор мог бы стать результатом привычки. Да и многие решения используют его, как упомянутые ранее SOAP и gRPC. Но мне такой выбор кажется как минимум странным: нам же нужен только транспорт, без методов (GET, POST, PUT и тд) и без кодов (200 и 400) и прочих прикладных вещей. Название вызываемой процедуры будет в теле сообщения, возвращаемый результат - тоже. Правильней будет использовать именно транспортный протокол: TCP или UDP. В случае UDP придется реализовывать гарантию доставки сообщений на прикладном уровне, что не лишено смысла в плане скорости, но слишком все усложнит. Поэтому TCP.
Для работы с TCP существуют программные интерфейсы - сокеты. Они представляют из себя ip-адрес с портом. Когда происходит передача данных (потока байтов) по TCP, они режутся на сегменты, каждый из которых снабжается заголовком и отправляется по сети. Сегменты идут от клиентского сокета к серверному, и эти два сокета образуют "логическое соединение". Если к серверному сокету подключаются два клиентских, то, соответственно, мы имеем два соединения.
Для работы с сокетами в языках программирования есть API. Дальше речь идет только про java, потому что... я люблю java :)
Существуют два API для работы с сокетами в java:
Старый. Классы java.net.Socket и java.net.ServerSocket.
Новый, NIO2. Классы java.nio.channels.AsynchronousSocketChannel и java.nio.channels.AsynchronousServerSocketChannel.
В NIO2, на мой взгляд, злоупотребили optional, и обрабатывать исключения не очень удобно - они обернуты в TimeoutException и прочие. Я попробовал оба варианта и остановился на первом.
В общих чертах на клиента нужно будет сделать:
Создать экземпляр Socket и подключить его к серверу, указав ip и порт.
Получить от сокета InputStream и OutputStream для работы с ним.
На сервере:
Создать экземпляр ServerSocket и "повесить" его на порт.
Вызвать на нем метод accept() и, когда клиент подключится, получить из него Socket.
Создать для сокета InputStream и OutputStream.
Если обратиться к гуглу, то можно найти код сервера вида:
public void start(int port) {
serverSocket = new ServerSocket(port);
clientSocket = serverSocket.accept();
out = new PrintWriter(clientSocket.getOutputStream(), true);
in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
String greeting = in.readLine();
if ("hello server".equals(greeting)) {
out.println("hello client");
} else {
out.println("unrecognised greeting");
}
}
Конечно, код работать не будет. Выполнение остановится на третьей строке, на accept(), пока первый, он же последний, клиент не подключится. Очевидно, что тут нужно вызывать accept() не в основном потоке и, после его срабатывания, тут же вызывать его опять, чтобы ждал нового клиента. И так бесконечно.
Пулы потоков
Для работы с ServerSocket понадобится целых два пула. Первый нужен будет для accept(), а второй - для непосредственной работы с клиентами.
Библиотека concurrency довольно большая, но мне хотелось бы рассмотреть только три пула потоков.
У класса java.util.concurrent.ThreadPoolExecutor есть три важных параметра в конструкторе:
corePoolSize. Количество потоков в пуле, в том числе ожидающих.
maximumPoolSize. Максимальное количество потоков.
keepAliveTime. Время ожидания. Если corePoolSize превышено, и потоки ждут дольше keepAliveTime, то они уничтожаются.
В библиотеке много пулов, различающихся сочетанием этих параметров.
FixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
Для FixedThreadPool размеры corePoolSize=maximumPoolSize, а keepAliveTime=0. Таким образом это пул фиксированного размера. Наверно, самый простой для понимания и часто используемый.
CachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
Для этого пула corePoolSize=0, но максимальный размер не ограничен (только размером типа int). Он хорош тем, что при работе с ним нет резкого падения производительности при заполнении пула, как в случае с FixedThreadPool.
SingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
Он похож на пул с фиксированным размером, но corePoolSize=maximumPoolSize=1. Именно он подходит для event loop. И именно его мы будем использовать для вызова accept().
Код сервера
ServerSocket listener = new ServerSocket();
listener.bind(new InetSocketAddress(port));
Executor executionPool = Executors.newCachedThreadPool();
Executor acceptPool = Executors.newSingleThreadExecutor();
acceptPool.execute(new Runnable() {
@Override
public void run() {
Socket socket;
try {
socket = listener.accept();
} catch (IOException e) {
return;
}
if (!listener.isClosed()) {
acceptPool.execute(this); // ждем следующего клиента
}
executionPool.execute(() -> {
try {
while (!socket.isClosed() && !listener.isClosed()) {
handle(socket); // непосредственная работа
}
} catch (Exception e) {
// exception
} finally {
// close socket
}
});
}
});
Полный код здесь.
Код клиента
Клиент еще проще:
Socket socket = new Socket();
socket.connect(new InetSocketAddress(host, port));
DataInputStream in = new DataInputStream(socket.getInputStream());
DataOutputStream out = new DataOutputStream(socket.getOutputStream());
Полный код клиента здесь и здесь.
Фреймворк
Результат можно найти на github: https://github.com/artfultom/vecenta.
Артефакты опубликованы в Maven Central:
repositories {
mavenCentral()
}
dependencies {
implementation 'io.github.artfultom:vecenta:0.0.7'
}
Плагин для генерации кода vecenta-gradle-plugin.