Для будущих учащихся на курсе "Java Developer. Basic" подготовили перевод статьи.

Также приглашаем всех желающих на открытый вебинар к данному курсу, где участники вместе с преподавателем разберут, как работает Интернет.


Насколько быстро работает gRPC? Довольно быстро, если вы понимаете, как построены современные клиенты и серверы. В первой части я показал, как легко добиться улучшения на 60%. В этом посте я покажу, как добиться улучшения в 10 000%. 

Как и в первой части, мы начнем с готового сервиса "ключ-значение" на Java. Сервис способен обрабатывать запросы на создание, чтение, изменение, удаление ключей и значений одновременно от нескольких клиентов. Весь код можно посмотреть здесь, если вы хотите с ним поэкспериментировать.

Конкурентность сервера

Давайте посмотрим на класс KvService. Этот сервис обрабатывает RPC, отправленные клиентом, контролируя, чтобы ни один из них случайно не испортил состояние хранилища. Для гарантии этого сервис использует ключевое слово synchronized, чтобы быть уверенным, что одновременно будет активен только один RPC:

private final Map<ByteBuffer, ByteBuffer> store = new HashMap<>();

@Override
public synchronized void create(
    CreateRequest request, StreamObserver<CreateResponse> responseObserver) {
  ByteBuffer key = request.getKey().asReadOnlyByteBuffer();
  ByteBuffer value = request.getValue().asReadOnlyByteBuffer();
  simulateWork(WRITE_DELAY_MILLIS);
  if (store.putIfAbsent(key, value) == null) {
    responseObserver.onNext(CreateResponse.getDefaultInstance());
    responseObserver.onCompleted();
    return;
  }
  responseObserver.onError(Status.ALREADY_EXISTS.asRuntimeException());
}

Хотя этот код является потокобезопасным, но он очень дорогой: активным может быть только один RPC! Нам нужен способ, позволяющий безопасно выполнять несколько операций одновременно. В противном случае программа не сможет использовать все доступные процессоры.

Убираем блокировки

Чтобы решить эту проблему, нам нужно немного больше знать о семантике наших RPC. Чем больше мы знаем о том, как должны работать RPC, тем больше оптимизаций мы сможем сделать. Для сервиса «ключ-значение» можно заметить, что операции с разными ключами не мешают друг другу. Когда мы обновляем ключ "foo", это не имеет никакого отношения к ключу "bar". Но наш сервер написан таким образом, что операции с любыми ключами синхронизированы относительно друг друга. Если бы мы могли выполнять операции с разными ключами одновременно, то наш сервер справился бы с гораздо большей нагрузкой.

Давайте подумаем, как реализовать эту идею. Ключевое слово synchronized заставляет Java получить блокировку на объекте, которым в данном случае будет экземпляр KvService. Блокировка устанавливается при входе в метод create и снимается при возврате из метода. Синхронизация нам нужна для защиты Map. Мы используем HashMap и, если она не будет правильно синхронизирована, то внутреннее состояние HashMap может быть повреждено. Поэтому нельзя просто удалить synchronized у метода.

Однако Java предлагает здесь решение: ConcurrentHashMap. Этот класс предлагает возможность безопасного конкурентного доступа к содержимому Map. Например, в нашем случае мы хотим проверить наличие ключа. Если ключа нет, мы его добавляем, иначе возвращаем ошибку. Метод putIfAbsent атомарно проверяет наличие значения, добавляет его, в случае отсутствия, и сообщает нам, удалось ли это сделать.

ConcurrentMap обеспечивает более сильные гарантии безопасности putIfAbsent, поэтому мы можем заменить HashMap на ConcurrentHashMap и убрать synchronized:

private final ConcurrentMap<ByteBuffer, ByteBuffer> store = new ConcurrentHashMap<>();

@Override
public void create(
    CreateRequest request, StreamObserver<CreateResponse> responseObserver) {
  ByteBuffer key = request.getKey().asReadOnlyByteBuffer();
  ByteBuffer value = request.getValue().asReadOnlyByteBuffer();
  simulateWork(WRITE_DELAY_MILLIS);
  if (store.putIfAbsent(key, value) == null) {
    responseObserver.onNext(CreateResponse.getDefaultInstance());
    responseObserver.onCompleted();
    return;
  }
  responseObserver.onError(Status.ALREADY_EXISTS.asRuntimeException());
}

Продолжаем оптимизацию

Улучшить create было довольно просто. Сделать то же самое для retrieve и delete также сложностей не представляет. Однако метод update немного сложнее. Давайте посмотрим, что он делает:

@Override
public synchronized void update(
    UpdateRequest request, StreamObserver<UpdateResponse> responseObserver) {
  ByteBuffer key = request.getKey().asReadOnlyByteBuffer();
  ByteBuffer newValue = request.getValue().asReadOnlyByteBuffer();
  simulateWork(WRITE_DELAY_MILLIS);
  ByteBuffer oldValue = store.get(key);
  if (oldValue == null) {
    responseObserver.onError(Status.NOT_FOUND.asRuntimeException());
    return;
  }
  store.replace(key, oldValue, newValue);
  responseObserver.onNext(UpdateResponse.getDefaultInstance());
  responseObserver.onCompleted();
}

Обновление ключа до нового значения требует двух взаимодействий со store:

  1. Проверить существование ключа.

  2. Изменить предыдущее значение на новое.

К сожалению, у ConcurrentMap нет прямого метода для этого. Поскольку мы можем быть не единственными, кто изменяет Map, нам нужно учитывать это. Мы прочитаем прежнее значение, но к моменту, когда мы его заменим, оно может быть уже удалено.

Чтобы справится с этой ситуацией, давайте попробуем повторить попытку, если replace не удастся выполнить. Метод replace возвращает true, если все прошло успешно. (ConcurrentMap гарантирует, что операции не повредят внутреннюю структуру, но не то, что они будут успешными!) Мы будем использовать цикл do-while:

@Override
public void update(
    UpdateRequest request, StreamObserver<UpdateResponse> responseObserver) {
  // ...
  ByteBuffer oldValue;
  do {
    oldValue = store.get(key);
    if (oldValue == null) {
      responseObserver.onError(Status.NOT_FOUND.asRuntimeException());
      return;
    }
  } while (!store.replace(key, oldValue, newValue));
  responseObserver.onNext(UpdateResponse.getDefaultInstance());
  responseObserver.onCompleted();
}

Следует отметить, что если другой RPC изменит значение между store.get() и store.replace(), то будет ошибка. Но для нас это некритично, поэтому мы просто пробуем еще раз. После успешного сохранения нового значения сервис отправляет ответ пользователю.

Есть еще одна ситуация, которая может произойти: два RPC могут изменять одно и то же значение и перезаписать работу друг друга. Хотя это может быть приемлемым для некоторых приложений, но это не подходит для API, обеспечивающих транзакционность. Исправление этого выходит за рамки данного поста, но имейте в виду, что такое может произойти.

Измеряем производительность

В прошлом посте мы модифицировали клиент, сделав его асинхронным и использовали ListenableFuture. Чтобы избежать нехватки памяти мы ограничили до 100 количество одновременно активных RPC. Как мы видим, на стороне сервера узкое место — это блокировки. И поскольку мы избавились от них, то ожидаем увидеть значительное улучшение производительности. Для обработки одного RPC выполняется все та же работа, но выполняя ее параллельно, можно сделать гораздо больше. Давайте проверим, верна ли наша гипотеза:

До изменения:

$ ./gradlew installDist
$ time ./build/install/kvstore/bin/kvstore
Apr 16, 2018 10:38:42 AM io.grpc.examples.KvRunner runClient
INFO: Did 24.067 RPCs/s

real	1m0.886s
user	0m9.340s
sys	0m1.660s

После изменения:

Apr 16, 2018 10:36:48 AM io.grpc.examples.KvRunner runClient
INFO: Did 2,449.8 RPCs/s

real	1m0.968s
user	0m52.184s
sys	0m20.692s

Вау! Было 24 RPC в секунду, а стало 2400. И нам не пришлось менять ни API, ни клиента. Вот почему важно понимать ваш код и семантику API. Благодаря использованию особенностей сервиса «ключ-значение», а именно независимости операций с различными ключами, код теперь работает намного быстрее.

Одним из примечательных артефактов этого кода является время user в результатах. Первоначальное значение было только 9 секунд, то есть процессор использовался только 9 секунд из 60 секунд. После наших изменений время увеличилось более чем в 5 раз, до 52 секунд. Причина в том, что ядра процессора использовались больше. KvServer эмулирует работу, засыпая на несколько миллисекунд. В реальном приложении он выполнял бы полезную работу, но каких-то кардинальных изменений не было. Вместо масштабирования по количеству RPC, он будет масштабироваться по количеству ядер. Таким образом, если бы у машины было 12 ядер, то можно ожидать 12-кратное улучшение. Весьма неплохо!

Больше ошибок

Если вы запустите этот код, то в логах увидите намного больше подобных сообщений:

Apr 16, 2018 10:38:40 AM io.grpc.examples.KvClient$3 onFailure
INFO: Key not found
io.grpc.StatusRuntimeException: NOT_FOUND

Причина в том, что новая версия кода делает состояние гонки более вероятным. Если выполняется в 100 раз больше RPC, то вероятность коллизий обновления и удаления выше. Чтобы решить эту проблему, нам нужно будет изменить API. Следите за постами, и увидите, как это исправить.

Выводы

Есть много способов оптимизировать gRPC-код. Для этого необходимо понимать, что делает ваш код. В этом посте показано, как преобразовать сервис с блокировками в неблокирующий с низким уровнем конкуренции. Всегда измеряйте производительность до и после внесения изменений.


Узнать подробнее о курсе "Java Developer. Basic".

Смотреть открытый вебинар «Как работает Интернет».