Очереди часто являются фундаментальными компонентами в паттернах проектирования программного обеспечения.

Но что, если каждую секунду поступают миллионы сообщений, а многопроцессорные потребители должны иметь возможность читать полный журнал всех сообщений?

Java может хранить ограниченное количество информации, пока куча не станет ограничивающим фактором, в результате чего сборка мусора будет иметь большие последствия, что потенциально может помешать нам выполнить целевые SLA (соглашения об уровне обслуживания) или даже остановить JVM на секунды или даже минуты.

В этой статье рассказывается о том, как создавать огромные персистентные очереди, сохраняя при этом предсказуемую и стабильную низкую задержку с помощью Chronicle Queue с открытым исходным кодом.

Приложение

В данной статье задача состоит в том, чтобы поддерживать очередь объектов из потоков рыночных данных (например, последняя цена на ценные бумаги, торгуемые на бирже). Можно было бы выбрать и другие области бизнеса, такие как сенсорный ввод с устройств IoT или считывание информации о дорожно-транспортных происшествиях в автомобильной промышленности, также могли быть выбраны. Принцип тот же.

Для начала определяется класс, содержащий рыночные данные:

public class MarketData extends SelfDescribingMarshallable {
    int securityId;
    long time;
    float last;
    float high;
    float low;

    // Геттеры и сеттеры не показаны для краткости.
}

Примечание. В реальных сценариях необходимо быть очень осторожным при использовании float и double для хранения денежных значений, так как это может привести к проблемам округления [Bloch18, Item 60]. Однако в этой вводной статье я хочу сохранить простоту.

Существует также небольшая служебная функция MarketDataUtil::create, которая при вызове будет создавать и возвращать новый случайный объект MarketData:

static MarketData create() {
    MarketData marketData = new MarketData();
    int id = ThreadLocalRandom.current().nextInt(1000);
    marketData.setSecurityId(id);
    float nextFloat = ThreadLocalRandom.current().nextFloat();
    float last = 20 + 100 * nextFloat;

    marketData.setLast(last);
    marketData.setHigh(last * 1.1f);
    marketData.setLow(last * 0.9f);
    marketData.setTime(System.currentTimeMillis());

    return marketData;
}

Теперь задача состоит в том, чтобы создать очередь, которая будет долговечной, параллельной, с низкой задержкой, доступной из нескольких процессов и способной хранить миллиарды объектов.

Наивный подход

Вооружившись этими классами, можно изучить наивный подход к использованию ConcurrentLinkedQueue:

public static void main(String[] args) {
    final Queue queue = new ConcurrentLinkedQueue();
    for (long i = 0; i < 1e9; i++) {
        queue.add(MarketDataUtil.create());
    }
}

Это не удастся по нескольким причинам:

  1. ConcurrentLinkedQueue будет создавать обертывающий узел для каждого элемента, добавленного в очередь. Это фактически удвоит количество создаваемых объектов.

  2. Объекты размещаются на Java куче, что приводит к нехватке памяти в куче и проблемам со сборкой мусора. На моей машине это привело к тому, что вся моя JVM перестала реагировать на запросы, и единственным выходом было принудительное завершение ее работы с помощью "kill -9".

  3. Очередь не может быть прочитана из других процессов (т.е. других JVM).

  4. Как только JVM завершает работу, содержимое очереди теряется. Следовательно, очередь не является долговечной.

Если посмотреть на различные другие стандартные классы Java, то можно сделать вывод, что поддержка больших персистентных очередей отсутствует.

Использование Chronicle Queue

Chronicle Queue — это библиотека с открытым исходным кодом, разработанная с учетом требований, изложенных выше. Вот один из способов его настройки и использования:

public static void main(String[] args) {
    final MarketData marketData = new MarketData();
    final ChronicleQueue q = ChronicleQueue
            .single("market-data");
    final ExcerptAppender appender = q.acquireAppender();

    for (long i = 0; i < 1e9; i++) {
        try (final DocumentContext document =
                     appender.acquireWritingDocument(false)) {
             document
                    .wire()
                    .bytes()
                    .writeObject(MarketData.class, 
                            MarketDataUtil.recycle(marketData));
        }
    }
}

Используя MacBook Pro 2019 года с 8-ядерным процессором Intel Core i9 с тактовой частотой 2,3 ГГц, можно было вставить более 3 000 000 сообщений в секунду, используя только один поток.

Очередь сохраняется через файл, отображенный в памяти, в заданной директории "market-data". Можно было бы ожидать, что объект MarketData будет занимать 4 (int securityId) + 8 (long time) + 4*3 (float last, high и low) = как минимум 24 байта.

В приведенном выше примере был добавлен 1 миллиард объектов, в результате чего сопоставленный файл занял 30 148 657 152 байта, что соответствует примерно 30 байтам на сообщение. На мой взгляд, это действительно очень эффективно.

Как видно, один и тот же экземпляр MarketData можно повторно использовать снова и снова, поскольку Chronicle Queue выравнивает содержимое текущего объекта в файле, отображаемом в памяти, что позволяет использовать объект повторно. Это еще больше снижает нагрузку на память. Вот как работает метод повторного использования:

static MarketData recycle(MarketData marketData) {
    final int id = ThreadLocalRandom.current().nextInt(1000);
    marketData.setSecurityId(id);
    final float nextFloat = ThreadLocalRandom.current().nextFloat();
    final float last = 20 + 100 * nextFloat;

    marketData.setLast(last);
    marketData.setHigh(last * 1.1f);
    marketData.setLow(last * 0.9f);
    marketData.setTime(System.currentTimeMillis());

    return marketData;
}

Чтение из Chronicle Queue

Чтение из Chronicle Queue несложно. Продолжая пример приведенный выше, ниже показано, как первые два объекта MarketData могут быть прочитаны из очереди:

public static void main(String[] args) {
    final ChronicleQueue q = ChronicleQueue
            .single("market-data");
    final ExcerptTailer tailer = q.createTailer();

    for (long i = 0; i < 2; i++) {
        try (final DocumentContext document =
                     tailer.readingDocument()) {
            MarketData marketData = document
                    .wire()
                    .bytes()
                    .readObject(MarketData.class);
            System.out.println(marketData);
        }
    }
}

Это может привести к следующему результату:

!software.chronicle.sandbox.queuedemo.MarketData {
  securityId: 202,
  time: 1634646488837,
  last: 45.8673,
  high: 50.454,
  low: 41.2806
}

!software.chronicle.sandbox.queuedemo.MarketData {
  securityId: 117,
  time: 1634646488842,
  last: 34.7567,
  high: 38.2323,
  low: 31.281
}

Предусмотрены возможности эффективного поиска позиции хвостовика, например, до конца очереди или до определенного индекса.

Что дальше?

Есть много других функций, которые выходят за рамки этой статьи.

Например, файлы очередей можно настроить на прокрутку через определенные промежутки времени (например, каждый день, час или минуту), эффективно создавая декомпозицию информации, так что старые данные могут быть очищены с течением времени.

Также предусмотрена возможность изолировать ЦП и привязывать потоки Java к этим изолированным ЦП, существенно уменьшая флуктуации приложений.

Наконец, существует корпоративная версия с репликацией очередей между кластерами серверов, что открывает путь к высокой доступности и улучшенной производительности в распределенных архитектурах.

Корпоративная версия также включает в себя множество других функций, таких как шифрование, переключение часовых поясов и асинхронные добавления.

Ресурсы

Open-source Chronicle Queue

Chronicle Queue Enterprise

Комментарии (2)


  1. Pyhesty
    14.11.2022 15:57
    +1

    я извиняюсь за занудство, а как описанное решение соотносится с хрестоматийной теорией массового обслуживания?

    Насколько предложенное решение приближается к теоретическому?

    букварь


    1. murkin-kot
      14.11.2022 18:23
      +2

      Теория массового обслуживания занимается выявлением статистически описываемых закономерностей. Информационные системы занимаются обработкой информации. Статья - про последнее. Первое может помочь оптимизировать второе, но в целом имеет очень мало общего с предметом обработки информации в ИС.

      Отличие от статистики в ИС простое - там детерминированные данные. Пропускная способность должна быть ХХХ на вход и УУУ на выход. И никаких вероятностей. Ну кроме флуктуаций в рамках максимально доступной пропускной способности. Эти флуктуации можно статистически оценивать и тем обосновывать выбор количественных показателей распределённых систем, но на практике почти никогда такие сложные вещи не приходят в голову большинству архитекторов ИС. Причина простая - оплачивается лишь возможность обработать некий поток, а минимизация стоимости обработки в список задач не входит из-за существенно большей стоимости разработки решения.