Часто возникает вопрос о том, как ускорить gRPC. gRPC позволяет реализовать высокопроизводительный RPC, но не всегда понятно как достичь этого быстродействия. И я решил попытаться показать ход своих мыслей при оптимизации программ.

Рассмотрим простой сервис "ключ-значение", который используется несколькими клиентами. Сервис должен корректно работать при параллельных операциях изменения данных. Также должна быть возможность масштабирования. И, в конце концов, он должен быть быстрым. Для реализации подобного сервиса gRPC будет идеальным вариантом. Давайте посмотрим, как его реализовать.

Я написал пример клиента и сервера на Java. В примере три основных класса и protobuf-файл, описывающий API:

  • KvClient — имитирует пользователя сервиса "ключ-значение". Он случайным образом создает, извлекает, изменяет и удаляет ключи и значения. Размер используемых ключей и значений также определяется случайным образом, используя экспоненциальное распределение.

  • KvService — реализация сервиса "ключ-значение". Он обрабатывает запросы от клиентов. Для имитации хранения данных на диске при операциях чтения и записи добавляются небольшие задержки в 10 и 50 мс, чтобы сделать пример похожим на настоящую базу данных.

  • KvRunner — организует взаимодействие между клиентом и сервером. Это основная точка входа, запускающая процессы клиента и сервера, и ожидающая, пока клиент выполнит свою работу. Runner работает в течение 60 секунд, а затем выводит количество выполненных RPC.

  • kvstore.proto — определение Protocol Buffers нашего сервиса. Здесь описывается, что клиенты могут ожидать от сервиса. Для простоты в качестве операций мы будем использовать Create, Retrieve, Update и Delete (широко известные как CRUD). Эти операции работают с ключами и значениями, состоящими из произвольных байт. Хотя это в некоторой степени похоже на REST, но у нас остается возможность добавить более сложные операции в будущем.

Protocol Buffers можно использовать и без gRPC — это удобный способ определения интерфейсов сервисов, а также генерации клиентского и серверного кода. Генерируемый код действует как клей между логикой приложения и библиотекой gRPC. Этот gRPC-код, используемый клиентом, мы называем стабом (stub, заглушка).

Первоначальная версия

Клиент

Теперь, когда мы разобрались с тем, что программа должна делать, можно начать изучать, как она это делает. Как уже упоминалось выше, клиент выполняет случайные RPC. Например, следующий код делает запрос create:

private void doCreate(KeyValueServiceBlockingStub stub) {
  ByteString key = createRandomKey();
  try {
    CreateResponse res = stub.create(
        CreateRequest.newBuilder()
            .setKey(key)
            .setValue(randomBytes(MEAN_VALUE_SIZE))
            .build());
    if (!res.equals(CreateResponse.getDefaultInstance())) {
      throw new RuntimeException("Invalid response");
    }
  } catch (StatusRuntimeException e) {
    if (e.getStatus().getCode() == Code.ALREADY_EXISTS) {
      knownKeys.remove(key);
      logger.log(Level.INFO, "Key already existed", e);
    } else {
      throw e;
    }
  }
}

Создается случайный ключ и случайное значение. Запрос отправляется на сервер, и клиент ждет ответа. Когда ответ получен, проверяется, соответствует ли он тому, что ожидалось, и если не соответствует, то бросается исключение. Несмотря на то что ключи генерируются случайным образом, они должны быть уникальны, поэтому необходимо убедиться, что ключ еще не используется. Для проверки уникальности мы отслеживаем созданные ключи, чтобы не использовать один и тот же ключ дважды. Однако вполне вероятно, что какой-то другой клиент уже создал определенный ключ, поэтому мы его логируем и идем дальше. В противном случае выбрасывается исключение.

Здесь мы используем блокирующий gRPC API, который отправляет запрос и ждет ответа. Это самый простой gRPC-стаб, блокирующий поток. Получается, что клиент одновременно может выполнять не более одного RPC.

Сервер

На стороне сервера запрос обрабатывает следующий код:

private final Map<ByteBuffer, ByteBuffer> store = new HashMap<>();

@Override
public synchronized void create(
    CreateRequest request, StreamObserver<CreateResponse> responseObserver) {
  ByteBuffer key = request.getKey().asReadOnlyByteBuffer();
  ByteBuffer value = request.getValue().asReadOnlyByteBuffer();
  simulateWork(WRITE_DELAY_MILLIS);
  if (store.putIfAbsent(key, value) == null) {
    responseObserver.onNext(CreateResponse.getDefaultInstance());
    responseObserver.onCompleted();
    return;
  }
  responseObserver.onError(Status.ALREADY_EXISTS.asRuntimeException());
}

Сервис извлекает из запроса ключ и значение как ByteBuffer. И для контроля, что одновременные запросы не повредят хранилище, метод объявлен как synchronized. После имитации записи на диск данные сохраняются в Map.

В отличие от клиентского кода, серверный обработчик является неблокирующим. Для отправки ответа вызывается onNext() у responseObserver. Для завершения отправки сообщения вызывается onCompleted().

Производительность

Код выглядит корректным и безопасным — давайте посмотрим, как он работает. Я использую компьютер с Ubuntu, 12-ти ядерным процессором и 32 ГБ памяти. Давайте выполним сборку и запустим:

$ ./gradlew installDist
$ time ./build/install/kvstore/bin/kvstore
Feb 26, 2018 1:10:07 PM io.grpc.examples.KvRunner runClient
INFO: Starting
Feb 26, 2018 1:11:07 PM io.grpc.examples.KvRunner runClient
INFO: Did 16.55 RPCs/s

real	1m0.927s
user	0m10.688s
sys	0m1.456s

Ну и дела! На такой мощной машине только 16 RPC в секунду. Процессор почти не используется, и мы не знаем, сколько потреблялось памяти. Давайте выяснять, почему получился такой плохой результат.

Оптимизация

Анализ

Прежде чем вносить какие-либо изменения, давайте разберемся, что делает программа. Нам нужно выяснить, где код тратит свое время, чтобы понять что необходимо оптимизировать. На этом этапе инструменты профилирования пока не нужны, мы можем просто проанализировать код.

Клиент запускается и последовательно выполняет RPC примерно в течение минуты. На каждой итерации он случайным образом решает, какую операцию выполнить:

void doClientWork(AtomicBoolean done) {
  Random random = new Random();
  KeyValueServiceBlockingStub stub = KeyValueServiceGrpc.newBlockingStub(channel);

  while (!done.get()) {
    // Pick a random CRUD action to take.
    int command = random.nextInt(4);
    if (command == 0) {
      doCreate(stub);
      continue;
    }
    /* ... */
    rpcCount++;
  }
}

Это означает, что одновременно может выполняться не более одного RPC. Каждый вызов должен ждать завершения предыдущего. А сколько времени занимает каждый RPC? После изучения кода сервера мы видим, что самая тяжелая операция занимает около 50 мс. При максимальной эффективности получится выполнить только 20 запросов в секунду:

20 запросов = 1000 мс / (50 мс / запрос)

Наш код может выполнять около 16 запросов в секунду, что кажется правдой. Мы можем проверить это предположение, посмотрев на вывод команды time, используемой для запуска кода. При выполнении запросов в методе simulateWork сервер просто спит (sleep). Получается, что программа в основном простаивает, ожидая завершения RPC.

Можно подтвердить это, посмотрев выше на реальное время выполнения между стартом и завершением (real) и на время использования процессора (user). Прошла одна минута, но процессор потратил только 10 секунд. Мой мощный многоядерный процессор был занят только 16% времени. Таким образом, если заставить программу выполнять больше работы в течение этого времени, то, похоже, можно увеличить количество RPC.

Гипотеза

Теперь мы четко видим проблему и можем предложить решение. Один из способов ускорить работу — убедиться, что процессор не простаивает. Для этого мы будем выполнять работу параллельно.

В gRPC-библиотеке для Java есть три типа стабов: блокирующие, неблокирующие и ListenableFuture. Мы уже видели блокирующий в клиенте и неблокирующий на сервере. ListenableFuture API — это компромисс между ними, предлагающий как блокирующее, так и не блокирующее поведение. До тех пор, пока мы не блокируем поток, ожидающий завершения работы, мы можем запускать новые RPC, не дожидаясь завершения старых. 

Эксперимент

Для проверки нашей гипотезы давайте используем в клиенте ListenableFuture. А это значит, что теперь нам надо больше думать о конкурентности в коде. Например, при отслеживании используемых ключей на клиенте нам нужно безопасно читать, изменять и записывать ключи. Также необходимо убедиться, что в случае ошибки мы прекратим выполнять новые RPC (правильная обработка ошибок будет рассмотрена в следующем посте). Наконец, нам нужно увеличить счетчик выполненных RPC и не забыть, что RPC выполняются в разных потоках.

Все эти изменения усложняют код. Но это компромисс, на который мы идем для улучшения производительности. Часто простота кода противоречит оптимизации. Тем не менее приведенный ниже код по-прежнему вполне читабельный и поток выполнения идет сверху вниз. Вот исправленный метод doCreate():

private void doCreate(KeyValueServiceFutureStub stub, AtomicReference<Throwable> error) {
  ByteString key = createRandomKey();
  ListenableFuture<CreateResponse> res = stub.create(
      CreateRequest.newBuilder()
          .setKey(key)
          .setValue(randomBytes(MEAN_VALUE_SIZE))
          .build());
  res.addListener(() -> rpcCount.incrementAndGet(), MoreExecutors.directExecutor());
  Futures.addCallback(res, new FutureCallback<CreateResponse>() {
    @Override
    public void onSuccess(CreateResponse result) {
      if (!result.equals(CreateResponse.getDefaultInstance())) {
        error.compareAndSet(null, new RuntimeException("Invalid response"));
      }
      synchronized (knownKeys) {
        knownKeys.add(key);
      }
    }

    @Override
    public void onFailure(Throwable t) {
      Status status = Status.fromThrowable(t);
      if (status.getCode() == Code.ALREADY_EXISTS) {
        synchronized (knownKeys) {
          knownKeys.remove(key);
        }
        logger.log(Level.INFO, "Key already existed", t);
      } else {
        error.compareAndSet(null, t);
      }
    }
  });
}

Стаб был изменен на KeyValueServiceFutureStub, который создает Future вместо прямого возврата значения. В gRPC Java используется ListenableFuture, который позволяет добавить обратный вызов при завершении Future. Здесь нас не сильно беспокоит ответ. Нас больше волнует, выполнился RPC или нет. Поэтому код в основном обрабатывает ошибки, а не ответ.

Первое изменение в том как мы считаем количество RPC. Вместо того чтобы увеличивать счетчик в основном цикле, мы увеличиваем его по завершении RPC.

Далее для каждого RPC мы создаем новый объект, который обрабатывает как успешные, так и неудачные попытки. Так как doCreate()уже завершится к моменту запуска метода обратного вызова после выполненного RPC, нам нужен способ распространения ошибок, отличный от throw. Вместо этого мы пытаемся обновить ссылку атомарно. Основной цикл время от времени проверяет, не произошла ли ошибка, и останавливается при ее обнаружении.

И, наконец, в коде добавляется ключ в knownKeys только тогда, когда RPC действительно завершен, и удаляется тогда, когда известно, что произошел сбой. Мы синхронизируемся на переменной, чтобы убедиться, что два потока не конфликтуют. Примечание: хотя доступ к knownKeys потокобезопасный, но вероятность возникновения состояния гонки остается. Один поток может прочитать из knownKeys, потом второй удалить из knownKeys, а потом первый может выполнить RPC, используя прочитанный ключ. Синхронизация на ключах гарантирует только согласованность, но не их правильность. Исправление этого выходит за рамки данного поста, поэтому мы просто логируем это и двигаемся дальше. Вы увидите несколько таких сообщений в логе, когда запустите программу.

Запускаем код

Если вы запустите эту программу, то увидите, что она не работает:

WARNING: An exception was thrown by io.grpc.netty.NettyClientStream$Sink$1.operationComplete()
java.lang.OutOfMemoryError: unable to create new native thread
	at java.lang.Thread.start0(Native Method)
	at java.lang.Thread.start(Thread.java:714)
	...

Что?! Зачем я показываю вам код, который не работает? Обычно в реальной жизни с первой попытки ничего не работает. В данном случае не хватило памяти. А когда программе не хватает памяти, начинают происходить странные вещи. И часто первопричину найти довольно трудно, так как бывает много сбивающих с толку моментов. Сообщение об ошибке говорит: "unable to create new native thread" (не удалось создать новый нативный поток), хотя мы не создавали никаких новых потоков. В устранении подобных проблем мне очень помогает опыт, а не отладка. Я часто встречался с OOM и понял, что Java говорит нам о последней капле, переполнившей чашу. Наша программа начала использовать много памяти и последнее выделение памяти, которое не удалось выполнить, случайно произошло при создании потока.

Итак, что же случилось? В блокирующей версии очередной RPC не запускался, пока не завершался предыдущий. Это было медленно, но это и помогало нам не создать тонны RPC, для которых у нас в итоге не хватило памяти. Мы должны учесть это в версии с ListenableFuture.

Для решения этой проблемы можно реализовать ограничение на количество активных RPC. Перед запуском нового RPC мы постараемся получить разрешение. Если мы его получаем, то выполняем RPC. Если нет, то ждем, пока оно не станет доступным. Когда RPC завершится (успешно или неудачно), разрешение возвращается. Для этого мы будем использовать семафор (Semaphore):

private final Semaphore limiter = new Semaphore(100);

private void doCreate(KeyValueServiceFutureStub stub, AtomicReference<Throwable> error)
    throws InterruptedException {
  limiter.acquire();
  ByteString key = createRandomKey();
  ListenableFuture<CreateResponse> res = stub.create(
      CreateRequest.newBuilder()
          .setKey(key)
          .setValue(randomBytes(MEAN_VALUE_SIZE))
          .build());
  res.addListener(() ->  {
    rpcCount.incrementAndGet();
    limiter.release();
  }, MoreExecutors.directExecutor());
  /* ... */
}

Теперь код работает успешно и память не заканчивается.

Результаты

После внесенных изменений все выглядит намного лучше:

$ ./gradlew installDist
$ time ./build/install/kvstore/bin/kvstore
Feb 26, 2018 2:40:47 PM io.grpc.examples.KvRunner runClient
INFO: Starting
Feb 26, 2018 2:41:47 PM io.grpc.examples.KvRunner runClient
INFO: Did 24.283 RPCs/s

real	1m0.923s
user	0m12.772s
sys	0m1.572s

Наш код выполняет на 46% больше RPC в секунду, чем раньше. Также мы видим, что используется примерно на 20% больше процессора. Наша гипотеза оказалась верной, и внесенные изменения сработали. И при этом мы не делали никаких изменений на сервере. А также не использовали какие-то специальные профилировщики или трассировщики.

Есть ли смысл в этих цифрах? Мы ожидаем, что примерно 1/4 операций будут мутабельными (create, update и delete). И чтение также будет в 1/4 случаях. Среднее время RPC должно быть примерно равно средневзвешенному значению:

.25 * 50ms (create)
  .25 * 10ms (retrieve)
  .25 * 50ms (update)
 +.25 * 50ms (delete)
------------
        40ms

При среднем значении в 40 мс на один RPC мы ожидаем, что количество RPC в секунду будет:

25 запросов = 1000 мс / (40 мс / запрос)

Это примерно то, что мы и видим. Но сервер по прежнему обрабатывает запросы последовательно, поэтому в будущем нам предстоит еще поработать над ним. Хотя на данный момент наша оптимизация, похоже, сработала.

Выводы

Есть много способов оптимизировать gRPC-код. Для этого необходимо понимать, что ваш код делает и что он должен делать. В этом посте показаны только самые основы того, как подходить к оптимизации. Всегда измеряйте производительность до и после внесения изменений и используйте эти измерения как руководство для оптимизации.


Перевод статьи подготовлен в преддверии старта курса Java Developer. Basic. Приглашаем всех желающих посетить бесплатный вебинар, в рамках которого наши эксперты расскажут о карьерных перспективах после прохождения курса, а также ответят на интересующие вас вопросы.

Записаться на вебинар.