История

Механизм удаленных вызовов процедур (Remote Procedure Call, RPC) был предложен сотрудником Xerox PARC Брюсом Нельсоном в 1981 году. Принцип сейчас кажется очевидным, но для того времени был в новинку. С полным текстом статьи можно ознакомиться тут.

Bruce Jay Nelson, Remote Procedure Call, 1981
Bruce Jay Nelson, Remote Procedure Call, 1981

Я специально выбрал самую простую схему из статьи, на которой видно, что на машине C процедура вызывается, а выполняется на другой машине S. Все общение происходит по сети. Выглядит как что-то до боли знакомое, не так ли?

За более сорокалетнюю историю RPC то становился более популярней, то менее. В java-мире 20 лет назад использовали SOAP, расширение XML-RPC, а сейчас популярен gRPC, который гораздо более похож на SOAP, чем кажется.

Сокеты

Сперва хотелось бы акцентировать внимание на сетевое взаимодействие между сервисами. Понять, как именно будут вызывать процедуры (функции, методы - я буду использовать разные термины для одного и того же) на удаленном сервере.

Нам нужно передать сообщение от клиента серверу. И для этого нам нужно использовать какой-то сетевой протокол. Программисты чаще всего работают с http, его выбор мог бы стать результатом привычки. Да и многие решения используют его, как упомянутые ранее SOAP и gRPC. Но мне такой выбор кажется как минимум странным: нам же нужен только транспорт, без методов (GET, POST, PUT и тд) и без кодов (200 и 400) и прочих прикладных вещей. Название вызываемой процедуры будет в теле сообщения, возвращаемый результат - тоже. Правильней будет использовать именно транспортный протокол: TCP или UDP. В случае UDP придется реализовывать гарантию доставки сообщений на прикладном уровне, что не лишено смысла в плане скорости, но слишком все усложнит. Поэтому TCP.

Для работы с TCP существуют программные интерфейсы - сокеты. Они представляют из себя ip-адрес с портом. Когда происходит передача данных (потока байтов) по TCP, они режутся на сегменты, каждый из которых снабжается заголовком и отправляется по сети. Сегменты идут от клиентского сокета к серверному, и эти два сокета образуют "логическое соединение". Если к серверному сокету подключаются два клиентских, то, соответственно, мы имеем два соединения.

Для работы с сокетами в языках программирования есть API. Дальше речь идет только про java, потому что... я люблю java :)

Существуют два API для работы с сокетами в java:

  1. Старый. Классы java.net.Socket и java.net.ServerSocket.

  2. Новый, NIO2. Классы java.nio.channels.AsynchronousSocketChannel и java.nio.channels.AsynchronousServerSocketChannel.

В NIO2, на мой взгляд, злоупотребили optional, и обрабатывать исключения не очень удобно - они обернуты в TimeoutException и прочие. Я попробовал оба варианта и остановился на первом.

В общих чертах на клиента нужно будет сделать:

  1. Создать экземпляр Socket и подключить его к серверу, указав ip и порт.

  2. Получить от сокета InputStream и OutputStream для работы с ним.

На сервере:

  1. Создать экземпляр ServerSocket и "повесить" его на порт.

  2. Вызвать на нем метод accept() и, когда клиент подключится, получить из него Socket.

  3. Создать для сокета InputStream и OutputStream.

Если обратиться к гуглу, то можно найти код сервера вида:

public void start(int port) {
    serverSocket = new ServerSocket(port);
    clientSocket = serverSocket.accept();
    out = new PrintWriter(clientSocket.getOutputStream(), true);
    in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
    String greeting = in.readLine();
    if ("hello server".equals(greeting)) {
        out.println("hello client");
    } else {
        out.println("unrecognised greeting");
    }
}

Конечно, код работать не будет. Выполнение остановится на третьей строке, на accept(), пока первый, он же последний, клиент не подключится. Очевидно, что тут нужно вызывать accept() не в основном потоке и, после его срабатывания, тут же вызывать его опять, чтобы ждал нового клиента. И так бесконечно.

Пулы потоков

Для работы с ServerSocket понадобится целых два пула. Первый нужен будет для accept(), а второй - для непосредственной работы с клиентами.

Библиотека concurrency довольно большая, но мне хотелось бы рассмотреть только три пула потоков.

У класса java.util.concurrent.ThreadPoolExecutor есть три важных параметра в конструкторе:

  • corePoolSize. Количество потоков в пуле, в том числе ожидающих.

  • maximumPoolSize. Максимальное количество потоков.

  • keepAliveTime. Время ожидания. Если corePoolSize превышено, и потоки ждут дольше keepAliveTime, то они уничтожаются.

В библиотеке много пулов, различающихся сочетанием этих параметров.

FixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
        0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>());
}

Для FixedThreadPool размеры corePoolSize=maximumPoolSize, а keepAliveTime=0. Таким образом это пул фиксированного размера. Наверно, самый простой для понимания и часто используемый.

CachedThreadPool

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
        60L, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>());
}

Для этого пула corePoolSize=0, но максимальный размер не ограничен (только размером типа int). Он хорош тем, что при работе с ним нет резкого падения производительности при заполнении пула, как в случае с FixedThreadPool.

SingleThreadExecutor

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>()));
}

Он похож на пул с фиксированным размером, но corePoolSize=maximumPoolSize=1. Именно он подходит для event loop. И именно его мы будем использовать для вызова accept().

Код сервера

ServerSocket listener = new ServerSocket();
listener.bind(new InetSocketAddress(port));

Executor executionPool = Executors.newCachedThreadPool();
Executor acceptPool = Executors.newSingleThreadExecutor();

acceptPool.execute(new Runnable() {

    @Override
    public void run() {
        Socket socket;
                    
        try {
            socket = listener.accept();
        } catch (IOException e) {
            return;
        }

        if (!listener.isClosed()) {
            acceptPool.execute(this);	// ждем следующего клиента
        }

        executionPool.execute(() -> {
            try {
                while (!socket.isClosed() && !listener.isClosed()) {
                    handle(socket);	// непосредственная работа
                }
            } catch (Exception e) {
                // exception
            } finally {
                // close socket
            }
        });
    }
});

Полный код здесь.

Код клиента

Клиент еще проще:

Socket socket = new Socket();
socket.connect(new InetSocketAddress(host, port));

DataInputStream in = new DataInputStream(socket.getInputStream());
DataOutputStream out = new DataOutputStream(socket.getOutputStream());

Полный код клиента здесь и здесь.

Фреймворк

Результат можно найти на github: https://github.com/artfultom/vecenta.

Артефакты опубликованы в Maven Central:

repositories {
    mavenCentral()
}

dependencies {
    implementation 'io.github.artfultom:vecenta:0.0.7'
}

Плагин для генерации кода vecenta-gradle-plugin.

Комментарии (0)