Вот уже который год я потихоньку разрабатываю SQL-ный движок на основе Apache Spark, специализированный под задачи ETL. И хотя диалект языка изначально называется «Transform Definition Language», писать трансформации данных непосредственно на нём самом до сих пор было невозможно. Вместо этого на фазе Transform предполагалось использовать подключаемые модули, которые рантайм интерпретатора предоставляет из Java classpath, вызывая их как процедуры.
Это очень эффективный способ трансформации данных — с точки зрения производительности, — но он довольно долгий с точки зрения внедрения, и дорогой в разработке. Потому что сначала трансформацию надо описать формально в виде статьи-whitepaper'а (это делает data scientist), потом написать прототип на Python (ответственность data analyst-а), отладиться на сэмпле реальных данных (чем тоже занимается аналитик), и тогда уже делать и оптимизировать финальную имплементацию на Java с использованием низкоуровневого API Spark (собственно, задача разработчика). Неудобно. Куча участников и изолированных шагов.
При этом в интерпретатор уже была добавлена поддержка процедур и функций, написанных на самом языке. Причём, функции могут вызываться как сами по себе (обычные), так и в контексте записи датасета (RECORD-функции) — то есть, в таких местах, как списки выражений в SELECT.
SELECT выражение, выражение, ... FROM источник INTO назначение WHERE логическое_выражение;
Напрашивается такой вопрос: а почему бы не расширить поддержку функций для множества записей? В конце концов, каждая уважающая себя имплементация SQL поддерживает UDF в том или ином виде (как UDAF, а многие ещё и UDTF). И если обычные функции в TDL являются почти что прямым аналогом хайвовских UDF, то вполне логично добавить и поддержку аббревиатур, отличающиеся всего одной буквой :)
Вопрос только в том, как именно. Тут надо отвлечься и чутка пофилософствовать на уровне абстракций повыше.
Во-первых, у нас тут всё-таки Spark под капотом, а он MapReduce на стероидах, а не многоуровневый агрегатор индекс/блок/страница с произвольной адресацией записи, как классические СУБД. То есть, элементарным адресуемым куском данных в Spark является партиция, а чтобы добраться в ней до конкретной записи, надо использовать итератор — явно ли, неявно ли, но обязательно. Нет в нём «страниц», чтобы из них что-то дёргать по смещению напрямую.
Различие на идеологическом уровне. И если внимательно посмотреть, как написан тот же самый Spark SQL, то там из-за этого с имплементацией некоторых фич SQL всё довольно печально, потому что прямая адресация не очень-то соотносится с итераторами MapReduce. Точнее говоря, в тех местах, семантика у двух подходов совпадает (т.е. когда надо пройтись по множеству записей, как в каких-нибудь агрегирующих функциях или курсорах), он справляется на отлично, а вот там, где нужен настоящий прямой доступ к каждой записи по отдельности... там всё обстоит так себе, потому что требуются некоторые костыли.
Но у нас, в отличие от разработчиков из сообщества Apache, не было и нет цели максимально приближаться к знакомому по другим имплементациям поведению, поэтому я, будучи техлидом, могу позволить себе всяческие вольности в постановке задачи под наш конкретный проект. То есть, реализовывать только то подмножество, что нам нужно, и так, как нам нужно, а не как предписывает какой-либо стандарт. И полностью забить на то, что не нужно ради эффективного ETL.
Тут стоит ещё кое-что прояснить (пусть будет «во-вторых»). СУБДшный «курсор» ведь, вообще говоря, штука изначально довольно-таки избыточная. Помнится, ещё когда в стародавние времена приходилось писать под Oracle, я поначалу сильно удивлялся обилию тамошних способов перебора записей. Таблицы, вьюхи, табличные функции, а ещё материализованные вьюхи, и вдобавок курсоры — которые являются материализацией не просто запроса, а мгновенного слепка запроса, ещё и позволяющие гулять по нему туда-сюда.
Двайте-ка сразу вспомним, как оно там было:
DECLARE
CURSOR cursor_name IS
SELECT column_name, other_column FROM some_table;
cursor_rec cursor_name%ROWTYPE;
BEGIN
OPEN cursor_name;
LOOP
FETCH cursor_name INTO cursor_rec;
EXIT WHEN cursor_name%NOTFOUND;
-- Process the fetched data
END LOOP;
CLOSE cursor_name;
END;
Курсор удобен тем, что в отличие от запроса или вьюхи он позволяет принимать решение на уровне каждой итерируемой записи, что же с нею делать, по мере её получения, а не декларировать намерение до того, как запрос был выполнен.
С точки зрения самого же SQL, описывающего в общем виде, ЧТО хочет получить пользователь, и отдающего на откуп движку, КАК он это получит, курсоры позволяют описать КАК ИМЕННО хочется получить результат.
Императивщина, идущая вразрез со всем остальным, чисто декларативным языком. Поэтому-то в большинстве имплементаций курсоры и доступны только в императивных контекстах — таких как UDF, написанных на языке, и редкая СУБД позволяет использовать их вне императивного контекста (а какой-нибудь PostgreSQL со своим внезапным декларативным FETCH тут, конечно, отличился). Но если императивщиной не злоупотреблять, то она реально удобно для определённого круга задач. А у нас язык и без того сильно императивный...
Так. Исходя из вышесказанного, для простого и эффективного решения нашей задачи можно принять несколько волюнтаристское решение, и не просто провести параллель между такими понятиями как «курсор» из СУБД и «итератор по партиции» из Spark RDD API, а поставить знак равенства между ними. Прямо так и сказать, что передаваемый в лямбду в .mapPartition() итератор — это курсор.
Хм. Но ведь в нормальном SQL никаких «партиций» нет, и курсор работает на уровне всего запроса... Зато у нас Spark, а это означает, что MapReduce продвинутый, и позволяет делать Map–Map–Map, даже и без Reduce вообще. То есть, можно после первого Map воткнуть repartition в одну партицию, и вторым шагом слить результаты «редьюса» каждой партиции в один результат... если необходимо. То есть, писать что-то такое:
ALTER dsName
TRANSFORM aggregatePerPartition(parameters)
PARTITION 1
TRANSFORM aggregatePartitionResults(parameters);
Две агрегирующие функции вместо одной, первая делает reduce на каждую партицию, вторая собирает все агрегации партиций в общий reduce. Ну и вообще, таких шагов в одном ALTER можно делать много — столько, сколько нужно.
Для этого придётся чуть-чуть переделать разбор самого оператора
ALTER, потому что он у нас был рассчитан на синтаксис видаALTER dsName [TRANSFORM transform] [KEY expression] [PARTITION N]в таком вот конкретном порядке, а теперь необходимо рассматривать его какALTER dsName [transform_item]..., где каждый transform item — это любое из выраженийTRANSFORM/KEY/PARTITION, и быть из теперь может сколько угодно. Впрочем, подобное изменение парсера и интерпретатора реализуется достаточно тривиально, чтобы не заморачиваться (и не рассказывать о нём подробно).
(Заодно я ещё решил внести в поддержку ALTER подвыражение INTO newDsName, которое вместо замены датасета делает новую копию; ну и добавил для этого случая поддержку JOIN / UNION в качестве источника, взяв её из SELECT. Всё равно она там уже написана, так чего бы не переиспользовать, если по смыслу подходит? Выглядеть со стороны стандарта это должно довольно странно, но у нас тут свой диалект, свои правила. Рассказывать подробно об этом я тоже не стану, а то объём статьи и без того получается порядочный.)
В-третьих, для наших, чисто ETL-ных задач не стоит замахиваться на всякие сильно продвинутые штуки типа из репертуара серьёзных СУБД, — типа scrollable и/или updatable курсоров, — достаточно быть примитивными, как MySQL, где по ним можно пройти один раз с начала до конца, и на этом ограничиться. То есть, результатом сразу же будет новый DS.
В-последних, в отличие от любой нормальной базы данных, которые без схемы никуда не едут, наш движок SQL с практической точки зрения schema-less (точнее: с контекстной схемой), и описывать формальный тип записи для определения курсора нет необходимости, достаточно лишь указать тип датасета в целом. Также, из-за того, что мы изначально приравняли курсор к итератору по партиции Spark RDD, нам его даже и открывать-то не нужно, мы спокойно обойдёмся и без церемониальных DECLARE CURSOR / OPEN / CLOSE.
Короче, всех этих полу-абстрактных рассуждений уже вполне достаточно, дабы можно было написать желаемую грамматику для определения «курсорной функции»:
CREATE [OR REPLACE] TRANSFORM transformName(@parameter[=defaults]...)
FROM dsType... INTO dsType
AS BEGIN [LOOP]
statements...
END [TRANSFORM];
Выглядит оно так потому, что наш движок с самого начала поддерживает «трансформы» как объекты-плагины из Java classpath — и мы без лишних усложнений вытаскиваем соответствующий Java-интерфейс из недр движка на уровень интерпретатора:
public abstract class Transformer extends Pluggable<Input, Output> {
private Configuration params;
private DataStream inputStream;
private DataStream outputStream;
private Map<ObjLvl, List<String>> newColumns;
protected String outputName;
@Override
final public void configure(Configuration params) throws InvalidConfigurationException {
this.params = params;
}
@Override
final public void initialize(Input input, Output output) throws InvalidConfigurationException {
this.inputStream = input.dataStream;
this.newColumns = output.requested;
this.outputName = (output.name != null) ? output.name : inputStream.name;
}
@Override
final public void execute() throws RuntimeException {
outputStream = transformer().apply(inputStream, newColumns, params);
}
@Override
final public Map<String, DataStream> result() {
return Map.of(outputName, outputStream);
}
protected abstract StreamTransformer transformer();
}
Следовательно, имплементация курсорной функции на уровне Java будет заключаться всего лишь в написании ещё одного такого плагина-трансформа, — просто вместо выполнения какого-то жёстко заданного алгоритма он в методе transformer() будет бегать по коду на SQL. Заранее подготовленному в виде AST. То есть, этакий мини-интерпретатор внутри интерпретатора...
Впрочем, на самом деле и такая шутка у нас уже тоже была написана в прошлом релизе — но для обычных in-language функций. Её надо чуточку причесать и расширить, потому что внутри трансформа необходимо поддерживать чуть больше операторов SQL, чем LET, LOOP, IF, и RETURN, — коли уж мы делаем наши «курсоры» с некоторой оглядкой на Oracle.
Церемонии с DECLARE / OPEN / CLOSE, как ранее было сказано, нам нафиг не сдались. И LOOP необязательно писать в явном виде по той же самой причине — всё тело трансформа представляет собой цикл.
Значит, нужен FETCH и какой-то аналог EXIT. Последний, кстати, у нас тоже уже есть в виде оператора RETURN (который мы заводили для возврата значения из функции). Вполне можно переиспользовать его, чтобы не плодить ключевые слова без острой необходимости... вопрос только, а чего он должен возвращать?
MapReduce тем и отличается от курсоров в SQL, что не на каждый FETCH нужно что-то возвращать. Точнее, если возвращать что-то на каждый, то это будет чистая трансформация. А вот если на несколько фетчей что-то одно — это редьюсер. И наоборот, если возвращать несколько результатов на один фетч, то получим генератор. Значит, RETURN для цели возврата не подходит, и за ним остаётся роль завершителя обработки — выхода из цикла.
Также, в отличие от Oracle, курсор у нас не будет иметь явного имени, потому что внутри каждого трансформа всегда существует только один итератор. Значит, для возврата будем использовать конструкцию с автоматической булевой переменной, которую можно и нужно назвать по аналогии $NOTFOUND:
FETCH;
IF $NOTFOUND THEN RETURN; END;
Теперь разберёмся самим FETCH. В обычных Record-level функциях у нас ключ и объект записи помещаются в анонимные переменные, для доступа к которым используются специальные функции REC_OBJECT(), REC_KEY(), REC_PIVOT() и т.п. — и этого достаточно, по той причине, что в контексте функции запись всегда гарантированно одна. Но в трансформе, который выполняет роль редьюсера, записей может быть саккумулировано много. Неплохо бы дать им явные имена переменных. Получается, что для FETCH надо предусмотреть такие формы:
FETCH [INTO] @rec_obj;
FETCH [INTO] @rec_key, @rec_obj;
(Сигил @ у нас в TDL как раз зарезервирован для определения имён.)
Отлично. Для полноты можно пропатчить обработку Record-level функций, чтобы они тоже имели доступ к ключу и объекту записи через явные переменные:
CREATE FUNCTION funcName(parameters...)
RECORD FETCH INTO @rec_key, @rec_obj AS
BEGIN
statements...
END;
Но не отвлекаемся. Если RETURN в т��ансформах отвечает только за завершение цикла, то для возврата значений будет служить другой оператор. Мелочиться не станем, и назовём его YIELD:
YIELD key_expression, object_expression;
Ну и теперь давайте для примера соберём всё это в кучу:
CREATE OR REPLACE FUNCTION point_to_outline(@lat_center, @lon_center, @radius) AS BEGIN
$earth_radius = 6371210;
$temp_arr = ARR_FILL(13, NULL);
$angles = [0, -30, -60, -90, -120, -150, 180, 150, 120, 90, 60, 30, 0];
LOOP $ang IN RANGE[0, 12] BEGIN
$lat = $lat_center + $radius / $earth_radius * COS(ARR_ITEM($angles, $ang) * PI() / 180) * 180 / PI();
$lon = $lon_center + $radius / $earth_radius * SIN(ARR_ITEM($angles, $ang) * PI() / 180) / COS($lat_center * PI() / 180) * 180 / PI();
ARR_PUT($temp_arr, $ang, ARR_MAKE($lon, $lat));
END;
RETURN $temp_arr;
END;
CREATE OR REPLACE FUNCTION make_polygon(@outline = [], @name) AS BEGIN
$outlines_l = ARR_LENGTH($outline);
IF $outlines_l < 4 THEN
RAISE 'makePolygon: @outline ARRAY must have 4 or more members';
END;
$coords = ARR_RANGE(0, $outlines_l - 1);
LOOP $o IN $coords BEGIN
$item = ARR_ITEM($outline, $o);
ARR_PUT($coords, $o, '[' || ARR_ITEM($item, 0) || ',' || ARR_ITEM($item, 1) || ']');
END;
RETURN POLY_FROM_JSON('\{"type":"Feature","properties":\{"name":"{$name}"\},"geometry":\{"type":"Polygon","coordinates":[[' || ARR_JOIN($coords, ',') || ']]\}\}');
END;
CREATE OR REPLACE TRANSFORM poi_to_polygon(@radius=183.)
FROM Columnar INTO Polygon
BEGIN
FETCH INTO @p_key, @p_record;
IF $NOTFOUND THEN RETURN; END;
$pivot = REC_PIVOT($p_record);
$points_arr = point_to_outline(ARR_ITEM($pivot, 1), ARR_ITEM($pivot, 2), $radius);
YIELD $p_key, make_polygon($points_arr, ARR_ITEM($pivot, 0));
END;
ALTER ad_banners poi_to_polygon();
Что тут у нас происходит? Давайте разберём.
Во-первых, на вход трансформу point_to_polygon предполагается подавать колоночный датасет, в 0-й колонке которого содержится имя, а в 1-й и 2-й — широта и долгота точки. Трансформ получает ключ и запись в переменные, и вызывает функцию REC_PIVOT(), чтобы сделать из объекта колоночной записи индексированный массив. Уже с этим массивом сначала вызывается фун��ция point_to_outline(), генерирующая аппроксимацию круга заданного радиуса 12-угольником (диаметром 183 метра по умолчанию), а затем функция make_polygon() возвращает GeoJSON объект, в который добавлено имя исходной точки.
То есть, наш трансформ для каждой точки исходного датасета генерирует объект-полигон заданного радиуса. Если писать такое на Java, строк исходного кода будет не меньше, да ещё и тестами надо обкладывать, метаданные добавлять, документацию писать... Аналитик же набросала себе такую функцию за полчаса, прогнала на кусочке реального датасета, и посмотрела на результат в QGIS — вот и весь путь от идеи до продакшена.
Возьмём теперь более простой пример, с тупеньким генератором:
CREATE OR REPLACE TRANSFORM bullshitGenerator(@how_much Comment 'how much is such?' = 100 comment '100 by default')
FROM Any INTO Columnar COMMENT '''tis a generator'
BEGIN
FETCH INTO @k, @v;
IF $NOTFOUND THEN RETURN; END;
LOOP $BS IN ARR_RANGE(0, $how_much - 1) BEGIN
YIELD $k, COLUMNAR_MAKE(['bs'], ARR_MAKE($BS));
END;
END;
ALTER sample TRANSFORM bulshitGenerator(@how_much=10) INTO sample_x10;
Для нагрузки (и просто тестирования алгоритмов) иногда надо бывает нагенерировать болваночный датасет. До сих пор приходилось брать какой-нибудь кусок реальных данных в виде файла, и руками копировать его много раз. Неудобно, плюс ещё и записи повторяются. Ну а теперь мы можем задавать свои правила, каким образом создавать копии — в точности, или полным рандомом, или как-то ещё. Здесь вот на каждый исходный ключ создаётся своя новая нумерованная запись.
Для редьюсера я, честно говоря, примера от наших аналитиков так и не дождался, потому что основной хлеб нашего бизнеса это разнообразный ETL, а для базовой статистики в языке есть специальный оператор ANALYZE — и его обычно хватает. (Впрочем, у нас есть в classpath плагин-трансформ по имени countUniques, считающий уникальные значения под ключами, и рано или поздно его возможностей станет недостаточно, поэтому редьюсеры под частные задачи будут написаны когда-нибудь в будущем.)
Ну, разобрали семантику на уровне SQL, можно перейти к имплементации этого безобразия на Java.
Но начнём, как обычно, с грамматики ANTLR:
create_transform
: ( K_CREATE ( S_OR K_REPLACE )? )? K_TRANSFORM func ( S_OPEN_PAR params_decl? S_CLOSE_PAR )?
from_stream_type into_stream_type comment?
K_AS? K_LOOP? K_BEGIN transform_stmts K_END K_TRANSFORM?
;
from_stream_type
: K_FROM stream_type ( S_COMMA stream_type )*
;
into_stream_type
: K_INTO stream_type
;
stream_type
: T_COLUMNAR | T_PASSTHRU | T_POINT | T_POLYGON | T_RAW | T_STRUCT | T_TRACK
;
transform_stmts
: ( transform_stmt S_SCOL )*
;
transform_stmt
: let_func | loop_transform | if_transform | fetch_stmt | yield_stmt | return_transform | raise_stmt
;
return_transform
: K_RETURN
;
fetch_stmt
: K_FETCH ( K_INTO? S_AT L_IDENTIFIER ( S_COMMA S_AT L_IDENTIFIER )? )?
;
yield_stmt
: K_YIELD expression S_COMMA expression
;
raise_stmt
: K_RAISE msg_lvl? expression
;
loop_transform
: K_LOOP var_name S_IN? expression K_BEGIN transform_stmts ( K_ELSE transform_stmts )? K_END K_LOOP?
;
if_transform
: K_IF expression K_THEN transform_stmts ( K_ELSE transform_stmts )? K_END K_IF?
;
Правилам из transform_stmts будут соответствовать такие вот классы на Java:
public enum TDLStatement {
RETURN, LET, IF, LOOP, RAISE, YIELD, FETCH
}
public class StatementItem implements Serializable {
final TDLStatement statement;
final String[] control;
final List<Expressions.ExprItem<?>>[] expression;
final List<StatementItem> mainBranch;
final List<StatementItem> elseBranch;
public static class Builder {
private final TDLStatement statement;
private String[] control = null;
private List<Expressions.ExprItem<?>>[] expression = null;
private List<StatementItem> mainBranch = null;
private List<StatementItem> elseBranch = null;
public Builder(TDLStatement statement) {
this.statement = statement;
}
public Builder control(String control) {
this.control = new String[]{control};
return this;
}
public Builder control(String[] control) {
this.control = control;
return this;
}
public Builder expression(List<Expressions.ExprItem<?>> expression) {
this.expression = new List[]{expression};
return this;
}
public Builder expression(List<Expressions.ExprItem<?>>[] expression) {
this.expression = expression;
return this;
}
public Builder mainBranch(List<StatementItem> mainBranch) {
this.mainBranch = mainBranch;
return this;
}
public Builder elseBranch(List<StatementItem> elseBranch) {
this.elseBranch = elseBranch;
return this;
}
public StatementItem build() {
return new StatementItem(statement, control, expression, mainBranch, elseBranch);
}
}
private StatementItem(TDLStatement statement, String[] control, List<Expressions.ExprItem<?>>[] expression, List<StatementItem> mainBranch, List<StatementItem> elseBranch) {
this.statement = statement;
this.control = control;
this.expression = expression;
this.mainBranch = mainBranch;
this.elseBranch = elseBranch;
}
@Override
public String toString() {
return statement.name() + ((control != null) ? "$" + String.join(", $", control) : "");
}
}
То есть, каждый оператор может иметь (при необходимости):
массив control variables (
FETCH);массив expressions (
LET,YIELD);и две ветки исполнения: основную и
ELSE(IF,LOOP).
А может ничего этого и не иметь (как RETURN).
Портянку кода интерпретатора, который пробегается по AST правила create_transform, сгенерированным ANTLR, и собирает список операторов в список StatementItem я приводить не хочу. Она хоть и страшная, но тривиальная (при желании можете посмотреть в репозитории самостоятельно, класс TDLInterpreter, методы createTransform() и transformStatements().
А вот имплементацию интерпретатора этого списка, хоть она и является чуточку расширенной версией интерпретатора кода функций, привести стоит, несмотря на её объём:
public class TDLTransform {
public static Builder builder(String name, String descr, StreamType.StreamTypes from, StreamType.StreamTypes into, List<StatementItem> items, VariablesContext vc) {
return new Builder(name, descr, from, into, items, vc);
}
public static StatementItem fetch(String[] control) {
return new StatementItem.Builder(TDLStatement.FETCH).control(control).build();
}
public static StatementItem yield(List<Expressions.ExprItem<?>>[] expression) {
return new StatementItem.Builder(TDLStatement.YIELD).expression(expression).build();
}
public static StatementItem transformReturn() {
return new StatementItem.Builder(TDLStatement.RETURN).build();
}
public static StatementItem let(String controlVar, List<Expressions.ExprItem<?>> expression) {
return new StatementItem.Builder(TDLStatement.LET).control(controlVar).expression(expression).build();
}
public static StatementItem transformIf(List<Expressions.ExprItem<?>> expression, List<StatementItem> ifBranch, List<StatementItem> elseBranch) {
return new StatementItem.Builder(TDLStatement.IF).expression(expression).mainBranch(ifBranch).elseBranch(elseBranch).build();
}
public static StatementItem loop(String controlVar, List<Expressions.ExprItem<?>> expression, List<StatementItem> loopBranch, List<StatementItem> elseBranch) {
return new StatementItem.Builder(TDLStatement.LOOP).control(controlVar).expression(expression).mainBranch(loopBranch).elseBranch(elseBranch).build();
}
public static StatementItem raise(String level, List<Expressions.ExprItem<?>> expression) {
return new StatementItem.Builder(TDLStatement.RAISE).control(level).expression(expression).build();
}
public static class Builder extends ParamsBuilder<Builder> {
private final String name;
private final String descr;
private final StringJoiner descrJoiner = new StringJoiner(", ");
private final List<StatementItem> items;
private final VariablesContext vc;
private final StreamType resultType;
private final PluggableMetaBuilder metaBuilder;
private Builder(String name, String descr, StreamType.StreamTypes from, StreamType.StreamTypes into, List<StatementItem> items, VariablesContext vc) {
this.name = name;
this.descr = descr;
this.items = items;
this.vc = vc;
this.metaBuilder = new PluggableMetaBuilder(name);
metaBuilder.transform();
metaBuilder.input(from, "Input DS types");
metaBuilder.output(into, "Output DS type");
this.resultType = into.types[0];
}
public Builder mandatory(String name, String comment) {
if (descr == null) {
descrJoiner.add("@" + name);
}
metaBuilder.def(name, comment);
return this;
}
public Builder optional(String name, String comment, Object value, String defComment) {
if (descr == null) {
descrJoiner.add("@" + name + " = " + value);
}
metaBuilder.def(name, comment, value.getClass(), value, defComment);
return this;
}
public PluggableInfo build() {
metaBuilder.descr((descr == null) ? descrJoiner.toString() : descr);
PluggableMeta meta = metaBuilder.build();
Pluggable<?, ?> transformer = new FunctionTransformer(name, resultType, items, vc) {
public PluggableMeta meta() {
return meta;
}
};
return new PluggableInfo(meta, transformer);
}
}
private static abstract class FunctionTransformer extends Transformer {
private final String name;
private final StreamType resultType;
private final List<StatementItem> items;
private final VariablesContext vc;
public FunctionTransformer(String name, StreamType resultType, List<StatementItem> items, VariablesContext vc) {
this.name = name;
this.resultType = resultType;
this.items = items;
this.vc = vc;
}
protected StreamTransformer transformer() {
return (ds, newAttrs, params) -> {
final Map<ObjLvl, List<String>> attrs = (newAttrs != null) ? newAttrs : ds.attributes();
VariablesContext thisCall = new VariablesContext(vc);
for (String param : params.definitions()) {
thisCall.putHere(param, params.get(param));
}
Broadcast<VariablesContext> broadVars = JavaSparkContext.fromSparkContext(ds.rdd().context()).broadcast(thisCall);
Broadcast<List<StatementItem>> broadStmt = JavaSparkContext.fromSparkContext(ds.rdd().context()).broadcast(items);
return new DataStreamBuilder(outputName, attrs)
.transformed(name, (resultType == StreamType.Passthru) ? ds.streamType : resultType, ds)
.build(ds.rdd().mapPartitionsToPair(it -> {
List<Tuple2<Object, DataRecord<?>>> ret = new ArrayList<>();
VariablesContext vars = new VariablesContext(broadVars.getValue());
List<StatementItem> stmts = broadStmt.getValue();
CallContext cc = new CallContext(it, ret);
while (!cc.returnReached) {
cc.eval(stmts, vars);
}
return ret.iterator();
}, true));
};
}
}
private static class CallContext {
private final Iterator<Tuple2<Object, DataRecord<?>>> it;
private final List<Tuple2<Object, DataRecord<?>>> ret;
private Object key;
private DataRecord<?> rec;
boolean returnReached = false;
public CallContext(Iterator<Tuple2<Object, DataRecord<?>>> it, List<Tuple2<Object, DataRecord<?>>> ret) {
this.it = it;
this.ret = ret;
}
void eval(List<StatementItem> items, VariablesContext vc) {
for (StatementItem fi : items) {
if (returnReached) {
return;
}
switch (fi.statement) {
case FETCH: {
if (it.hasNext()) {
Tuple2<Object, DataRecord<?>> t = it.next();
key = t._1;
rec = t._2;
vc.putHere(Constants.FETCH_VAR, false);
} else {
key = null;
rec = null;
vc.putHere(Constants.FETCH_VAR, true);
}
if (fi.control.length == 1) {
vc.putHere(fi.control[0], rec);
} else if (fi.control.length == 2) {
vc.putHere(fi.control[0], key);
vc.putHere(fi.control[1], rec);
}
break;
}
case YIELD: {
Object rec = Expressions.eval(key, this.rec, fi.expression[1], vc);
ret.add(new Tuple2<>(Expressions.eval(key, this.rec, fi.expression[0], vc),
(rec instanceof DataRecord<?>) ? (DataRecord<?>) rec : new PlainText(String.valueOf(rec))));
break;
}
case RETURN: {
returnReached = true;
return;
}
case LET: {
Object o = Expressions.eval(key, rec, fi.expression[0], vc);
if (fi.control[0] != null) {
vc.put(fi.control[0], o);
}
break;
}
case IF: {
if (Expressions.bool(key, rec, fi.expression[0], vc)) {
eval(fi.mainBranch, vc);
} else {
if (fi.elseBranch != null) {
eval(fi.elseBranch, vc);
}
}
break;
}
case LOOP: {
Object expr = Expressions.eval(key, rec, fi.expression[0], vc);
boolean loop = expr != null;
Object[] loopValues = null;
if (loop) {
loopValues = new ArrayWrap(expr).data();
loop = loopValues.length > 0;
}
if (loop) {
VariablesContext vvc = new VariablesContext(vc);
for (Object loopValue : loopValues) {
if (returnReached) {
return;
}
vvc.putHere(fi.control[0], loopValue);
eval(fi.mainBranch, vvc);
}
} else {
if (fi.elseBranch != null) {
eval(fi.elseBranch, vc);
}
}
break;
}
case RAISE: {
Object msg = Expressions.eval(key, rec, fi.expression[0], vc);
switch (MsgLvl.get(fi.control[0])) {
case INFO -> System.out.println(msg);
case WARNING -> System.err.println(msg);
default -> {
returnReached = true;
throw new RaiseException(String.valueOf(msg));
}
}
break;
}
}
}
}
}
}
Для каждого из операторов, которые могут встретиться в теле трансформа, заведён свой личный build-метод.
Далее, есть builder для трасформа целиком, который позволяет зарегистрировать в контексте исполнения метаданные для всех параметров (как и трансформа в целом) — ведь помимо всего прочего наш интерпретатор имеет REPL, в котором есть команды \SHOW и \DESC, показывающие их пользователю. Не просто же так мы завели поддержку выражения COMMENT в декларации трансформа (которую подсмотрели в MySQL).
Метод transformer() имплементирует наш собственный API для Java classpath плагинов. Именно в него передаётся Spark RDD, над которой производится экзекуция, тут же и дёргается .mapPartitionToPair(), в которой становится доступен наш итератор по партиции — он же курсор.
И, наконец, внутренний класс CallContext и является тем самым магическим контекстом исполнения операторов SQL над объектами Java — самая глубоко вложенная часть нашей общей стековой виртуальной машины с контекстами переменных, наследуемых начиная аж из окружения сначала на уровень скрипта, потом в функцию трансформа, а потом и в контекст каждого цикла, ветвления, фетча, и всего остального, что там нам необходимо...
Если всё это разматывать с самого начала — то надо прочитать все мои предыдущие статьи :) где описываются и все контексты интерпретатора, и сам жизненный цикл интерпретации скрипта.
Хотя, на этом этапе уже ничего особенно сложного — всю подготовительную работу мы начали делать ещё пару лет назад, и строимся на основе ранее имплементированного.
В частности, экземпляр плагина-трансформа регистрируется в том же самом объекте-библиотеке рантай��овых сущностей, что и «нормальные» Java classpath плагины трансформов, и вызывается ровно тем же самым стандартным способом, что и они.
Последнее, что необходимо имплементировать для обеспечения работы с генераторами — это функции и операторы для создания объектов записей, а также манипулирования их атрибутами. Ранее они были не нужны, потому что объекты записей в SQL мы только читали, а создавались они всегда на стороне Java кода (либо в плагинах, либо в части интерпретатора, отвечающей за SELECT). А теперь, когда у нас у нас есть оператор YIELD, куда передаётся полный готовый объект — либо созданный с нуля, либо полученный из курсора, и изменённый — мы должны уметь создавать их напрямую в SQL.
Но таковые функции конструкторов объектов получаются весьма тривиальными:
public static class MakeColumnar extends Function.Binary<Columnar, ArrayWrap, ArrayWrap> {
@Override
public Columnar call(Deque<Object> args) {
ArrayWrap keys = Evaluator.popArray(args);
ArrayWrap values = Evaluator.popArray(args);
return new Columnar(Arrays.stream(keys.data()).map(String::valueOf).toList(), values.data());
}
@Override
public String name() {
return "COLUMNAR_MAKE";
}
@Override
public String descr() {
return "Create a Columnar object using first array as names of columns, second as their values";
}
}
public static class ATTR_REMOVE extends Function.Binary<DataRecord<?>, DataRecord<?>, String> {
@Override
public DataRecord<?> call(Deque<Object> args) {
DataRecord<?> obj = (DataRecord<?>) args.pop();
obj.remove(Evaluator.popString(args));
return obj;
}
@Override
public String name() {
return "ATTR_REMOVE";
}
@Override
public String descr() {
return "Take a Record Object, and return it without the specified Attribute";
}
}
public static class GeoStructured extends Function.Unary<Structured, SpatialRecord<?>> {
@Override
public Structured call(Deque<Object> args) {
try {
Object obj = args.pop();
ObjectMapper om = new ObjectMapper();
om.enable(DeserializationFeature.USE_JAVA_ARRAY_FOR_JSON_ARRAY);
String json;
if (obj instanceof PointEx point) {
json = new Feature(new org.wololo.geojson.Point(new double[]{point.getX(), point.getY()}), (Map<String, Object>) point.getUserData()).toString();
} else if (obj instanceof PolygonEx polygon) {
json = new Feature(PolygonConverter.convert(polygon), (Map<String, Object>) polygon.getUserData()).toString();
} else {
return null;
}
return new Structured(om.readValue(json, Object.class));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public String name() {
return "STRUCT_FROM_GEO";
}
@Override
public String descr() {
return "Convert a Polygon or Point to Structured Object";
}
}
Все они сводятся к тем или иным вызовам соответствующих конструкторов или методов Java-представления объекта записи, и используют готовую инфраструктуру движка.
Итого: поддержка MapReduce в SQL обошлась нам в 2300 строк кода на Java (из которых 900 — это адаптированный существующий код, и только 1400 написаны с нуля) и 100 на ANTLR, или примерно три недели работы программиста в IDE, после пары недель раздумий и планирования (я до сих пор с удовольствием чёркаю синтаксисы и блок-схемы алгоритмов на бумаге). На внедрение в продакшен, конечно, ушло больше времени, но это уже совсем другая тема...
Исходники: https://github.com/PastorGL/datacooker-etl
Официальная группа в телеге: https://t.me/data_cooker_etl