Вступление
Привет, меня зовут Денис Агапитов, я руководитель группы Platform Core компании Bercut. Работаю в компании без малого 20 лет, из них 18 пишу на Java.
Сегодня я расскажу об опыте увеличения производительности сетевого стэка и проблемах, с которыми можно столкнуться при использовании NIO в Java.
Эта статья основана на реальной практике борьбы с "OutOfMemory: direct memory" в шине данных гибридной интеграционной платформы.
Группа Platform Core, которой я руковожу, занимается разработкой и развитием гибридной интеграционной платформы, поддержкой систем и сервисов, написанных на платформе.
Платформа включает в себя:
Шину данных ESB.
Приложения API Gateway, SLES (сервер исполнения бизнес-процессов), SA Container (сервер с сервисами на Java), Notification Broker.
Платформенные сервисы: Scheduler, Service Profile Management и прочие.
Поддержку интеграции со Spring.
Итак, начнём с предпосылок, которые подвигли заняться анализом данной проблемы.
Особенности работы шины данных в Bercut
Наша гибридная интеграционная платформа имеет свою транспортную шину (RTSIB). Это ESB (enterprise service bus) в рамках архитектуры SOA (service-oriented architecture) со своими стеками HTTP и проприетарного асинхронного протокола RTSIB.
По своей сути это mesh-сеть между разными узлами и приложениями платформы.
Каждое RTSIB соединение обслуживается двумя потоками - читающим и пишущим, при этом пишущий поток вступает в игру только в том случае, если появились данные для отправки, а сокет занят отправкой другого пакета. В этом случае текущий добавляется в очередь отправки. Если же сокет свободен и доступен для записи, то запись в сокет происходит прямо с потока бизнес-логики.
HTTP соединения (на текущий момент мы поддерживаем версию 1.x) в виду синхронной архитектуры не требуют большого количества потоков обслуживания, потоки для них достаются из пула - примерно один поток на 25 соединений.
Такой подход имеет как плюсы и минусы, сегодня обсуждать мы их не будем, а просто возьмём за исходные данные, что на каждом приложении нашей платформы существует довольно большое количество потоков, которые работают с сокетами.
Особенности работы с сетью TCP/IP в неблокирующем режиме в Java
Кто уже интересовался тем, как работает запись в сокет на Java или просто любит смотреть исходные коды JDK, вероятно знает основные особенности работы с сокетом, но для понимания проблемы предлагаю ещё раз их проговорить.
Из исходных кодов Open JDK 13 (основная версия, используемой у нас Java) видно, что если записываемый ByteBuffer является DirectByteBuffer, то запись происходит сразу (writeFromNativeBuffer), а если он расположен в Heap, то сначала достаётся временный DirectByteBuffer, производится копирование и запись из временного DirectByteBuffer.
Код записи в сокет из Open JDK 13 ( IOUtil.java ):
static int write(FileDescriptor fd, ByteBuffer src, long position,
boolean directIO, int alignment, NativeDispatcher nd)
throws IOException
{
if (src instanceof DirectBuffer) {
return writeFromNativeBuffer(fd, src, position, directIO, alignment, nd);
}
// Substitute a native buffer
int pos = src.position();
int lim = src.limit();
assert (pos <= lim);
int rem = (pos <= lim ? lim - pos : 0);
ByteBuffer bb;
if (directIO) {
Util.checkRemainingBufferSizeAligned(rem, alignment);
bb = Util.getTemporaryAlignedDirectBuffer(rem, alignment);
} else {
bb = Util.getTemporaryDirectBuffer(rem);
}
try {
bb.put(src);
bb.flip();
// Do not update src until we see how many bytes were written
src.position(pos);
int n = writeFromNativeBuffer(fd, bb, position, directIO, alignment, nd);
if (n > 0) {
// now update src
src.position(pos + n);
}
return n;
} finally {
Util.offerFirstTemporaryDirectBuffer(bb);
}
}
Дополнительно осложняет ситуацию то, что внутри реализации JDK имеется КЭШ DirectByteBuffer с привязкой к потоку (ThreadLocal):
private static ThreadLocal<BufferCache> bufferCache = new TerminatingThreadLocal<>() {
@Override
protected BufferCache initialValue() {
return new BufferCache();
}
@Override
protected void threadTerminated(BufferCache cache) { // will never be null
while (!cache.isEmpty()) {
ByteBuffer bb = cache.removeFirst();
free(bb);
}
}
};
public static ByteBuffer getTemporaryDirectBuffer(int size) {
// If a buffer of this size is too large for the cache, there
// should not be a buffer in the cache that is at least as
// large. So we'll just create a new one. Also, we don't have
// to remove the buffer from the cache (as this method does
// below) given that we won't put the new buffer in the cache.
if (isBufferTooLarge(size)) {
return ByteBuffer.allocateDirect(size);
}
BufferCache cache = bufferCache.get();
ByteBuffer buf = cache.get(size);
if (buf != null) {
return buf;
} else {
// No suitable buffer in the cache so we need to allocate a new
// one. To avoid the cache growing then we remove the first
// buffer from the cache and free it.
if (!cache.isEmpty()) {
buf = cache.removeFirst();
free(buf);
}
return ByteBuffer.allocateDirect(size);
}
}
И после каждого использования временного DirectByteBuffer, он помещается в КЭШ. При этом, если в КЭШе нет DirectByteBuffer необходимого размера, он аллоцируется и после использования также помещается в КЭШ ( Util.offerFirstTemporaryDirectBuffer(bb) ).
Суть проблемы
В первых версиях платформы мы просто использовали HeapByteBuffer через простую и понятную static-функцию ByteBuffer.wrap(byte[] data) и бед вроде как не знали.
Всё работало, скорость была достаточная для текущих telecom-сервисов, работающих на платформе, но в один прекрасный день размер данных DWH (Data Warehouse), проходящих через нашу шину достиг критического объёма в мегабайтах и мы получили OOM Direct Memory.
Почему же так произошло? А вот почему: как обозначил выше, мы имеем mesh-сеть с множеством обслуживающих потоков и имеем данные большого размера, проходящие через эти потоки, которые складывают off-heap память в ThreadLocal КЭШи этих потоков. Достигнув предела насыщения использования off-heap памяти мы получаем OOM. Конечно, первым действием было увеличение параметра запуска JVM "-XX:MaxDirectMemorySize". Размер используемой direct памяти пришлось увеличить, потом увеличить ещё. Это стало тем самым звоночком, что с проблемой надо разбираться и как можно скорее.
Анализ возможных путей решения
После осознания проблемы, мы провели анализ возможных путей её решения и нашли следующие варианты:
Писать в цикле блоками, сдвигая вручную position и limit в записываемом ByteBuffer. Это должно помочь, так как в IOUtils временный DirectByteBuffer выделяется размером size = limit - position.
Перейти на использование ByteBuffer.allocateDirect().
Написать промежуточную абстракцию, содержащую нарезку из ByteBuffer одного размера, где ByteBuffer одного размера берутся из общего пула и после использования возвращаются обратно.
Погружаясь всё глубже в исследование проблемы стало понятно, что необходимо провести сравнительное тестирование разных размеров ByteBuffer и разных вариантов их использования для чтения и записи из сокета.
За несколько часов я написал тестовое приложение, которое эмулирует 4 вида работы с сокетом:
Аллоцирование HeapByteBuffer при каждой записи/чтении.
Аллоцирование DirectByteBuffer при каждой записи/чтении.
Переиспользование HeapByteBuffer при каждой записи/чтении.
Переиспользование DirectByteBuffer при каждой записи/чтении.
В тестовом приложении отсутствует маршаллинг (заполнение реальными данными), а присутствует только работа по записи и чтению из сокета с разными вариантами использования ByteBuffer.
Здесь приводить исходные коды не буду, но кто желает может ознакомится с ними на Github.
На выходе мы получили такую картину:
Из результатов видно, что до 1Mb самым медленным вариантом является аллокация DirectByteBuffer. Аллокация HeapByteBuffer через wrap и кэшированный HeapByteBuffer примерно равны с небольшим лидерством кэшированного. Из общей картины выбивается кэшированный DirectByteBuffer, что логично, так как пишется он напрямую, а время на аллокацию отсутствует.
Выбор и реализация
Для реализации выбрали 3 вариант решения проблемы: написать промежуточную абстракцию, содержащую нарезку из DirectByteBuffer одного размера, которые берутся и возвращаются в общий пул. За основной размер части пакета (размер DirectByteBuffer) было выбрано значение в 32Kb как минимальный по размеру пик при тестировании пропускной способности. Безусловно, так как у нас реализован и стэк HTTPs, фабрика может отдавать и пул с отличными от 32Kb размерами DirectByteBuffer, опираясь на PacketBufferSize и ApplicationBufferSize из настроек текущей сессии SSLEngine.
При написании слоя абстракции, названной CompositeBuffer, конечно же я реализовал и Input/Output streams, работающие напрямую с CompositeBuffer. Это было необходимо для нормальной работы слоя marshalling/unmarshalling.
В качестве хранилища уже аллоцированных DirectByteBuffer сделал простой стэк на CAS механизме:
public class CasStack<L extends LinkedObject<L>> {
public interface LinkedObject<L extends LinkedObject> {
public L getNext();
public void setNext(L next);
}
private final AtomicReference<L> head = new AtomicReference<>();
public void add(L lo) {
for (;;) {
lo.setNext(head.get());
if (head.compareAndSet(lo.getNext(), lo)) {
return;
}
}
}
public L poll() {
L lo;
for (;;) {
lo = head.get();
if (lo == null) {
return null;
}
if (head.compareAndSet(lo, lo.getNext())) {
lo.setNext(null);
return lo;
}
}
}
}
А примерно вот так выглядит часть основного класса CompositeBuffer в разрезе работы с чтением из сокета и записью в сокет (код был адаптирован для статьи):
DirectByteBuffer[] buffers;
int pos;
@Override
public int getBufIndex(int position) {
return position / pool.getPartCapacity();
}
@Override
public int read(ReadableByteChannel channel) throws IOException {
ensureCapacity();
int cur = getBufIndex(pos), readed = 0, read;
for (;;) {
try {
if (hasRemaining()) {
if (buffers[cur].hasRemaining()) {
read = channel.read(containers[cur]);
if (read < 0) {
return readed > 0 ? readed : read;
}
pos += read;
readed += read;
if (containers[cur].hasRemaining()) {
return readed;
}
}
cur++;
if (cur == containers.length) {
expandCapacity();
}
} else {
return readed;
}
} catch (IOException) {
if (readed > 0) {
return readed;
}
throw ex;
}
}
}
@Override
public int write(WritableByteChannel channel) throws IOException {
int cur = getBufIndex(pos), writed = 0, write;
for (;;) {
try {
if (hasRemaining()) {
if (buffers[cur].hasRemaining()) {
write = channel.write(buffers[cur]);
if (write < 0) {
return writed > 0 ? writed : write;
}
pos += write;
writed += write;
if (buffers[cur].hasRemaining()) {
return writed;
}
}
cur++;
if (cur == containers.length) {
if (writed == 0 && hasRemaining()) {
release();
throw new CompositeBufferLifecycleError();
}
return writed;
}
} else {
return writed;
}
} catch (IOException ex) {
if (writed > 0) {
return writed;
}
throw ex;
}
}
}
Конечно же пришлось написать далеко не один класс, а ещё несколько уровней абстракции, таких, как DirectContainer и механизмы addRef/releaseRef, проверку на ошибки жизненного цикла всей библиотеки и многое другое.
Заключение
По завершению оптимизации и переходу на переиспользуемый DirectByteBuffer пропускная способность шины увеличилась примерно вдвое.
До данной доработки размер off-heap памяти мог достигать 1-3Gb и складывался из максимальных размеров сообщений, прошедших через каждое соединение.
Сейчас же потребление off-heap памяти пулами довольно скромное - на среднем сервисе оно составляет всего 10-20 Mb.
На более сложном компоненте с парой сотен входящих вызовов в секунду, которые порождают до 15 тысяч внутренних вызовов на каждый входящий вызов - размер off-heap пула занимает менее 100Mb.
Важно ещё и то, что теперь off-heap память, используемая приложениями для работы с сетью более контролируема и растёт не от корреляции с количеством соединений (потоков), а только от корреляции с количеством сообщений проходящих через шину в единицу времени и средним размером сообщения.
При этом надо понимать, что размер off-heap памяти не может быть меньше максимального размера сообщения, когда-либо проходившего через узел.
Конечно, полностью исключить OOM direct memory таким решением всё равно не получится, но теперь off-heap память можно прогнозировать.
Комментарии (7)
kmatveev
21.09.2023 14:17Если я вас правильно понял, то у вас поток бизнес-логики пытается писать в неблокирующий сокет, а если не смог, то bytebuffer отдаётся другому потоку, который это сделает. Можно было бы сразу отдавать bytebuffer в поток-писатель, это очень слабо сказалось бы на latency. Я бы ещё понял, если бы вам очень хотелось иметь синхронные вызовы записи, чтобы обрабатывать сетевые ошибки в том же потоке, который приготовил байты для отправки, но раз уж у вас есть и асинхронная запись, то ваше приложение должно уже быть готово к асинхронности.
DenAgapitov Автор
21.09.2023 14:17Да, вы правильно поняли. Относительно асинхронности - все наши приложения полностью асинхронны с самого начала, как и шина. Есть, конечно, и синхронный API, но внутри он всё равно работает через асинхронный.
Запись с потока бизнес-логики уменьшает время отклика до определённого уровня нагрузки на данном железе. На принимающей стороне ответ также пытается уйти с потока бизнес-логики. У нас есть реальные сервисы, которые отвечают со временем отклика менее 1 мс. Многократное перекладывание с потока на поток ощутимо увеличивает время отклика при цепочке из нескольких вызовов.
Antharas
21.09.2023 14:17Правильно ли понимаю, что используете нативный сокет из коробки, если да, то почему не netty?
Krusho
Денис, спасибо за отличную статью!
В методе получения временного буфера написано, что если размер запрашиваемого буфера превышает некий порог, то кеш не используется. Вы не пробовали запретить кеширование буферов, установив "jdk.nio.maxCachedBufferSize" в ноль, интересно, насколько это было бы применимо для средних нагрузок?
DenAgapitov Автор
Решение с установкой "jdk.nio.maxCachedBufferSize", конечно, рабочее. Причём рабочее без изменения кода вообще, благодаря free при возвращении буфера:
Однако, такое решение прилично снизит пропускную способность, так как в этом случае мы имеем и копирование памяти из HeapByteBuffer в DirectByteBuffer и аллокацию direct-памяти при создании DirectByteBuffer.
На мой взгляд, для сервисов с небольшими нагрузками проще сразу работать с DirectByteBuffer и самому его аллоцировать.
Krusho
У меня немного другое видение ситуации, проекты с небольшими нагрузками не заметят проблемы с кешем, потому что буферы будут успевать освобождаться, и память новая аллоцироваться не будет. А если создают какой-нибудь концепт, то он нужен здесь и сейчас, и большинство предпочтут ByteBuffer.wrap(byte[] data), потому что это проще.
Со средними проектами немного иначе, из них до высоких нагрузок доходят гораздо меньше, чем завершают свой жизненный цикл или остаются в средняках. И в какой-то момент, "jdk.nio.maxCachedBufferSize" может стать настоящим спасением, чтоб проект мог дожить своё время, или дать отсрочку на чтение Вашей статьи.
DenAgapitov Автор
Скажу даже больше: статья будет в помощь для довольно узкого круга проектов и большинству не понадобится даже "jdk.nio.maxCachedBufferSize", так как сейчас довольно небольшое число проектов имеют собственный сетевой стэк.
Не думаю, что для тех, кто пишет микро-сервисы при помощи аннотаций RestController, PostMapping и тому подобных, вообще интересно знание о том, как Spring работает с сетью.
Но, если кто-то написал свой сетевой клиент/сервер при помощи NIO, то вопрос производительности в этом случае, как правило, стоит остро. Искренне надеюсь, что для таких проектов статья будет полезна.