В предыдущих сериях (FAQ 1 2 3 4 5 6 ) мы весьма подробно рассмотрели, как написать на Java собственный интерпретатор объектно-ориентированного диалекта SQL поверх Spark RDD API, заточенный на задачи подготовки и трансформации наборов данных.

В данной части поговорим о том, как добавить в собственный диалект SQL поддержку процедур. Например,


-- library.tdl

CREATE PROCEDURE dwellTimeByMode(@signals, @target, @outPrefix,
  @modes = ['pedestrian', 'non_pedestrian', 'car', 'bike'],
  @groupid='cell10') AS BEGIN
    LOOP $mode IN $modes BEGIN
        SELECT * FROM $signals INTO "{$signals}/{$mode}" WHERE mode=$mode;

        CALL dwellTime(@signals_userid_attr=userid,
            @target_userid_attr=userid,
            @target_grouping_attr=$groupid
        ) INPUT signals FROM "{$signals}/{$mode}", target FROM $target
        OUTPUT INTO "{$outPrefix}/{$mode}";

        ANALYZE "{$signals}/{$mode}";
        ANALYZE "{$outPrefix}/{$mode}";
    END;
END;

--- ... --- ... --- ... ---

-- script.tdl

CALL dwellTimeByMode(@signals=$this_month, @target=$population, @outPrefix=$this_month);

Нафига это надо?


Ну, допустим, у нас уже есть некоторое количество SQL ETL кода, наработанного за время эксплуатации инструмента в продакшене, и становится заметно, что значительная часть скриптов на разных проектах совпадает, и из раза в раз повторяется. Логично было бы вынести все эти совпадающие куски в библиотеку, чтобы держать в одном месте, да и вызывать с какими надо параметрами, когда надо. Вот прям как на примере выше.


Ну, если интерпретатор уже написан, то реализовать в нём процедры — это как нефиг делать. Очень простая задача.


Ведь что есть процедура с точки зрения интерпретатора?


Процедура с точки зрения интерпретатора — это кусок AST, который:
  • выдирается при первичном просмотре исходного кода,
  • не интерпретируется, откладывается куда-то под заданным именем,
  • многократно интерпретируется в моменты вызовов,
  • имеет свой дочерний контекст переменных,
  • переменные берутся из родительского контекста, и аугментируются параметрами процедуры.

И чем обработка процедуры отличается от контекста, например, цикла в таком случае? Вторым пунктом из списочка выше. AST цикла никуда не откладывается, и имени не имеет. А так, в нём ровно всё то же самое.


Это означает, что нам придётся завести какое-то место, куда можно откладывать кусок AST. А также, каким-то образом запоминать имена параметров, чтобы порождать переменные, соответствующие переданным в процедуру параметрам, в дочернем контексте при её вызове. Всего лишь, и дело в шляпе.


Но сначала стоит описать формальный синтаксис процедур для парсера SQL.


create_proc
 : ( K_CREATE ( S_OR K_REPLACE )? )? K_PROCEDURE func
     ( S_OPEN_PAR proc_param ( S_COMMA proc_param )* S_CLOSE_PAR )?
     K_AS? K_BEGIN
       statements
     K_END K_PROCEDURE?
 ;

proc_param
 : param
 | S_AT L_IDENTIFIER
 ;

drop_proc
 : K_DROP K_PROCEDURE func ( S_COMMA func )*
 ;

А также расширить оператор CALL возможностью вызвать процедуру (которая у нас, в отличие от написанных на Java операций, исполняется в том же контексте, что и сам скрипт, и потому не требует отдельного указания INPUT и OUTPUT датасетов, а потребляет и создаёт их там напрямую).


call_stmt
 : K_CALL func_expr operation_io
 | K_CALL func_expr
 ;

Таким образом, синтаксис — как и подразумевающаяся семантика — заданы. Согласно им, процедуру можно создать, заменить, и даже дропнуть. А параметры у неё могут как иметь значения по умолчанию (то есть, быть опциональными), так и не иметь оных (то есть, быть обязательными). Прекрасно, осталось всё это дело поддержать в интерпретаторе.


Ну и небольшой нюанс — раз у нас процедуры SQL вызываются той же синтаксической конструкцией, что и операции Java, причём по имени, то и живут они автоматически в том же пространстве имён. Значит, не забыть, что имя при создании надо проверять на занятость.


Итак. Давайте опишем метаданные самой процедуры.


public class Procedure {
    @JsonIgnore
    public final TDL4.StatementsContext ctx;

    public final Map<String, Param> params;

    private Procedure(TDL4.StatementsContext ctx, Map<String, Param> params) {
        this.ctx = ctx;
        this.params = params;
    }

    @JsonCreator
    public Procedure(Map<String, Param> params) {
        this.params = params;
        this.ctx = null;
    }

    public static Builder builder(TDL4.StatementsContext ctx) {
        return new Builder(ctx);
    }

    public static class Builder {
        private final TDL4.StatementsContext ctx;
        private final Map<String, Param> params = new HashMap<>();

        private Builder(TDL4.StatementsContext ctx) {
            this.ctx = ctx;
        }

        public Builder mandatory(String name) {
            params.put(name, new Param());
            return this;
        }

        public Builder optional(String name, Object value) {
            params.put(name, new Param(value));
            return this;
        }

        public Procedure build() {
            return new Procedure(ctx, params);
        }
    }

    public static class Param {
        public final boolean optional;
        public final Object defaults;

        @JsonCreator
        public Param(boolean optional, Object defaults) {
            this.optional = optional;
            this.defaults = defaults;
        }

        private Param(Object defaults) {
            this.optional = true;
            this.defaults = defaults;
        }

        private Param() {
            this.optional = false;
            this.defaults = null;
        }
    }
}

Кажется, что кода много, но это Java. Даже если 17 версии, всё равно по-прежнему многословный язык. За то и люблю. Но тут ничего, кроме куска AST и мапы с параметрами. А теперь давайте положим их куда-нибудь, типа заведём библиотеку:


public class Library {
    public final Map<String, Procedure> procedures = new HashMap<>();
}

… и подключим её в контекст интерпретатора:


public class TDL4Interpreter {
//пропущено всё сейчас неважное

    private final String script;

    private DataContext dataContext;

    private final OptionsContext options;

    private VariablesContext variables;

    private final TDL4ErrorListener errorListener;
    private TDL4.ScriptContext scriptContext;

    private final Library library;

    public TDL4Interpreter(Library library, String script, VariablesContext variables, OptionsContext options, TDL4ErrorListener errorListener) {
        this.library = library;
        this.script = script;
        this.variables = variables;
        this.options = options;
        this.errorListener = errorListener;
    }

//пропущено всё сейчас неважное

Таким образом, создав один раз экземпляр Library, мы можем переиспользовать его столько раз, сколько экземпляров интерпретатора у нас вызывается (как и другие контексты, типа глобальных переменных, опций, и данных — то есть, сам Spark). Что крайне полезно при подключении каталога библиотечных скриптов, когда надо пропарсить кучу файлов, и собрать из них все процедуры.


Кстати, для этой цели стоит пропатчить обработчик командных ключей, чтобы он мог брать не один файл со скриптом, а сразу много. Что-то типа --script "s3://projects/commons/v2.0/lib/*.tdl,s3://projects/this_project/step25.tdl". К счастью, поддержка glob patterns в путях у нас уже есть, закопанная где-то в обёртке источника данных для классов Hadoop FileSystem, её и переиспользуем.


Выглядит это дело страшновато, конечно. Ещё один парсер в проекте, на этот раз потоковый посимвольный, да… Увы, но готовой библиотечки для разбора glob patterns на Java мне найти не удалось — по крайней мере, такой, чтобы она либо не умела слишком мало, либо не тащила с собой левые зависимости, так что пришлось писать самому (чего я терпеть не могу):


Получается под 200 строк кода
public class HadoopStorage {
    public static final String PATH_PATTERN = "^([^:]+:/*[^/]+)/(.+)";

    public static List<Tuple2<String, String>> pathToGroups(String inputPath) throws InvalidConfigurationException {
        List<Tuple2<String, String>> ret = new ArrayList<>();

        int curlyLevel = 0;

        List<String> splits = new ArrayList<>();

        StringBuilder current = new StringBuilder();
        for (int i = 0; i < inputPath.length(); i++) {
            char c = inputPath.charAt(i);

            switch (c) {
                case '\\': {
                    current.append(c).append(inputPath.charAt(++i));
                    break;
                }
                case '{': {
                    curlyLevel++;
                    current.append(c);
                    break;
                }
                case '}': {
                    curlyLevel--;
                    current.append(c);
                    break;
                }
                case ',': {
                    if (curlyLevel == 0) {
                        splits.add(current.toString());
                        current = new StringBuilder();
                    } else {
                        current.append(c);
                    }
                    break;
                }
                default: {
                    current.append(c);
                }
            }
        }
        splits.add(current.toString());

        for (String split : splits) {
            Matcher m = Pattern.compile(PATH_PATTERN).matcher(split);
            if (m.matches()) {
                String rootPath = m.group(1);
                String path = m.group(2);

                List<String> transSubs = new ArrayList<>();
                int groupingSub = -1;

                String sub = path;
                int s = 0;

                nextSub:
                while (true) {
                    StringBuilder translatedSub = new StringBuilder();

                    curlyLevel = 0;
                    boolean inSet = false;
                    for (int i = 0; i < sub.length(); i++) {
                        char c = sub.charAt(i);

                        switch (c) {
                            case '/': {
                                if (!inSet && (curlyLevel == 0)) {
                                    transSubs.add(translatedSub.toString());

                                    if (++i != sub.length()) {
                                        s++;

                                        sub = sub.substring(i);
                                        continue nextSub;
                                    } else {
                                        break nextSub;
                                    }
                                } else {
                                    translatedSub.append(c);
                                }
                                break;
                            }
                            case '\\': {
                                translatedSub.append(c);
                                if (++i != sub.length()) {
                                    translatedSub.append(sub.charAt(i));
                                }
                                break;
                            }
                            case '$':
                            case '(':
                            case ')':
                            case '|':
                            case '+': {
                                translatedSub.append('\\').append(c);
                                break;
                            }
                            case '{': {
                                curlyLevel++;
                                translatedSub.append("(?:");
                                if (groupingSub < 0) {
                                    groupingSub = s - 1;
                                }
                                break;
                            }
                            case '}': {
                                if (curlyLevel > 0) {
                                    curlyLevel--;
                                    translatedSub.append(")");
                                } else {
                                    translatedSub.append(c);
                                }
                                break;
                            }
                            case ',': {
                                translatedSub.append((curlyLevel > 0) ? '|' : c);
                                break;
                            }
                            case '?': {
                                translatedSub.append('.');
                                if (groupingSub < 0) {
                                    groupingSub = s - 1;
                                }
                                break;
                            }
                            case '*': {
                                if ((i != (sub.length() - 1)) && (sub.charAt(i + 1) == '*')) {
                                    translatedSub.append(".*");
                                    i++;
                                } else {
                                    translatedSub.append("[^/]*");
                                }
                                if (groupingSub < 0) {
                                    groupingSub = s - 1;
                                }
                                break;
                            }
                            case '[': {
                                inSet = true;
                                translatedSub.append(c);
                                if (groupingSub < 0) {
                                    groupingSub = s - 1;
                                }
                                break;
                            }
                            case '^': {
                                if (inSet) {
                                    translatedSub.append('\\');
                                }
                                translatedSub.append(c);
                                break;
                            }
                            case '!': {
                                translatedSub.append(inSet && ('[' == sub.charAt(i - 1)) ? '^' : '!');
                                break;
                            }
                            case ']': {
                                inSet = false;
                                translatedSub.append(c);
                                break;
                            }
                            default: {
                                translatedSub.append(c);
                            }
                        }
                    }

                    if (inSet || (curlyLevel > 0)) {
                        throw new InvalidConfigurationException("Glob pattern '" + split + "' contains unbalances range [] or braces {} definition");
                    }

                    if (groupingSub < 0) {
                        groupingSub = s;
                    }

                    transSubs.add(translatedSub.toString());

                    break;
                }

                if (s < 1) {
                    groupingSub = 0;
                }

                String groupSub = transSubs.get(groupingSub);

                transSubs.remove(groupingSub);
                transSubs.add(groupingSub, "(" + groupSub + ")");

                rootPath += "/" + StringUtils.join(transSubs.subList(0, groupingSub), '/');
                ret.add(new Tuple2<>(
                        rootPath + "/" + groupSub,
                        ".*/" + StringUtils.join(transSubs.subList(groupingSub, transSubs.size()), '/') + ".*"
                ));
            } else {
                throw new InvalidConfigurationException("Glob pattern '" + split + "' must have protocol specification and its first path part must be not a grouping candidate");
            }
        }

        return ret;
    }
}

Нууууу, с подготовкой вроде всё.


Теперь сам кусок интерпретатора, который бы создавал, удалял, и — главное — исполнял процедуры, получается донельзя тривиальным.


    private void createProcedure(TDL4.Create_procContext ctx) {
        String procName = resolveName(ctx.func().L_IDENTIFIER());

        if (Operations.OPERATIONS.containsKey(procName)) {
            throw new InvalidConfigurationException("Attempt to CREATE PROCEDURE which overrides OPERATION \"" + procName + "\"");
        }

        if ((ctx.K_REPLACE() == null) && library.procedures.containsKey(procName)) {
            throw new InvalidConfigurationException("PROCEDURE " + procName + " has already been defined. Offending definition at line " + ctx.K_CREATE().getSymbol().getLine());
        }

        Procedure.Builder proc = Procedure.builder(ctx.statements());
        for (TDL4.Proc_paramContext procParam : ctx.proc_param()) {
            if (procParam.param() == null) {
                proc.mandatory(resolveName(procParam.L_IDENTIFIER()));
            } else {
                proc.optional(resolveName(procParam.param().L_IDENTIFIER()), Expressions.evalLoose(expression(procParam.param().attr_expr().children, ExpressionRules.LET), variables));
            }
        }
        library.procedures.put(procName, proc.build());
    }

    private void dropProcedure(TDL4.Drop_procContext ctx) {
        for (TDL4.FuncContext func : ctx.func()) {
            String procName = resolveName(func.L_IDENTIFIER());

            library.procedures.remove(procName);
        }
    }

    private void call(TDL4.Call_stmtContext ctx) {
        TDL4.Func_exprContext funcExpr = ctx.func_expr();

        String verb = resolveName(funcExpr.func().L_IDENTIFIER());
        Map<String, Object> params = resolveParams(funcExpr.params_expr());
        if (ctx.operation_io() != null) {
            if (!Operations.OPERATIONS.containsKey(verb)) {
                throw new InvalidConfigurationException("CALL \"" + verb + "\"() refers to unknown Operation");
            }

            callOperation(verb, params, ctx.operation_io());
        } else {
            if (!library.procedures.containsKey(verb)) {
                throw new InvalidConfigurationException("CALL \"" + verb + "\"() refers to undefined PROCEDURE");
            }

            callProcedure(verb, params);
        }
    }

    private void callProcedure(String procName, Map<String, Object> params) {
        if (verbose) {
            System.out.println("CALLing PROCEDURE " + procName + " with params " + params + "\n");
        }

        Procedure proc = library.procedures.get(procName);

        for (Map.Entry<String, Procedure.Param> defEntry : proc.params.entrySet()) {
            String name = defEntry.getKey();
            Procedure.Param paramDef = defEntry.getValue();

            if (!paramDef.optional && !params.containsKey(name)) {
                throw new InvalidConfigurationException("PROCEDURE " + procName + " CALL must have mandatory parameter @" + name + " set");
            }
        }

        variables = new VariablesContext(variables);
        variables.putAll(params);

        for (TDL4.StatementContext stmt : proc.ctx.statement()) {
            statement(stmt);
        }

        variables = variables.parent;
    }

Как и было сказано в начале, при разборе мы тупо выдираем кусок AST, и складываем его в Library, добавляя описание параметров. А при вызове превращаем переданные параметры в переменные дочернего контекста, и исполняем запомненный кусок AST с этим контекстом.


Что ещё?


Ну, у нас есть REPL, например, в котором есть команды \SHOW и \DESCRIBE. Для них нужно добавить соответствующие подкоманды \SHOW PROCEDURE и \DESCRIBE PROCEDURE. но это настолько тривиальная задача, что мне лень её описывать. Ну, в команду \SCRIPT ещё надо добавить поддержку glob patterns. Но это тоже до жути тривиально, и занимает пять минут.


В полном объёме посмотреть на все описанные сегодня изменения можно в соответствующем пулл реквесте.


Таким образом, когда у тебя уже есть какой-то интерпретатор, то расширять его новыми фичами — одно удовольствие. Другое дело, чтобы написать такой интерпретатор, который легко расширяется, нужно сначала планировать язык и контексты где-то около года. Чего всем и советую.

— Тааааак, а погодите-ка! У нас есть контексты переменных, есть процедуры, в интерпретаторе есть даже пекеджы — и что нам теперь мешает запилить классы с объектами?!
— Ничего не мешает, только здравый смысл. В ETL SQL классы не нужны. Я не настолько наркоман, чтобы запилить их даже ради шутки. Но много времени подобное упражнение не займёт...


Промка: https://pastorgl.github.io/datacooker-etl
Исходники: https://github.com/PastorGL/datacooker-etl
Официальная группа в телеге: https://t.me/data_cooker_etl

Комментарии (0)