public abstract class Operation implements Configurable<OperationMeta>


В данной серии статей я подробно рассказываю о том, как написать на Java собственный интерпретатор объектно-ориентированного диалекта SQL с использованием Spark RDD API, заточенный на задачи подготовки и трансформации наборов данных.

Краткое содержание предыдущей серии, посвящённой имплементации спеки языка в коде:
Заметка об использовании prior art
Наборы данных в контексте исполнения
Переменные, настройки контекста исполнения, и метаданные параметров подключаемых функций
Интерпретатор, контекст исполнения, операторы выражений


Разобравшись со всеми контекстами и устройством ядра интерпретатора, можно перейти к описанию API точек расширения, режимов запуска, и технической обвязки сборки исполняемых артефактов.


Предупреждение о рейтинге «M for Mature»

Уровень сложности данной серии статей — высокий. Базовые понятия по ходу текста вообще не объясняются, да и продвинутые далеко не все. Поэтому, если вы не разработчик, уже знакомый с терминологией из области бигдаты и жаргоном из дата инжиниринга, данные статьи будут сложно читаться, и ещё хуже пониматься. Я предупредил.


9. Расширяемость. API подключаемых функций


Подключаемых функций у нас три с половиной вида:


  • адаптеры хранилищ (считаются за полтора, потому что для CREATE и COPY функции хоть и отдельные, но похожи),
  • трансформации,
  • операции.

О метаданных параметров функций мы уже рассказывали. На самом деле, они являются общей частью более специфических метаданных, используемых движком для регистрации функций в контексте выполнения на старте, а также механизмом автоматической генерации документации для генерации этой самой автоматической документации ещё на этапе сборки проекта.


Не знаю как вас, а лично меня порядочно раздражает манера мейнтейнеров большинства опенсорсных проектов ограничиваться сборкой "документации" из Javadoc, в которой про некий property getter будет указано "Getter for <property>", и так 600 раз для 600 геттеров в 100 классах, и никакой тебе конкретики, что это за пропертя, нафига она такого типа, и что вообще туда можно установить. Не говоря уже о примерах использования, которых чуть менее, чем ни одного из реальной жизни. Мне вот почему-то нравится, когда документация пишется для людей, и выглядит как связный текст. Так что задуматься о таких вещах следует с самого начала, и делать так, чтобы метаданные были полезны не только движку, а конечному пользователю. Но об этом речь пойдёт потом.

Функции у нас могут быть где угодно в класпасе, так что для начала движок должен как-то узнать, где ему их искать. Для этого служит аннотация @RegisteredPackage:


package io.github.pastorgl.datacooker;

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.PACKAGE})
public @interface RegisteredPackage {
    String value() default "";
}

Помимо установки метки на какой-то пакет, мы задаём ему человеко-понятное описание:


// package-info.java
@RegisteredPackage("Basic Transforms of DataStreams from one record type to another")
package io.github.pastorgl.datacooker.commons.transform;

import io.github.pastorgl.datacooker.RegisteredPackage;

При помощи замечательной библиотечки classgraph мы на старте пробежимся по всем пакетам класпаса, и сложим все аннотированные нашим маркером пакеты в кеш, тем самым зарегистрировав их в нашем контексте исполнения.


После того, как пакет зареган, можно искать в нём имплементации соответствующих классов подключаемых функций. Все они имплементируют класс Configurable:


public interface Configurable<M extends ConfigurableMeta> extends Serializable {
    M meta();
}

А метаданные, соответственно, расширяют ConfigurableMeta:


public abstract class ConfigurableMeta {
    public final String verb;
    public final String descr;

    public final Map<String, DefinitionMeta> definitions;

    protected ConfigurableMeta(String verb, String descr, Map<String, DefinitionMeta> definitions);
}

Тут есть глагол — имя функции, которое будет использоваться в операторах языка, её текстовое описание, и мапа параметров функции, уже рассмотренная нами ранее.


Теперь можно пройтись по подклассам функций. Для адаптеров мы имеем:


public abstract class AdapterMeta extends ConfigurableMeta {
    public final StreamType[] type;
    public final String[] paths;

    public AdapterMeta(String verb, String descr, String[] paths, StreamType[] type, Map<String, DefinitionMeta> meta);
}

Здесь в метаданные добавляются тип(ы) наборов данных, с которыми может работать адаптер, и примеры путей, с которыми он может работать. Сами адаптеры наследуются от StorageAdapter и своей специализированной половинки (жаль, что в жабе нет трейтов):


public abstract class StorageAdapter<C extends AdapterMeta> implements Configurable<C> {
    public final C meta;

    protected JavaSparkContext context;
    protected String path;

    protected Configuration resolver;

    public StorageAdapter() {
        this.meta = meta();
    }

    public void initialize(JavaSparkContext ctx, Configuration config, String path) throws InvalidConfigurationException {
        context = ctx;
        resolver = config;
        this.path = path;

        configure();
    }

    abstract protected void configure() throws InvalidConfigurationException;
}

public abstract class InputAdapter extends StorageAdapter<InputAdapterMeta> {
    public abstract Map<String, DataStream> load(int partCount, Partitioning partitioning);
}

public abstract class OutputAdapter extends StorageAdapter<OutputAdapterMeta> {
    public abstract void save(String sub, DataStream rdd);
}

Как это стоит прокомментировать. Ну, инициализируемся-то мы всегда с контекстом Spark, инстансом параметров, и путём. А далее, в зависимости от разновидности адаптера, вызываем либо метод load() с требуемым количеством партов и типом партицинирования (PARTITION <N> BY <Algorithm>), либо save() с подстрокой (для COPY with star syntax) и соответствующим набором данных.


Пример имплементации класса адаптера я приводить не стану, так как стандартные слишком сложно устроены для туториала, а минимальный будет нефункционален, и ничего толком не проиллюстрирует. Если всё-таки интересно, то советую посмотреть на классы адаптеров JDBC из состава Data Cooker Dist, — отдельной утилиты, сопутствующей Data Cooker ETL, — они относительно просты, особенно входной (из-за примитивности спарковской JdbcRDD).


Для трансформаций специфических метаданных требуется несколько больше:


public class TransformMeta extends ConfigurableMeta {
    public final StreamType from;
    public final StreamType to;

    public final TransformedStreamMeta transformed;
    private final Boolean keyAfter;

    public boolean keyAfter() {
        return (keyAfter != null) ? keyAfter : (from == StreamType.PlainText);
    }
}

Тут нам надо знать, из какого типа в какой мы производим преобразование записей набора данных, каким будет преобразованный набор, и когда нам надо пересчитать ключи для записей. А вот для операций (которые, как мы помним, расширенный аналог SELECT FROM … INTO …) знать нужно всего ничего:


public class OperationMeta extends ConfigurableMeta {
    public final DataStreamsMeta input;
    public final DataStreamsMeta output;
}

В одну группу метаданных этих видов функций объединяет наличие метаданных наборов данных — трансформированных, либо на входе / выходе. Они описываются следующим семейством классов:


public class DataStreamsMeta {
}

public class TransformedStreamMeta extends DataStreamsMeta {
    public final DataStreamMeta streams;
}

public class NamedStreamsMeta extends DataStreamsMeta {
    public final Map<String, DataStreamMeta> streams;
}

public class PositionalStreamsMeta extends DataStreamsMeta {
    public final int count;

    public final DataStreamMeta streams;
}

public class DataStreamMeta {
    public final String descr;

    public final StreamType[] type;

    public final Origin origin;

    public final List<String> ancestors;

    public final Map<String, String> generated;

    public final boolean optional;
}

То есть, про какой-то один набор данных, относящийся к функции, нам всегда известны его:


  • описание,
  • список допустимых типов,
  • происхождение (фильтрованный, сгенерированный, аугментированный),
  • список предков (в случае, если не входные все поучаствовали),
  • набор имён вновь появляющихся атрибутов для генерированных и аугментированных наборов,
  • признак опциональности.

Ну и в зависимости от того, что именно это за набор — именованный, позиционный, или просто трансформированный, — в классах-обёртках могут добавляться внутренние имена, ограничение на количество, или же ничего более.


С метаданными определились. Теперь можно и на сами базовые классы функций посмотреть:


public abstract class Transform implements Configurable<TransformMeta> {
    private final TransformMeta meta;

    public Transform() {
        meta = meta();
    }

    public abstract StreamConverter converter();
}

@FunctionalInterface
public interface StreamConverter {
    DataStream apply(DataStream ds, Map<String, List<String>> newColumns, Configuration params);
}

Ну, тут вообще ничего сложного. Одна лямбда, которой на вход подаётся набор данных, выжимка из SET COLUMNS, да параметры. Минимальная имплементация выглядит вот так:


public class PassthruTransform extends Transform {
    @Override
    public TransformMeta meta() {
        return new TransformMeta("passthru", StreamType.Passthru, StreamType.Passthru,
                "Doesn't change a DataStream in any way",

                null,
                null
        );
    }

    @Override
    public StreamConverter converter() {
        return (ds, newColumns, params) -> new DataStream(ds.streamType, ds.rdd, ds.accessor.attributes());
    }
}

Очень нужная и важная трансформация, которая ничего не делает с самим набором. А нужна она для переключевания и/или перепартиционирования набора данных в где-нибудь посередине процесса ETL:


TRANSFORM signals passthru() PARTITION $parts * 10;

Принимает она наборы типа Passthru, это такой особенный тип набора данных, который сообщает движку, что трансформация не трансформирующая, и ест что попало, в отличие от других трансформов, преобразующих наборы строго из одного настоящего типа в другой (разными способами, поэтому их общее количество даже в стандартном комплекте исчисляется десятками).


Теперь посмотрим на операции, наследуемые от


public abstract class Operation implements Configurable<OperationMeta> {
    public final OperationMeta meta;

    protected ListOrderedMap<String, DataStream> inputStreams;
    protected Configuration params;
    protected ListOrderedMap<String, String> outputStreams;

    public Operation() {
        this.meta = meta();
    }

    public void initialize(ListOrderedMap<String, DataStream> input, Configuration params, ListOrderedMap<String, String> output) throws InvalidConfigurationException {
        this.inputStreams = input;
        this.params = params;
        this.outputStreams = output;

        configure();
    }

    abstract protected void configure() throws InvalidConfigurationException;

    abstract public Map<String, DataStream> execute() throws InvalidConfigurationException;
}

На входе мапа входных наборов данных, инстанс с параметрами, и мапа с именами для выходных наборов. А на выходе имплементация выдаёт мапу с наборами, сопоставленными с именами. Минимальной является следующая операция:


public class CountByKeyOperation extends Operation {
    static final String GEN_COUNT = "_count";

    @Override
    public OperationMeta meta() {
        return new OperationMeta("countByKey", "Count values under the same key in all given DataStreams",

                new PositionalStreamsMetaBuilder()
                        .input("Source KeyValue DataStream",
                                StreamType.EVERY
                        )
                        .build(),

                null,

                new PositionalStreamsMetaBuilder()
                        .output("KeyValue DataStream with unique source keys",
                                new StreamType[]{StreamType.Columnar}, Origin.GENERATED, null
                        )
                        .generated(GEN_COUNT, "Count of values under each key in the source DataStream")
                        .build()
        );
    }

    @Override
    protected void configure() throws InvalidConfigurationException {
    }

    @Override
    @SuppressWarnings("unchecked")
    public Map<String, DataStream> execute() {
        if (inputStreams.size() != outputStreams.size()) {
            throw new InvalidConfigurationException("Operation '" + meta.verb + "' requires same amount of INPUT and OUTPUT streams");
        }

        final List<String> indices = Collections.singletonList(GEN_COUNT);

        Map<String, DataStream> output = new HashMap<>();
        for (int i = 0, len = inputStreams.size(); i < len; i++) {
            DataStream input = inputStreams.getValue(i);
            JavaPairRDD<Object, Record<?>> count = input.rdd
                    .mapToPair(t -> new Tuple2<>(t._1, 1L))
                    .reduceByKey(Long::sum)
                    .mapToPair(t -> new Tuple2<>(t._1, new Columnar(indices, new Object[]{t._2})));

            output.put(outputStreams.get(i), new DataStream(StreamType.Columnar, count, Collections.singletonMap(OBJLVL_VALUE, indices)));
        }

        return output;
    }
}

Тоже очень важная и нужная штука, ведь вызвав её как


CALL countByKey() INPUT data OUTPUT keys_counted;

мы получаем аналог SQL-ного SELECT _key, COUNT(*) GROUP BY _key где _key — ключ записи. (В самом-то языке у нас ведь ни GROUP BY никакого нет, ни COUNT(*), потому как обычно в процессах ETL делать такие вещи не особо-то и нужно. А тут как взяли, да и получили. Ещё и показали, как это на уровне Spark RDD API реализуется.)


Другие операции могут имплементировать всё, что только ни попросят аналитики. Включая глубоко аналитическую алгоритмику, выходящую далеко за рамки ETL…


Впрочем, сейчас нам важно было лишь найти всех наследников соответствующих базовых классов подключаемых функций, и сложить их метаданные в кеш. Уже из этого кеша интерпретатор и будет брать классы по имени их глагола, инстанциировать, инициализировать параметрами, и передавать управление.


Так что с точками расширяемости у нас получается всё в полном порядке.


Можно взлетать.


10. Режимы запуска. Пакетный режим, сборка для разных окружений, автотесты


Необходимость интеграции ETL инструмента в несколько различных окружений требует наличие у него различных режимов запуска, включая удалённые и отладочно-интерактивные. Например, у нас получилось вот такая табличка (с ключами командной строки для поддерживаемых режимов запуска — цитирую из документации):


Режим запуска Пакетный [сухой] Интерактивный... … со скриптом [сухой]
На кластере Spark -s [-d]
Локальный -l -s [-d] -R -R -s [-d]
REPL на кластере Spark -e -e -s [-d]
Локальный сервер REPL -l -e -l -e -s [-d]
Клиент REPL -r -r -s [-d]

Режимов много, но «пакетный на кластере Spark» в любом случае будет основным. В продакшене процессы обычно ведь работают из недр какого-нибудь CI/CD сервиса — неважно, запускается ли таска по расписанию, нажатию кнопки на дэшборде, или же через вызов какого-то удалённого API через REST, — но автоматизация в любом случае предполагает пакетность. Когда приходит задача, запустились — отработали, а потом ждём до следующей задачи, и так по кругу.


В нашем конкретном случае все дороги ведут в TeamCity, который дёргает CloudFormation, который выполняет развёртывание кластера EMR, который поднимает на кластере Livy, через который на Spark сабмитится JAR с приложением Data Cooker Dist, которому передаются параметры запуска через командную строку, который обращается к хранилищу S3, чтобы скопировать данные в HDFS, после чего сабмитится уже сам Data Cooker ETL с путём к скрипту и файлу с переменными контекста верхнего уровня, ну и пошла жара. А как посчитали — забрали результат и погасили кластер, чтобы деньги в простое попусту не кушать.

И так столько раз, сколько процессов нужно выполнить. Бывает, и тысячи процессов в день.


Впрочем, при внедрении новых процессов быстро становится понятно, что отлаживать их на сэмплах прямо на кластере где-то в облаке на другой стороне земного шарика — это ой как неудобно. Так что неплохо бы ещё уметь запускаться на физическом железе поблизости, а в идеале — и вовсе на локальной машине, без кластера Spark.


Такое отнюдь не невозможно.


Контекст спарка прекрасно запускается в локальном режиме как обычное JVM приложение. Правда, если упаковать его в FatJAR со всеми зависимостями, весить такая сборка будет под 200 мегабайтов.


Само по себе это не проблема. Проблема, если FatJAR надо деплоить много раз на дню в облако (на той стороне шарика). Что ж, в таком случае надо просто научиться собирать приложение с внешними зависимостями под целевое облачное окружение, то есть, зависеть от JAR-ников тех версий, которые развёрнуты там, в облаке.


Минусом тут будет как раз версионность: облачные кластера Spark, как правило, значительно отстают от актуальных, и тем более от bleeding edge. Официально-то в амазоновском облаке до сих пор крутится Java 8, даже не 11 (хотя она там есть, и неофициально юзать её всё-таки можно). Но как-нибудь переживём, преодолеем.


В качестве сборочной системы у нас Apache Maven, он профили сборки поддерживает.


<profiles>
    <profile>
        <id>emr</id>
        <activation>
            <activeByDefault>true</activeByDefault>
        </activation>
    </profile>
    <profile>
        <id>local</id>
        <activation>
            <activeByDefault>false</activeByDefault>
        </activation>
    </profile>
</profiles>

Поэтому для начала посмотрим на то, что там в составе EMR (допустим, актуальная версия сейчас 6.9), и пропишем в pom.xml зависимости от тех версий либ, которые нам доступны.


<!-- EMR 6.9 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.3.0</version>
</dependency>
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-hadoop</artifactId>
    <version>1.12.2</version>
</dependency>

<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-java-sdk-s3</artifactId>
    <version>1.12.385</version>
</dependency>
<dependency>
    <groupId>commons-cli</groupId>
    <artifactId>commons-cli</artifactId>
    <version>1.5.0</version>
</dependency>
<!-- end EMR deps -->

Далее найдём те либы, которые в облачной версии нас не устраивают, и используем maven-shade-plugin, чтобы переписать имена пакетов на приватную версию. С учётом минимальности наших зависимостей таких библиотек должно быть немного. В идеале вообще не должно быть. По секрету расскажу, что в последних версиях к этой цели мы пришли, так что и показывать нечего.


На последнем шаге используем разные профили для включения или исключения библиотек в результирующий FatJAR, и тем самым добьёмся своей цели. На CI/CD будем собирать артефакт урезанного профиля с внешними зависимостями:


<profile>
    <id>emr</id>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.4.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                            <transformers>
                                <transform
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>io.github.pastorgl.datacooker.cli.Main</mainClass>
                                </transform>
                            </transformers>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.hadoop</exclude>
                                    <exclude>commons-collections</exclude>
                                    <exclude>commons-logging</exclude>
                                    <exclude>org.apache.commons:commons-lang3</exclude>
                                    <exclude>org.apache.spark</exclude>
                                    <exclude>org.scala-lang</exclude>
                                </excludes>
                                <includes>
                                    <include>io.github.pastorgl.datacooker</include>
                                    <include>com.opencsv</include>
                                    <include>org.locationtech.jts</include>
                                    <include>org.wololo</include>
                                    <include>io.github.classgraph</include>
                                    <include>com.github.alexmojaki</include>
                                    <include>net.sf.geographiclib</include>
                                    <include>commons-cli</include>
                                    <include>com.uber</include>
                                    <include>io.jenetics</include>
                                    <include>de.undercouch:bson4jackson</include>
                                    <include>io.logz</include>
                                    <include>org.glassfish.hk2:guice-bridge</include>
                                    <include>org.eclipse.jetty</include>
                                    <include>com.google.inject</include>
                                    <include>org.glassfish.jersey.media:jersey-media-json-jackson</include>
                                    <include>org.glassfish.jersey.ext:jersey-entity-filtering</include>
                                </includes>
                            </artifactSet>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</profile>

После исключения Spark из FatJAR размер артефакта составляет 7 мегабайт, что уже нормально. А вот в профиле local мы секцию <artifactSet> попросту не укажем, из-за чего получим полный артефакт, в котором весь classpath включён.


Собственно, осталось запуститься. Согласно табличке из начала этого раздела, для запуска в пакетном режиме нам надо будет использовать такой набор ключей:


java -jar ./datacooker-etl-cli.jar -l -s ./scripts/script.tdl

Класс Main, упомянутый в конфиге мавена выше, является диспетчером, который делает следующие вещи:


  • разбирает ключи командной строки, создавая инстанс конфигурации обработчика режима,
  • выясняет, какой именно из множества режимов запуска имеет в виду пользователь,
  • если нужно, поднимает локальный контекст Spark, и конфигурирует его,
  • создаёт какие-то из 4 возможных классов обработчиков режимов, инициализируя их конфигурацией и контекстом Spark (если это было запрошено),
  • и передаёт им управление в правильном порядке.

Обработчики режимов такие:


  • Runner — разбор синтаксиса, и если нужно, то интерпретация файла скрипта в локальном или кластерном окружении,
  • Local — сервер и клиент REPL в одном флаконе, для локальной интерактивной отладки,
  • Server — сервер REPL на основе jersey, который слушает запросы по HTTP, и исполняет переданные ему команды; может быть запущен и на кластере тоже,
  • Client — соответственно, jax-rs клиент для сервера, не требующий контекста Spark вообще.

А всех нужных нам режимов запуска — для любого мыслимого кейса эксплуатации нашего инструмента — мы добьёмся сочетанием параметров, и диспетчеризацией управления между этими обработчиками.


Более того, для прогона junit тестов при сборке можно запросто унаследоваться от обработчика Runner, заведя TestRunner, в котором задан локальный запуск Spark, а пути к скриптам перехватываются в TestDataContext, и перенаправляются к ресурсам сборки. Таким образом, прогон тестов будет по-настоящему интерпретировать настоящие скрипты из дерева исходников, каждый в своём изолированном контексте. Например,


public class CountByKeyOperationTest {
    @Test
    public void mapCountTest() {
        try (TestRunner underTest = new TestRunner("/test.countByKey.tdl")) {
            Map<String, JavaPairRDD<Object, Record<?>>> ret = underTest.go();

            JavaPairRDD<Object, Record<?>> left = ret.get("left");
            assertEquals(
                    6,
                    left.count()
            );

            Map<Object, Record<?>> result = ret.get("counted").collectAsMap();

            assertEquals(
                    3,
                    result.size()
            );

            for (Record<?> l : result.values()) {
                assertEquals(2L, l.asLong("_count").longValue());
            }
        }
    }
}

…который прогоняет следующий скрипт из ресурсов:


CREATE "left" hadoopText() FROM 'data/bar.csv' PARTITION 1;

TRANSFORM "left" textToColumnar(@delimiter=',')
    VALUE (foo,"2",_,_,_,_,_,_,_,_,_,_,_,_,_,_,_,_,_,_,"21") KEY foo;

CALL countByKey() INPUT "left" OUTPUT counted;

COPY counted hadoopText() INTO 'counted';

Это уже, конечно, совсем не юнит-тесты в их истинном смысле, наоборот, вполне себе интеграционные. Зато и покрытие получается более чем замечательное — ведь каждый тест производит настоящий запуск всего движка целиком в пакетном режиме.


Скрипт, кстати, вполне может послужить примером использования соответствующей функции, поэ… Стоп. Так-так-так.


На самом деле, идея использовать настоящие скрипты из тестов в качестве дополнения к документации пришла мне в голову не сразу, но как только пришла, я сразу перенёс вызов небольшой дополнительной утилиты, которая собирает доку из метаданных, в тестовый scope, и тем самым получил свой «автодокументатор мечты».


Однако, сегодняшняя статья даже для пятничной уже слишком длинная, так что продолжим после выходных. Не переключайтесь!


Ссылочки. Как же без ссылочек?

Исходники: https://github.com/PastorGL/datacooker-etl
Промо-страница: https://pastorgl.github.io/datacooker-etl
Группа в Телеграме: https://t.me/data_cooker_etl
И ещё одна группа в Телеграме, участники которой во многом и надоумили меня взяться за разработку данного проекта: https://t.me/hadoopusers

Комментарии (0)