Вступление

Привет, меня зовут Денис Агапитов, я руководитель группы Platform Core компании Bercut. Работаю в компании без малого 20 лет, из них 18 пишу на Java.

Сегодня я расскажу об опыте увеличения производительности сетевого стэка и проблемах, с которыми можно столкнуться при использовании NIO в Java.

Эта статья основана на реальной практике борьбы с "OutOfMemory: direct memory" в шине данных гибридной интеграционной платформы.

Группа Platform Core, которой я руковожу, занимается разработкой и развитием гибридной интеграционной платформы, поддержкой систем и сервисов, написанных на платформе.

Платформа включает в себя:

  • Шину данных ESB.

  • Приложения API Gateway, SLES (сервер исполнения бизнес-процессов), SA Container (сервер с сервисами на Java), Notification Broker.

  • Платформенные сервисы: Scheduler, Service Profile Management и прочие.

  • Поддержку интеграции со Spring.

Итак, начнём с предпосылок, которые подвигли заняться анализом данной проблемы.

Особенности работы шины данных в Bercut

Наша гибридная интеграционная платформа имеет свою транспортную шину (RTSIB). Это ESB (enterprise service bus) в рамках архитектуры SOA (service-oriented architecture) со своими стеками HTTP и проприетарного асинхронного протокола RTSIB.

По своей сути это mesh-сеть между разными узлами и приложениями платформы.

Каждое RTSIB соединение обслуживается двумя потоками - читающим и пишущим, при этом пишущий поток вступает в игру только в том случае, если появились данные для отправки, а сокет занят отправкой другого пакета. В этом случае текущий добавляется в очередь отправки. Если же сокет свободен и доступен для записи, то запись в сокет происходит прямо с потока бизнес-логики.

HTTP соединения (на текущий момент мы поддерживаем версию 1.x) в виду синхронной архитектуры не требуют большого количества потоков обслуживания, потоки для них достаются из пула - примерно один поток на 25 соединений.

Такой подход имеет как плюсы и минусы, сегодня обсуждать мы их не будем, а просто возьмём за исходные данные, что на каждом приложении нашей платформы существует довольно большое количество потоков, которые работают с сокетами.

Особенности работы с сетью TCP/IP в неблокирующем режиме в Java

Кто уже интересовался тем, как работает запись в сокет на Java или просто любит смотреть исходные коды JDK, вероятно знает основные особенности работы с сокетом, но для понимания проблемы предлагаю ещё раз их проговорить.

Из исходных кодов Open JDK 13 (основная версия, используемой у нас Java) видно, что если записываемый ByteBuffer является DirectByteBuffer, то запись происходит сразу (writeFromNativeBuffer), а если он расположен в Heap, то сначала достаётся временный DirectByteBuffer, производится копирование и запись из временного DirectByteBuffer.

Код записи в сокет из Open JDK 13 ( IOUtil.java ):

static int write(FileDescriptor fd, ByteBuffer src, long position,
                 boolean directIO, int alignment, NativeDispatcher nd)
    throws IOException
{
    if (src instanceof DirectBuffer) {
        return writeFromNativeBuffer(fd, src, position, directIO, alignment, nd);
    }
 
    // Substitute a native buffer
    int pos = src.position();
    int lim = src.limit();
    assert (pos <= lim);
    int rem = (pos <= lim ? lim - pos : 0);
    ByteBuffer bb;
    if (directIO) {
        Util.checkRemainingBufferSizeAligned(rem, alignment);
        bb = Util.getTemporaryAlignedDirectBuffer(rem, alignment);
    } else {
        bb = Util.getTemporaryDirectBuffer(rem);
    }
    try {
        bb.put(src);
        bb.flip();
        // Do not update src until we see how many bytes were written
        src.position(pos);
 
        int n = writeFromNativeBuffer(fd, bb, position, directIO, alignment, nd);
        if (n > 0) {
            // now update src
            src.position(pos + n);
        }
        return n;
    } finally {
        Util.offerFirstTemporaryDirectBuffer(bb);
    }
}

Дополнительно осложняет ситуацию то, что внутри реализации JDK имеется КЭШ DirectByteBuffer с привязкой к потоку (ThreadLocal):

private static ThreadLocal<BufferCache> bufferCache = new TerminatingThreadLocal<>() {
    @Override
    protected BufferCache initialValue() {
        return new BufferCache();
    }
    @Override
    protected void threadTerminated(BufferCache cache) { // will never be null
        while (!cache.isEmpty()) {
            ByteBuffer bb = cache.removeFirst();
            free(bb);
        }
    }
};
 
public static ByteBuffer getTemporaryDirectBuffer(int size) {
    // If a buffer of this size is too large for the cache, there
    // should not be a buffer in the cache that is at least as
    // large. So we'll just create a new one. Also, we don't have
    // to remove the buffer from the cache (as this method does
    // below) given that we won't put the new buffer in the cache.
    if (isBufferTooLarge(size)) {
        return ByteBuffer.allocateDirect(size);
    }
 
    BufferCache cache = bufferCache.get();
    ByteBuffer buf = cache.get(size);
    if (buf != null) {
        return buf;
    } else {
        // No suitable buffer in the cache so we need to allocate a new
        // one. To avoid the cache growing then we remove the first
        // buffer from the cache and free it.
        if (!cache.isEmpty()) {
            buf = cache.removeFirst();
            free(buf);
        }
        return ByteBuffer.allocateDirect(size);
    }
}

И после каждого использования временного DirectByteBuffer, он помещается в КЭШ. При этом, если в КЭШе нет DirectByteBuffer необходимого размера, он аллоцируется и после использования также помещается в КЭШ ( Util.offerFirstTemporaryDirectBuffer(bb) ).

Суть проблемы

В первых версиях платформы мы просто использовали HeapByteBuffer через простую и понятную static-функцию ByteBuffer.wrap(byte[] data) и бед вроде как не знали.

Всё работало, скорость была достаточная для текущих telecom-сервисов, работающих на платформе, но в один прекрасный день размер данных DWH (Data Warehouse), проходящих через нашу шину достиг критического объёма в мегабайтах и мы получили OOM Direct Memory.

Почему же так произошло? А вот почему: как обозначил выше, мы имеем mesh-сеть с множеством обслуживающих потоков и имеем данные большого размера, проходящие через эти потоки, которые складывают off-heap память в ThreadLocal КЭШи этих потоков. Достигнув предела насыщения использования off-heap памяти  мы получаем OOM. Конечно, первым действием было увеличение параметра запуска JVM "-XX:MaxDirectMemorySize". Размер используемой direct памяти пришлось увеличить, потом увеличить ещё. Это стало тем самым звоночком, что с проблемой надо разбираться и как можно скорее.

Анализ возможных путей решения

После осознания проблемы, мы провели анализ возможных путей её решения и нашли следующие варианты:

  • Писать в цикле блоками, сдвигая вручную position и limit в записываемом ByteBuffer. Это должно помочь, так как в IOUtils временный DirectByteBuffer выделяется размером size = limit - position.

  • Перейти на использование ByteBuffer.allocateDirect().

  • Написать промежуточную абстракцию, содержащую нарезку из ByteBuffer одного размера, где ByteBuffer одного размера берутся из общего пула и после использования возвращаются обратно.

Погружаясь всё глубже в исследование проблемы стало понятно, что необходимо провести сравнительное тестирование разных размеров ByteBuffer и разных вариантов их использования для чтения и записи из сокета.

За несколько часов я написал тестовое приложение, которое эмулирует 4 вида работы с сокетом:

  • Аллоцирование HeapByteBuffer при каждой записи/чтении.

  • Аллоцирование DirectByteBuffer при каждой записи/чтении.

  • Переиспользование HeapByteBuffer при каждой записи/чтении.

  • Переиспользование DirectByteBuffer при каждой записи/чтении.

В тестовом приложении отсутствует маршаллинг (заполнение реальными данными), а присутствует только работа по записи и чтению из сокета с разными вариантами использования ByteBuffer.

Здесь приводить исходные коды не буду, но кто желает может ознакомится с ними на Github.

На выходе мы получили такую картину:

Из результатов видно, что до 1Mb самым медленным вариантом является аллокация DirectByteBuffer. Аллокация HeapByteBuffer через wrap и кэшированный HeapByteBuffer примерно равны с небольшим лидерством кэшированного. Из общей картины выбивается кэшированный DirectByteBuffer, что логично, так как пишется он напрямую, а время на аллокацию отсутствует.

Выбор и реализация

Для реализации выбрали 3 вариант решения проблемы: написать промежуточную абстракцию, содержащую нарезку из DirectByteBuffer одного размера, которые берутся и возвращаются в общий пул. За основной размер части пакета (размер DirectByteBuffer) было выбрано значение в 32Kb как минимальный по размеру пик при тестировании пропускной способности. Безусловно, так как у нас реализован и стэк HTTPs, фабрика может отдавать и пул с отличными от 32Kb размерами DirectByteBuffer, опираясь на PacketBufferSize и ApplicationBufferSize из настроек текущей сессии SSLEngine.

При написании слоя абстракции, названной CompositeBuffer, конечно же я реализовал и Input/Output streams, работающие напрямую с CompositeBuffer. Это было необходимо для нормальной работы слоя marshalling/unmarshalling.

В качестве хранилища уже аллоцированных DirectByteBuffer сделал простой стэк на CAS механизме:

public class CasStack<L extends LinkedObject<L>> {
 
    public interface LinkedObject<L extends LinkedObject> {
     
        public L getNext();
         
        public void setNext(L next);
         
    }
 
    private final AtomicReference<L> head = new AtomicReference<>();
 
    public void add(L lo) {
        for (;;) {
            lo.setNext(head.get());
            if (head.compareAndSet(lo.getNext(), lo)) {
                return;
            }
        }
    }
 
    public L poll() {
        L lo;
        for (;;) {
            lo = head.get();
            if (lo == null) {
                return null;
            }
            if (head.compareAndSet(lo, lo.getNext())) {
                lo.setNext(null);
                return lo;
            }
        }
 
    }
 
}

А примерно вот так выглядит часть основного класса CompositeBuffer в разрезе работы с чтением из сокета и записью в сокет (код был адаптирован для статьи):

DirectByteBuffer[] buffers;
int pos;
 
@Override
public int getBufIndex(int position) {
    return position / pool.getPartCapacity();
}
 
@Override
public int read(ReadableByteChannel channel) throws IOException {
    ensureCapacity();
    int cur = getBufIndex(pos), readed = 0, read;
    for (;;) {
        try {
            if (hasRemaining()) {
                if (buffers[cur].hasRemaining()) {
                    read = channel.read(containers[cur]);
                    if (read < 0) {
                        return readed > 0 ? readed : read;
                    }
                    pos += read;
                    readed += read;
                    if (containers[cur].hasRemaining()) {
                        return readed;
                    }
                }
                cur++;
                if (cur == containers.length) {
                    expandCapacity();
                }
            } else {
                return readed;
            }
        } catch (IOException) {
            if (readed > 0) {
                return readed;
            }
            throw ex;
        }
    }
}
 
@Override
public int write(WritableByteChannel channel) throws IOException {
    int cur = getBufIndex(pos), writed = 0, write;
    for (;;) {
        try {
            if (hasRemaining()) {
                if (buffers[cur].hasRemaining()) {
                    write = channel.write(buffers[cur]);
                    if (write < 0) {
                        return writed > 0 ? writed : write;
                    }
                    pos += write;
                    writed += write;
                    if (buffers[cur].hasRemaining()) {
                        return writed;
                    }
                }
                cur++;
                if (cur == containers.length) {
                    if (writed == 0 && hasRemaining()) {
                        release();
                        throw new CompositeBufferLifecycleError();
                    }
                    return writed;
                }
            } else {
                return writed;
            }
        } catch (IOException ex) {
            if (writed > 0) {
                return writed;
            }
            throw ex;
        }
    }
}

Конечно же пришлось написать далеко не один класс, а ещё несколько уровней абстракции, таких, как DirectContainer и механизмы addRef/releaseRef, проверку на ошибки жизненного цикла всей библиотеки и многое другое.

Заключение

По завершению оптимизации и переходу на переиспользуемый DirectByteBuffer пропускная способность шины увеличилась примерно вдвое.

До данной доработки размер off-heap памяти мог достигать 1-3Gb и складывался из максимальных размеров сообщений, прошедших через каждое соединение.

Сейчас же потребление off-heap памяти пулами довольно скромное - на среднем сервисе оно составляет всего 10-20 Mb.

На более сложном компоненте с парой сотен входящих вызовов в секунду, которые порождают до 15 тысяч внутренних вызовов на каждый входящий вызов - размер off-heap пула занимает менее 100Mb.

Важно ещё и то, что теперь off-heap память, используемая приложениями для работы с сетью более контролируема и растёт не от корреляции с количеством соединений (потоков), а только от корреляции с количеством сообщений проходящих через шину в единицу времени и средним размером сообщения.

При этом надо понимать, что размер off-heap памяти не может быть меньше максимального размера сообщения, когда-либо проходившего через узел.

Конечно, полностью исключить OOM direct memory таким решением всё равно не получится, но теперь off-heap память можно прогнозировать.

Комментарии (7)


  1. Krusho
    21.09.2023 14:17
    +1

    Денис, спасибо за отличную статью!

    В методе получения временного буфера написано, что если размер запрашиваемого буфера превышает некий порог, то кеш не используется. Вы не пробовали запретить кеширование буферов, установив "jdk.nio.maxCachedBufferSize" в ноль, интересно, насколько это было бы применимо для средних нагрузок?

    public static ByteBuffer getTemporaryDirectBuffer(int size) {
        // If a buffer of this size is too large for the cache, there
        // should not be a buffer in the cache that is at least as
        // large. So we'll just create a new one. Also, we don't have
        // to remove the buffer from the cache (as this method does
        // below) given that we won't put the new buffer in the cache.
        if (isBufferTooLarge(size)) {
            return ByteBuffer.allocateDirect(size);
        }
      ....
    
       /**
       * Returns the max size allowed for a cached temp buffers, in
       * bytes. It defaults to Long.MAX_VALUE. It can be set with the
       * jdk.nio.maxCachedBufferSize property. Even though
       * ByteBuffer.capacity() returns an int, we're using a long here
       * for potential future-proofing.
       */
      private static long getMaxCachedBufferSize() {
          String s = java.security.AccessController.doPrivileged(
              new PrivilegedAction<String>() {
                  @Override
                  public String run() {
                      return System.getProperty("jdk.nio.maxCachedBufferSize");
                  }
              });


    1. DenAgapitov Автор
      21.09.2023 14:17
      +2

      Решение с установкой "jdk.nio.maxCachedBufferSize", конечно, рабочее. Причём рабочее без изменения кода вообще, благодаря free при возвращении буфера:

          /**
           * Releases a temporary buffer by returning to the cache or freeing it. If
           * returning to the cache then insert it at the start so that it is
           * likely to be returned by a subsequent call to getTemporaryDirectBuffer.
           */
          static void offerFirstTemporaryDirectBuffer(ByteBuffer buf) {
              // If the buffer is too large for the cache we don't have to
              // check the cache. We'll just free it.
              if (isBufferTooLarge(buf)) {
                  free(buf);
                  return;
              }
      
              assert buf != null;
              BufferCache cache = bufferCache.get();
              if (!cache.offerFirst(buf)) {
                  // cache is full
                  free(buf);
              }
          }

      Однако, такое решение прилично снизит пропускную способность, так как в этом случае мы имеем и копирование памяти из HeapByteBuffer в DirectByteBuffer и аллокацию direct-памяти при создании DirectByteBuffer.

      На мой взгляд, для сервисов с небольшими нагрузками проще сразу работать с DirectByteBuffer и самому его аллоцировать.


      1. Krusho
        21.09.2023 14:17

        У меня немного другое видение ситуации, проекты с небольшими нагрузками не заметят проблемы с кешем, потому что буферы будут успевать освобождаться, и память новая аллоцироваться не будет. А если создают какой-нибудь концепт, то он нужен здесь и сейчас, и большинство предпочтут ByteBuffer.wrap(byte[] data), потому что это проще.

        Со средними проектами немного иначе, из них до высоких нагрузок доходят гораздо меньше, чем завершают свой жизненный цикл или остаются в средняках. И в какой-то момент, "jdk.nio.maxCachedBufferSize" может стать настоящим спасением, чтоб проект мог дожить своё время, или дать отсрочку на чтение Вашей статьи.


        1. DenAgapitov Автор
          21.09.2023 14:17

          Скажу даже больше: статья будет в помощь для довольно узкого круга проектов и большинству не понадобится даже "jdk.nio.maxCachedBufferSize", так как сейчас довольно небольшое число проектов имеют собственный сетевой стэк.

          Не думаю, что для тех, кто пишет микро-сервисы при помощи аннотаций RestController, PostMapping и тому подобных, вообще интересно знание о том, как Spring работает с сетью.

          Но, если кто-то написал свой сетевой клиент/сервер при помощи NIO, то вопрос производительности в этом случае, как правило, стоит остро. Искренне надеюсь, что для таких проектов статья будет полезна.


  1. kmatveev
    21.09.2023 14:17

    Если я вас правильно понял, то у вас поток бизнес-логики пытается писать в неблокирующий сокет, а если не смог, то bytebuffer отдаётся другому потоку, который это сделает. Можно было бы сразу отдавать bytebuffer в поток-писатель, это очень слабо сказалось бы на latency. Я бы ещё понял, если бы вам очень хотелось иметь синхронные вызовы записи, чтобы обрабатывать сетевые ошибки в том же потоке, который приготовил байты для отправки, но раз уж у вас есть и асинхронная запись, то ваше приложение должно уже быть готово к асинхронности.


    1. DenAgapitov Автор
      21.09.2023 14:17

      Да, вы правильно поняли. Относительно асинхронности - все наши приложения полностью асинхронны с самого начала, как и шина. Есть, конечно, и синхронный API, но внутри он всё равно работает через асинхронный.

      Запись с потока бизнес-логики уменьшает время отклика до определённого уровня нагрузки на данном железе. На принимающей стороне ответ также пытается уйти с потока бизнес-логики. У нас есть реальные сервисы, которые отвечают со временем отклика менее 1 мс. Многократное перекладывание с потока на поток ощутимо увеличивает время отклика при цепочке из нескольких вызовов.


  1. Antharas
    21.09.2023 14:17

    Правильно ли понимаю, что используете нативный сокет из коробки, если да, то почему не netty?