Некоторое время назад на горизонте возникла и начала набирать популярность платформа Apache Ignite. Вычисления in-memory — это скорость, а значит, скорость должна быть обеспечена на всех этапах работы, особенно при загрузке данных.


Под катом находится описание способа быстрой загрузки данных из реляционной таблицы в распределенный кластер Apache Ignite. Описана предобработка SQL query result set на клиентском узле кластера и распределение данных по кластеру с помощью задания map-reduce. Описаны кеши и соответствующие реляционные таблицы, показано, как создать пользовательский объект из строки таблицы и как применить ComputeTaskAdapter для быстрого размещения созданных объектов. Весь код полностью можно увидеть в репозитории FastDataLoad.


История вопроса


Этот текст — перевод на русский моего поста в In-Memory Computing Blog на сайте GridGain.


Итак, некая компания решает ускорить медленное приложение путем переноса вычислений в in-memory кластер. Исходные данные для вычислений находятся в MS SQL; результат вычислений нужно положить туда же. Кластер распределенный, поскольку данных много уже сейчас, производительность приложения на пределе и объем данных растет. Заданы жесткие ограничения по времени работы.


Прежде чем писать быстрый код для обработки данных, эти данные нужно быстро загрузить. Неистовый поиск в сети показывает явную нехватку примеров кода, которые можно масштабировать до таблиц размером десятки или сотни миллионов строк. Примеров, которые можно загрузить, скомпилировать и пройти по шагам в отладке. Это с одной стороны.


С другой стороны, документация Apache Ignite / GridGain, вебинары и митапы дают представление о внутреннем устройстве кластера. Методом проб и ошибок удается сделать загрузчик, учитывающий распределение данных по партициям. И когда в один прекрасный день начальство спрашивает "а сыграл ли твой козырный туз?", ответ — да, все получилось.
Полученный код кажется некой самоделкой с привкусом внутренней архитектуры, но работает с достаточной скоростью.


Данные для загрузки (World Database)


Поскольку данных много, мы будем хранить записи в партиционированном виде и использовать data collocation, чтобы логически связанные значения хранились на одном и том же узле кластера. В качестве источника данных мы будем использовать файл world.sql из дистрибутива Apache Ignite.


Разделим его на три CSV файла в предположении, что каждый из них — это результат SQL запроса:



Рассмотрим загрузку записей countryCache из файла country.csv. Ключ countryCache — трехсимвольное поле code, тип ключа — String, значение — объект Country, созданный из остальных полей (name, continent, region).



Наивная загрузка


Поскольку опыта нет, то пляшем от печки — будем загружать так же, как в монолитное нераспределенное приложение. Будем создавать пользовательский объект Country для каждой строки таблицы и класть в кеш перед тем, как перейти к следующей строке. Для этого используем библиотеку org.h2.tools.Csv, которая умеет конвертировать файл CSV в java.sql.ResultSet. Эта библиотека уже присутствует на каждом узле Apache Ignite и загружать ее не надо, поскольку подсистема SQL построена на субд H2.


    // define countryCache
    IgniteCache<String,Country> cache = ignite.cache("countryCache");

    try (ResultSet rs = new Csv().read(csvFileName, null, null)) {
     while (rs.next()) {
      String code = rs.getString("Code");
      String name = rs.getString("Name");
      String continent = rs.getString("Continent");
      Country country = new Country(code,name,continent);
      cache.put(code,country);
     }
    }

Код замечательно работает для небольших таблиц на кластере из одного узла. Но, если подать таблицу в десять миллионов строк и добавить узлов в кластер, приложение начинает засыпать. Причем с ростом объема данных засыпает оно как-то по экспоненте.


Это все потому, что кеш распределен по узлам кластера и каждое изменение кеша должно быть синхронизировано между всеми узлами. Чтобы загружать быстро, надо учитывать внутреннее устройство кластера.


Попартиционная загрузка


Основа кластера Apache Ignite — распределенный кеш ключ-значение. Если объем хранимых данных велик, кеш создается в режиме PARTITIONED и каждая пара ключ-значение хранится в некоторой партиции (partition) на некотором узле кластера. Из соображений отказоустойчивости копия этой партиции может храниться еще и на другом узле; мы здесь для простоты будем считать, что копий нет. Чтобы определить расположение пары ключ-значение, кластер использует affinity function, которая определяет, в какой партиции будет находиться данная пара и на каком физическом узле кластера будет находиться эта партиция.


В нашем примере требуется обработать ResultSet на клиентском узле кластера и распределить данные по серверным узлам. Клиентский узел не хранит данные, поэтому распределение данных гарантированно будет происходить по сети. На рисунке показано взаимодействие клиентского узла с тремя серверными узлами.



Чтобы минимизировать сетевой трафик, мы предварительно сгруппируем загружаемые данные по партициям:


  • для хранения предзагруженных данных создадим HashMap вида partition_number -> key -> Value
    Map<Integer, Map<String, Country>> result = new HashMap<>();
  • для каждой строки данных создадим ключ и с помощью affinity function определим его partition_number. Вместо cache.put() для каждой строки положим пару ключ-значение в раздел HashMap с номером partition_number
    try (ResultSet rs = new Csv().read(csvFileName, null, null)) {
     while (rs.next()) {
      String code = rs.getString("Code");
      String name = rs.getString("Name");
      String continent = rs.getString("Continent");
      Country country = new Country(code,name,continent);
      result.computeIfAbsent(affinity.partition(key), k -> new HashMap<>()).put(code,country);
     }
    }

После группировки используем ComputeTaskAdapter в паре с ComputeJobAdapter. В нашем примере число заданий ComputeJobAdapter совпадает с числом партиций и равно 1024. Каждое задание содержит данные своей партиции и отправляется на тот узел кластера, где хранится партиция.


Результатом выполнения задания ComputeJobAdapter служит число добавленных в кеш записей. По завершении каждое задание возвращает результат на клиентский узел, который вычисляет общий итог.


Compute Task, вид изнутри


Согласно документации, "ComputeTaskAdapter initiates the simplified, in-memory, map-reduce process". Клиентский узел кластера сначала создает задания ComputeJobAdapter и выполняет map — определяет, на какой физический узел кластера отправится каждое задание. Затем результаты выполнения заданий возвращаются на клиентский узел и там выполняется reduce — вычисление общего числа добавленных записей.


Задание для узла данных (RenewLocalCacheJob)


В нашем примере партиция любого кеша наполняется данными на клиентском узле, заворачивается в объект RenewLocalCacheJob и отправляется по сети на нужный узел кластера. Там созданная партиция размещается в кеше целиком


targetCache.putAll(addend);

После размещения RenewLocalCacheJob печатает partition_number и число добавленных записей.


Задание для клиентского узла (AbstractLoadTask)


Каждое задание загрузки (пакет loader) — наследник AbstractLoadTask. Задания отличаются именами извлекаемых полей и типами создаваемых пользовательских объектов. Загруженные данные могут предназначаться для кешей с ключами различных типов (примитивных либо пользовательских), поэтому AbstractLoadTask определен с параметром TargetCacheKeyType. Соответственно и предзагружаемый HashMap определен как


    Map<Integer, Map<TargetCacheKeyType, BinaryObject>> result;

В нашем примере только у countryCache ключ имеет примитивный тип String. Остальные кеши в качестве ключа используют пользовательские объекты. AbstractLoadTask определяет тип ключа параметром TargetCacheKeyType, а значение кеша и вовсе BinaryObject. Это все потому, что составной ключ — это пользовательский объект и работать с ним просто так на узлах данных не получается.


Почему BinaryObject вместо пользовательского объекта


Наша цель — положить в память узла кластера некоторое количество пользовательских объектов. Мы помним, что узел этот работает не только в другой JVM, но и на другом хосте где-то в сети. На этом узле class definition пользовательских объектов недоступен, он находится в JAR-файле на клиентском узле. Если мы определим кеш с типом Country


    IgniteCache<String, Country> countryCache;

и будем извлекать из него значение по ключу, то узел кластера попытается десериализовать объект, не найдет ничего на classpath и выдаст исключение ClassNotFound.


Есть два способа преодолеть эту трудность. Первый — обеспечить наличие классов на classpath, сам по себе достаточной элегантный:


  • сделать JAR-файл с пользовательскими классами внутри;
  • положить этот файл на classpath на каждом узле кластера;
  • не забывать обновлять этот файл при каждом изменении любого из пользовательских классов;
  • после обновления файла не забывать перезагружать узел.

Второй способ — использовать интерфейс BinaryObject для доступа к данным в их исходном (сериализованном) виде. В нашем примере это выглядит так:


  • Кеш определяется в бинарном виде
    IgniteCache<String, BinaryObject> countryCache;    
  • Сразу после создания пользовательский объект Country преобразуется в BinaryObject (см. код в LoadCountries.java)
    Country country = new Country(code, name, .. );    
    BinaryObject binCountry = node.binary().toBinary(country);    
  • Созданный бинарный объект помещается в предзагружаемый HashMap, который определяется с типом BinaryObject
    Map<Integer, Map<String, BinaryObject>> result

Предзагрузка целиком выполняется на клиентском узле, где есть определения всех пользовательских классов. На удаленные узлы данные отправляются в бинарном виде, определения классов не нужны, ClassNotFoundException не возникает.


Практическая часть. Запуск узлов данных.


Мы будем запускать кластер Apache Ignite в минимально достаточной конфигурации: два узла данных и один клиентский узел.


Узлы данных


Запускаются почти что из коробки с единственным изменением в файле default-config.xml — мы разрешаем передавать классы заданий по сети между узлами. Шаги для запуска узла данных:


  • Установить GridGain CE по инструкции Installing Using ZIP Archive. На странице загрузки важно выбрать версию 8.7.10, поскольку код в репозитории FastDataLoad сделан имеенно для нее, а кластер из узлов разных версий собрать не получится;
  • В папке {gridgain}\config открыть файл default-config.xml и добавить в него
    строки
    <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
        <property name="peerClassLoadingEnabled" value="true"/>
    </bean>
  • Открыть окно командной строки, перейти в папку {gridgain}\bin и запустить узел командой ignite.bat. В процессе тестирования оба узла кластера могут размещаться на одном хосте; для разработки и запуска на бою надо использовать разные машины;
  • Открыть еще одно окно командной строки, повторить предыдущий шаг. Если в обоих окнах появилась строка наподобие вот этой, то все получилось
    [08:40:04] Topology snapshot [ver=2, locNode=d52b1db3, servers=2, clients=0, state=ACTIVE, CPUs=8, offheap=3.2GB, heap=2.0GB]

Важно. Если все же нужно загрузить последнюю версию, например 8.7.25, придется указать номер версии в файле pom.xml


    <gridgain.version>8.7.25</gridgain.version>

Иначе версии узлов данных и клиентского узла не совпадут и клиентский узел не сможет войти в кластер


class org.apache.ignite.spi.IgniteSpiException: Local node and remote node have different version numbers (node will not join, Ignite does not support rolling updates, so versions must be exactly the same) [locBuildVer=8.7.25, rmtBuildVer=8.7.10]

Клиентский узел


Вся работа выполняется клиентским приложением, которое содержит определение кешей, пользовательские объекты и логику map-reduce. Приложение — это JAR-файл, который стартует клиентский узел кластера и запускает compute task для загрузки данных. Для демонстрации мы используем один хост Windows, для боевого запуска лучше использовать разные хосты Linux.
Шаги для запуска клиентского узла:


  • Клонировать репозиторий FastDataLoad;
  • Перейти в корневой каталог проекта и собрать проект;
    mvn clean package
  • Находясь в корневом каталоге, запустить приложение.
    java -jar .\target\fastDataLoad.jar

Метод main() класса LoadApp создает пользовательский объект LoaderAgrument с названиями кеша и файла данных и преобразует его в бинарный формат. Далее бинарный объект используется как аргумент map-reduce задания LoadCountries.
LoadCountries создает для каждой партиции задание RenewLocalCacheJob и отправляет его по сети на соответствующий узел данных, где задание выполняется и выводит сообщение с номером обновленной партиции (номера партиций между узлами не пересекаются).


Узел данных #1



Узел данных #2



После этого клиентский узел суммирует возвращенные заданиями результаты и выводит общее число загруженных объектов



Файл country.csv загружен, ключи CountryCode и соответствующие значения собраны в партициях и каждая партиция размещена в памяти своего узла данных. Процесс повторяется для cityCache и countryLanguageCache; клиентское приложение выводит число объектов, время работы и завершается.


Заключение


Пару слов о скорости работы наивной и попартиционной загрузки.


Для больших таблиц наивный способ загрузки за разумное время не заканчивается вовсе.


Попартиционный способ используется автором в реальной боевой эксплуатации и демонстрирует приемлемую скорость работы:


  • свойства загружаемой таблицы (SQL Server Management Studio):
    • число строк — 44 686 837;
    • объем — 1.071 GB;
  • время загрузки и предобработки данных на клиентском узле — 0H:1M:35S;
  • время на создание заданий RenewLocalCacheJob и reduce результатов — 0H:0M:9S.

На распределение данных по кластеру времени тратится меньше, чем на выполнение SQL-запроса.