В предыдущих сериях (FAQ • 1 • 2 • 3 • 4 • 5 • 6 ) мы весьма подробно рассмотрели, как написать на Java собственный интерпретатор объектно-ориентированного диалекта SQL поверх Spark RDD API, заточенный на задачи подготовки и трансформации наборов данных.
В данной части поговорим о том, как добавить в собственный диалект SQL поддержку процедур. Например,
-- library.tdl
CREATE PROCEDURE dwellTimeByMode(@signals, @target, @outPrefix,
@modes = ['pedestrian', 'non_pedestrian', 'car', 'bike'],
@groupid='cell10') AS BEGIN
LOOP $mode IN $modes BEGIN
SELECT * FROM $signals INTO "{$signals}/{$mode}" WHERE mode=$mode;
CALL dwellTime(@signals_userid_attr=userid,
@target_userid_attr=userid,
@target_grouping_attr=$groupid
) INPUT signals FROM "{$signals}/{$mode}", target FROM $target
OUTPUT INTO "{$outPrefix}/{$mode}";
ANALYZE "{$signals}/{$mode}";
ANALYZE "{$outPrefix}/{$mode}";
END;
END;
--- ... --- ... --- ... ---
-- script.tdl
CALL dwellTimeByMode(@signals=$this_month, @target=$population, @outPrefix=$this_month);
Нафига это надо?
Ну, допустим, у нас уже есть некоторое количество SQL ETL кода, наработанного за время эксплуатации инструмента в продакшене, и становится заметно, что значительная часть скриптов на разных проектах совпадает, и из раза в раз повторяется. Логично было бы вынести все эти совпадающие куски в библиотеку, чтобы держать в одном месте, да и вызывать с какими надо параметрами, когда надо. Вот прям как на примере выше.
Ну, если интерпретатор уже написан, то реализовать в нём процедры — это как нефиг делать. Очень простая задача.
Ведь что есть процедура с точки зрения интерпретатора?
Процедура с точки зрения интерпретатора — это кусок AST, который:
- выдирается при первичном просмотре исходного кода,
- не интерпретируется, откладывается куда-то под заданным именем,
- многократно интерпретируется в моменты вызовов,
- имеет свой дочерний контекст переменных,
- переменные берутся из родительского контекста, и аугментируются параметрами процедуры.
И чем обработка процедуры отличается от контекста, например, цикла в таком случае? Вторым пунктом из списочка выше. AST цикла никуда не откладывается, и имени не имеет. А так, в нём ровно всё то же самое.
Это означает, что нам придётся завести какое-то место, куда можно откладывать кусок AST. А также, каким-то образом запоминать имена параметров, чтобы порождать переменные, соответствующие переданным в процедуру параметрам, в дочернем контексте при её вызове. Всего лишь, и дело в шляпе.
Но сначала стоит описать формальный синтаксис процедур для парсера SQL.
create_proc
: ( K_CREATE ( S_OR K_REPLACE )? )? K_PROCEDURE func
( S_OPEN_PAR proc_param ( S_COMMA proc_param )* S_CLOSE_PAR )?
K_AS? K_BEGIN
statements
K_END K_PROCEDURE?
;
proc_param
: param
| S_AT L_IDENTIFIER
;
drop_proc
: K_DROP K_PROCEDURE func ( S_COMMA func )*
;
А также расширить оператор CALL
возможностью вызвать процедуру (которая у нас, в отличие от написанных на Java операций, исполняется в том же контексте, что и сам скрипт, и потому не требует отдельного указания INPUT
и OUTPUT
датасетов, а потребляет и создаёт их там напрямую).
call_stmt
: K_CALL func_expr operation_io
| K_CALL func_expr
;
Таким образом, синтаксис — как и подразумевающаяся семантика — заданы. Согласно им, процедуру можно создать, заменить, и даже дропнуть. А параметры у неё могут как иметь значения по умолчанию (то есть, быть опциональными), так и не иметь оных (то есть, быть обязательными). Прекрасно, осталось всё это дело поддержать в интерпретаторе.
Ну и небольшой нюанс — раз у нас процедуры SQL вызываются той же синтаксической конструкцией, что и операции Java, причём по имени, то и живут они автоматически в том же пространстве имён. Значит, не забыть, что имя при создании надо проверять на занятость.
Итак. Давайте опишем метаданные самой процедуры.
public class Procedure {
@JsonIgnore
public final TDL4.StatementsContext ctx;
public final Map<String, Param> params;
private Procedure(TDL4.StatementsContext ctx, Map<String, Param> params) {
this.ctx = ctx;
this.params = params;
}
@JsonCreator
public Procedure(Map<String, Param> params) {
this.params = params;
this.ctx = null;
}
public static Builder builder(TDL4.StatementsContext ctx) {
return new Builder(ctx);
}
public static class Builder {
private final TDL4.StatementsContext ctx;
private final Map<String, Param> params = new HashMap<>();
private Builder(TDL4.StatementsContext ctx) {
this.ctx = ctx;
}
public Builder mandatory(String name) {
params.put(name, new Param());
return this;
}
public Builder optional(String name, Object value) {
params.put(name, new Param(value));
return this;
}
public Procedure build() {
return new Procedure(ctx, params);
}
}
public static class Param {
public final boolean optional;
public final Object defaults;
@JsonCreator
public Param(boolean optional, Object defaults) {
this.optional = optional;
this.defaults = defaults;
}
private Param(Object defaults) {
this.optional = true;
this.defaults = defaults;
}
private Param() {
this.optional = false;
this.defaults = null;
}
}
}
Кажется, что кода много, но это Java. Даже если 17 версии, всё равно по-прежнему многословный язык. За то и люблю. Но тут ничего, кроме куска AST и мапы с параметрами. А теперь давайте положим их куда-нибудь, типа заведём библиотеку:
public class Library {
public final Map<String, Procedure> procedures = new HashMap<>();
}
… и подключим её в контекст интерпретатора:
public class TDL4Interpreter {
//пропущено всё сейчас неважное
private final String script;
private DataContext dataContext;
private final OptionsContext options;
private VariablesContext variables;
private final TDL4ErrorListener errorListener;
private TDL4.ScriptContext scriptContext;
private final Library library;
public TDL4Interpreter(Library library, String script, VariablesContext variables, OptionsContext options, TDL4ErrorListener errorListener) {
this.library = library;
this.script = script;
this.variables = variables;
this.options = options;
this.errorListener = errorListener;
}
//пропущено всё сейчас неважное
Таким образом, создав один раз экземпляр Library
, мы можем переиспользовать его столько раз, сколько экземпляров интерпретатора у нас вызывается (как и другие контексты, типа глобальных переменных, опций, и данных — то есть, сам Spark). Что крайне полезно при подключении каталога библиотечных скриптов, когда надо пропарсить кучу файлов, и собрать из них все процедуры.
Кстати, для этой цели стоит пропатчить обработчик командных ключей, чтобы он мог брать не один файл со скриптом, а сразу много. Что-то типа --script "s3://projects/commons/v2.0/lib/*.tdl,s3://projects/this_project/step25.tdl"
. К счастью, поддержка glob patterns в путях у нас уже есть, закопанная где-то в обёртке источника данных для классов Hadoop FileSystem, её и переиспользуем.
Выглядит это дело страшновато, конечно. Ещё один парсер в проекте, на этот раз потоковый посимвольный, да… Увы, но готовой библиотечки для разбора glob patterns на Java мне найти не удалось — по крайней мере, такой, чтобы она либо не умела слишком мало, либо не тащила с собой левые зависимости, так что пришлось писать самому (чего я терпеть не могу):
public class HadoopStorage {
public static final String PATH_PATTERN = "^([^:]+:/*[^/]+)/(.+)";
public static List<Tuple2<String, String>> pathToGroups(String inputPath) throws InvalidConfigurationException {
List<Tuple2<String, String>> ret = new ArrayList<>();
int curlyLevel = 0;
List<String> splits = new ArrayList<>();
StringBuilder current = new StringBuilder();
for (int i = 0; i < inputPath.length(); i++) {
char c = inputPath.charAt(i);
switch (c) {
case '\\': {
current.append(c).append(inputPath.charAt(++i));
break;
}
case '{': {
curlyLevel++;
current.append(c);
break;
}
case '}': {
curlyLevel--;
current.append(c);
break;
}
case ',': {
if (curlyLevel == 0) {
splits.add(current.toString());
current = new StringBuilder();
} else {
current.append(c);
}
break;
}
default: {
current.append(c);
}
}
}
splits.add(current.toString());
for (String split : splits) {
Matcher m = Pattern.compile(PATH_PATTERN).matcher(split);
if (m.matches()) {
String rootPath = m.group(1);
String path = m.group(2);
List<String> transSubs = new ArrayList<>();
int groupingSub = -1;
String sub = path;
int s = 0;
nextSub:
while (true) {
StringBuilder translatedSub = new StringBuilder();
curlyLevel = 0;
boolean inSet = false;
for (int i = 0; i < sub.length(); i++) {
char c = sub.charAt(i);
switch (c) {
case '/': {
if (!inSet && (curlyLevel == 0)) {
transSubs.add(translatedSub.toString());
if (++i != sub.length()) {
s++;
sub = sub.substring(i);
continue nextSub;
} else {
break nextSub;
}
} else {
translatedSub.append(c);
}
break;
}
case '\\': {
translatedSub.append(c);
if (++i != sub.length()) {
translatedSub.append(sub.charAt(i));
}
break;
}
case '$':
case '(':
case ')':
case '|':
case '+': {
translatedSub.append('\\').append(c);
break;
}
case '{': {
curlyLevel++;
translatedSub.append("(?:");
if (groupingSub < 0) {
groupingSub = s - 1;
}
break;
}
case '}': {
if (curlyLevel > 0) {
curlyLevel--;
translatedSub.append(")");
} else {
translatedSub.append(c);
}
break;
}
case ',': {
translatedSub.append((curlyLevel > 0) ? '|' : c);
break;
}
case '?': {
translatedSub.append('.');
if (groupingSub < 0) {
groupingSub = s - 1;
}
break;
}
case '*': {
if ((i != (sub.length() - 1)) && (sub.charAt(i + 1) == '*')) {
translatedSub.append(".*");
i++;
} else {
translatedSub.append("[^/]*");
}
if (groupingSub < 0) {
groupingSub = s - 1;
}
break;
}
case '[': {
inSet = true;
translatedSub.append(c);
if (groupingSub < 0) {
groupingSub = s - 1;
}
break;
}
case '^': {
if (inSet) {
translatedSub.append('\\');
}
translatedSub.append(c);
break;
}
case '!': {
translatedSub.append(inSet && ('[' == sub.charAt(i - 1)) ? '^' : '!');
break;
}
case ']': {
inSet = false;
translatedSub.append(c);
break;
}
default: {
translatedSub.append(c);
}
}
}
if (inSet || (curlyLevel > 0)) {
throw new InvalidConfigurationException("Glob pattern '" + split + "' contains unbalances range [] or braces {} definition");
}
if (groupingSub < 0) {
groupingSub = s;
}
transSubs.add(translatedSub.toString());
break;
}
if (s < 1) {
groupingSub = 0;
}
String groupSub = transSubs.get(groupingSub);
transSubs.remove(groupingSub);
transSubs.add(groupingSub, "(" + groupSub + ")");
rootPath += "/" + StringUtils.join(transSubs.subList(0, groupingSub), '/');
ret.add(new Tuple2<>(
rootPath + "/" + groupSub,
".*/" + StringUtils.join(transSubs.subList(groupingSub, transSubs.size()), '/') + ".*"
));
} else {
throw new InvalidConfigurationException("Glob pattern '" + split + "' must have protocol specification and its first path part must be not a grouping candidate");
}
}
return ret;
}
}
Нууууу, с подготовкой вроде всё.
Теперь сам кусок интерпретатора, который бы создавал, удалял, и — главное — исполнял процедуры, получается донельзя тривиальным.
private void createProcedure(TDL4.Create_procContext ctx) {
String procName = resolveName(ctx.func().L_IDENTIFIER());
if (Operations.OPERATIONS.containsKey(procName)) {
throw new InvalidConfigurationException("Attempt to CREATE PROCEDURE which overrides OPERATION \"" + procName + "\"");
}
if ((ctx.K_REPLACE() == null) && library.procedures.containsKey(procName)) {
throw new InvalidConfigurationException("PROCEDURE " + procName + " has already been defined. Offending definition at line " + ctx.K_CREATE().getSymbol().getLine());
}
Procedure.Builder proc = Procedure.builder(ctx.statements());
for (TDL4.Proc_paramContext procParam : ctx.proc_param()) {
if (procParam.param() == null) {
proc.mandatory(resolveName(procParam.L_IDENTIFIER()));
} else {
proc.optional(resolveName(procParam.param().L_IDENTIFIER()), Expressions.evalLoose(expression(procParam.param().attr_expr().children, ExpressionRules.LET), variables));
}
}
library.procedures.put(procName, proc.build());
}
private void dropProcedure(TDL4.Drop_procContext ctx) {
for (TDL4.FuncContext func : ctx.func()) {
String procName = resolveName(func.L_IDENTIFIER());
library.procedures.remove(procName);
}
}
private void call(TDL4.Call_stmtContext ctx) {
TDL4.Func_exprContext funcExpr = ctx.func_expr();
String verb = resolveName(funcExpr.func().L_IDENTIFIER());
Map<String, Object> params = resolveParams(funcExpr.params_expr());
if (ctx.operation_io() != null) {
if (!Operations.OPERATIONS.containsKey(verb)) {
throw new InvalidConfigurationException("CALL \"" + verb + "\"() refers to unknown Operation");
}
callOperation(verb, params, ctx.operation_io());
} else {
if (!library.procedures.containsKey(verb)) {
throw new InvalidConfigurationException("CALL \"" + verb + "\"() refers to undefined PROCEDURE");
}
callProcedure(verb, params);
}
}
private void callProcedure(String procName, Map<String, Object> params) {
if (verbose) {
System.out.println("CALLing PROCEDURE " + procName + " with params " + params + "\n");
}
Procedure proc = library.procedures.get(procName);
for (Map.Entry<String, Procedure.Param> defEntry : proc.params.entrySet()) {
String name = defEntry.getKey();
Procedure.Param paramDef = defEntry.getValue();
if (!paramDef.optional && !params.containsKey(name)) {
throw new InvalidConfigurationException("PROCEDURE " + procName + " CALL must have mandatory parameter @" + name + " set");
}
}
variables = new VariablesContext(variables);
variables.putAll(params);
for (TDL4.StatementContext stmt : proc.ctx.statement()) {
statement(stmt);
}
variables = variables.parent;
}
Как и было сказано в начале, при разборе мы тупо выдираем кусок AST, и складываем его в Library
, добавляя описание параметров. А при вызове превращаем переданные параметры в переменные дочернего контекста, и исполняем запомненный кусок AST с этим контекстом.
Что ещё?
Ну, у нас есть REPL, например, в котором есть команды \SHOW
и \DESCRIBE
. Для них нужно добавить соответствующие подкоманды \SHOW PROCEDURE
и \DESCRIBE PROCEDURE
. но это настолько тривиальная задача, что мне лень её описывать. Ну, в команду \SCRIPT
ещё надо добавить поддержку glob patterns. Но это тоже до жути тривиально, и занимает пять минут.
В полном объёме посмотреть на все описанные сегодня изменения можно в соответствующем пулл реквесте.
Таким образом, когда у тебя уже есть какой-то интерпретатор, то расширять его новыми фичами — одно удовольствие. Другое дело, чтобы написать такой интерпретатор, который легко расширяется, нужно сначала планировать язык и контексты где-то около года. Чего всем и советую.
— Тааааак, а погодите-ка! У нас есть контексты переменных, есть процедуры, в интерпретаторе есть даже пекеджы — и что нам теперь мешает запилить классы с объектами?!
— Ничего не мешает, только здравый смысл. В ETL SQL классы не нужны. Я не настолько наркоман, чтобы запилить их даже ради шутки. Но много времени подобное упражнение не займёт...
Промка: https://pastorgl.github.io/datacooker-etl
Исходники: https://github.com/PastorGL/datacooker-etl
Официальная группа в телеге: https://t.me/data_cooker_etl