Практически каждому разработчику, рано или поздно, приходится разбираться с возникающими ошибками в своем, вроде бы идеальном, коде. Казалось, все ходы просчитаны и ошибок быть не может, но не тут‑то было. Причин может быть масса, от логической ошибки в связанном функционале, до независящего от разработчика обновления стороннего софта или железа. Чтобы обезопасить свой программный продукт, например от потенциальных ошибок, вытекающих из новых доработок, код покрывается контрольными тестами.

Со временем, в разрабатываемой/сопровождаемой системе, количество тестов увеличивается => этап автоматической проверки качества требует все больше драгоценного времени, тем самым замедляет процесс доставки ценности до конечного потребителя.

Когда мыслей по оптимизации производительности тестов и тестируемого кода больше не осталось, остается вариант параллельного тестирования.

Вдохновленный заинтересованностью коллеги — @alisichkin идеей ускорения конвейера CI/CD, набросал вариант параллельного запуска unit‑тестов на базе utPLSQL.

Давайте разберем по порядку, что и как у нас получилось.

В первую очередь, нам потребуется подготовить фундамент для параллельного запуска — хранимую процедуру вида:

-- Процедура параллельного запуска скрипта p_sql_task
procedure parallel_execute
(
    p_sql_task          in varchar2,                -- Скрипт задачи
    p_task_name         in varchar2,                -- Название задания
    p_parallel_level    in number default 16        -- Количество сессий выполнения задачи
)
is
    v_try               number := 0;
    v_cnt_chunk         number;
    v_sql_chunks        varchar2(4000 char);
begin
    -- Завершаем задания прежнего запуска (если они были)
    for r in
    (
        select
            t.task_name
        from
            user_parallel_execute_tasks t
        where
            t.task_name = p_task_name
    )
    loop
        dbms_parallel_execute.drop_task(task_name => p_task_name);
    end loop;
    -- Запрос определения количества частей задачи - отдельных сессий на сервере.
    v_sql_chunks := '
        select
            level               start_id,
            count(*) over ()    end_id
        from
            dual
        connect by
            level <= ' || p_parallel_level;
    -- создаем задачу
    dbms_parallel_execute.create_task
    (
        task_name => p_task_name
    );
    -- Выделяем части
    dbms_parallel_execute.create_chunks_by_sql
    (
        task_name   => p_task_name,
        sql_stmt    => v_sql_chunks,
        by_rowid    => false
    );
    -- Проверяем наличие выделенных порций
    select
        count(*)
    into
        v_cnt_chunk
    from
        user_parallel_execute_chunks ec
    where
        ec.status = upper('unassigned');
    -- Если не удалось выделить части, логируем/вызываем ошибку/просто выходим, т.е. выполняем требуемое по ситуации действие
    if dbms_parallel_execute.task_status(p_task_name) != dbms_parallel_execute.chunking and v_cnt_chunk = 0 then
        -- логируем (процедура для примера)
        log_message
        (
            p_proc_name     => 'parallel_execute',
            p_log_message   => 'could not get chunks ' || chr(10) || 'SQL TASK:' || chr(10) || p_sql_task
        );
    else
        -- запускаем задачу
        dbms_parallel_execute.run_task
        (
            task_name       => p_task_name,
            sql_stmt        => p_sql_task,
            language_flag   => dbms_sql.native,
            parallel_level  => p_parallel_level
        );
        -- ждем завершения задачи или превышения кол-ва попыток запуска
        loop
            exit when
                dbms_parallel_execute.task_status(p_task_name) = dbms_parallel_execute.finished
                or v_try > 3;      -- Три попытки
            dbms_parallel_execute.resume_task(p_task_name);
            v_try := v_try + 1;
        end loop;
    end if;
    -- Удаляем задачу
    dbms_parallel_execute.drop_task(task_name => p_task_name);
exception
    when others then
        log_message
        (
            p_proc_name     => 'parallel_execute',
            p_log_message   => sqlerrm || chr(10) || dbms_utility.format_error_backtrace || chr(10) || 'SQL TASK:' || chr(10) || p_sql_task
        );
end parallel_execute;

Стоит отметить, что по документации Oracle количество chunk‑ов задачи должно формироваться результатом запроса, переданным в параметре sql_stmt, сам запрос должен обязательно содержать поля start_id, end_id, каждая запись результата выполнения, запроса идентифицирует конкретную часть разбиваемой задачи. В данном варианте, количество частей всегда будет выделяться равным значению p_parallel_level, если нет иного ограничения в настройках схемы СУБД (job_queue_processes).

Далее нам следует продумать — где и как будем агрегировать результат выполнения тестов (можно реализовать обмен между сессиями через DBMS_PIPE, либо сохранить в специально‑созданной для этого таблице). Выбрал вариант с таблицей, т.к. появится возможность дополнительного ретроспективного анализа и последующего ребаланса тестов.

create table utp_test_results
(
    task_name           varchar2(120 char),
    chunk_num           number,
    start_date          date default sysdate,
    end_date            date,
    success             number(1),
    test_result         clob,
    --
    constraint utp_test_results_pk primary key (task_name, chunk_num) using index tablespace tblspc_index
);
comment on table  utp_test_results                is 'Таблица результатов запуска UTP-тестов';
comment on column utp_test_results.task_name ....

Следующим шагом создаем процедуру запуска конкретной части задачи:

/***************************************************/
/* Процедура запуска задачи UTP тестирования */
procedure run_utp_task
(
    p_chunk_num         in number,        -- Номер части   <=> :start_id
    p_chunk_count       in number,        -- Кол-во частей <=> : end_id
    p_task_name         in varchar2       -- Название заздачи
)
is
    v_pack_list         utp.ut_varchar2_list;
    v_error             varchar2(4000 char);
    -- Процедура чтения output-а
    procedure save_output
    is
        l_buffer    dbms_output.chararr;
        l_num_lines pls_integer;
        l_clob      clob;
    begin
        l_num_lines := 4000;
        dbms_output.get_lines(l_buffer, l_num_lines);
        for i in 1..l_buffer.count loop
            if l_buffer(i) like '%test%' then
                l_clob := l_clob || l_buffer(i) || chr(10);
            end if;
        end loop;
        -- Фиксируем результат работы chunk-а
        update
            utp_test_results t
        set
            t.test_result = l_clob,
            t.end_date = sysdate,
            t.success = 1
        where
            t.task_name = p_task_name
            and t.chunk_num = p_chunk_num;
        commit;
    end save_output;
begin
    -- Регистрация в таблице результатов
    update
        utp_test_results
    set
        start_date = sysdate,
        success = null,
        test_result = null
    where
        task_name = p_task_name
        and chunk_num = p_chunk_num;
    --
    if sql%rowcount = 0 then
        insert
        into
            utp_test_results
            (task_name, chunk_num)
        values
        (
            p_task_name,
            p_chunk_num
        );
    end if;
    commit;
    -- читаем список всех имеющихся тестовых пакетов
    select
        pack_name
    bulk collect into
        v_pack_list
    from
        (
            select
                name as pack_name,
                row_number() over (order by name) pack_num,   -- Сортировка обязательна, порядок должен быть согласован для всех частей задачи
                count(*) over () count_total
            from
                user_source
            where
                text like '%\%suite%' escape '\'              -- В тестируемом объекте обязательно должен быть указан таг %suite - без него фреймворк UTP тестировать объект не будет
                and name like 'TST%'                          -- Тестовый объект начинается с TST_
                and name <> 'TST_UTILS'                       -- Исключаем пакет запуска тестов
            group by
                name
        ) s
    where
        p_chunk_num = mod(s.pack_num, p_chunk_count) + 1;     -- Фильтр для выбора части тестовых пакетов
    -- Старт тестов
    utp.ut.run
    (
        a_paths => v_pack_list,
        a_coverage_schemes => utp.ut_varchar2_list('<schema_name>'), -- <schema_name> - название тестируемой схемы в СУБД
        a_reporter => utp.ut_junit_reporter
    );
    -- UTP пишет в output, считываем его и сохраняем в свою таблицу
    save_output;
exception
    when others then
        log_message
        (
            p_log_message   => sqlerrm,
            p_proc_name     => 'run_utp_task'
        );
        v_error := sqlerrm;
        update
            utp_test_results
        set
            success = 0,
            test_result = v_error
        where
            task_name = p_task_name
            and chunk_num = p_chunk_num;
        commit;
    raise;
end run_utp_task;

Осталось добавить финальную хранимку для параллельного запуска тестов:

/********************************/
/* Процедура запуска UTP тестов */
procedure run_utp
(
    p_parallel_level    in number default 16
)
is
    v_task_name         varchar2(120 char) := 'task_utp_parallel_' || userenv('sessionid');
    v_clob              clob;
    -- Выдод clob в output
    procedure output_clob
    (
        p_length        in number,
        p_start_index   in number default 1
    )
    is
        v_buffer        varchar2(4000 char);   -- размер буфера максимально для нашей версии Oracle = 32000, берем излюбленное значение в 4000
        v_end_index     number;
    begin
        if p_start_index < p_length then
            v_buffer := dbms_lob.substr(v_clob, 4000, p_start_index);
            v_end_index := length(v_buffer);
            if v_end_index < 4000 then
                dbms_output.put_line(v_buffer);
            else
                v_end_index := instr(v_buffer, '>', -1);                -- Бьем по закрывающему символу
                dbms_output.put_line(substr(v_buffer, 1, v_end_index));
                output_clob(p_length, p_start_index + v_end_index);
            end if;
        end if;
    end;
begin
    parallel_execute
    (
        p_sql_task   => '
        begin
            run_utp_task
            (
                p_chunk_num     => :start_id,
                p_chunk_count   => :end_id,
                p_task_name     => ''' || v_task_name || '''
            );
        end;',
        p_task_name  => v_task_name,
        p_parallel_level => p_parallel_level
    );
    -- Формируем итоговый XML
    select
        xmlelement
        (
            "testsuites",
            xmlattributes
            (
                sum(tests) "tests",
                sum(disabled) "disabled",
                sum(errors) "errors",
                sum(failures) "failures",
                sum(time) "time"
            ),
            xmlagg(suite)
        ).getclobval()
    into
        v_clob
    from
        (
            select
                case
                    when test_result is not null then
                        xmltype(test_result) 
                end xml_result
            from 
                amt_utp_test_results t 
            where 
                t.task_name = v_task_name
        ) s,
        xmltable
        (
            'testsuites'
            passing s.xml_result
            columns
                suite       xmltype             path './testsuite',
                tests       number              path '@tests',
                disabled    number              path '@disabled',
                errors      number              path '@errors',
                failures    number              path '@failures',
                name        varchar2(2000 char) path '@name',
                time        number              path '@time'
        ) t;
    -- Выводим в output
    output_clob(dbms_lob.getlength(v_clob));
end run_utp;

На этом все, цель достигнута.

Процедура run_utp стартует задачу параллельного запуска тестов, ожидает его завершения, после чего собирает результаты в единый XML, который далее разбирается JUnit‑парсером.

Пример запуска в 16 сессий:

call tst_utils.run_utp(16);

P. S. результаты:

Время работы в 16 сессий по холодным данным составило 362 секунды ~6 минут

По таблице результатов можем проанализировать, что в большей степени «тормозит» наш процесс и сгруппировать «тяжелые» тесты с «легкими», либо увеличить уровень параллельности:

Таблица результатов
Таблица результатов

Из результатов видим, что итоговое время тестирования определилось самым долгоиграющим тестом (набором тестов).

Совокупное время проверки 84 тестов в рамках одной сессии составило бы 1569.5 секунд, т. е. примерно 26 минут:

Результаты из output
Результаты из output

Фактически, получаем выигрыш по времени в 20 минут с одного тестирования. Если учесть необходимость запуска тестов по каждой задаче несколько раз (у нас их 3), то это уже 60 минут, в дополнение умножаем полученное время на кол‑во разработчиков и радуемся очередной победе )...

Здоровая критика и обратная связь приветствуется:).

Спасибо за внимание.

Комментарии (3)


  1. Glays
    00.00.0000 00:00

    Так я и не осилил dbms_parallel_execute, кажется легче просто делать пачку джобов

    BEGIN 
    FOR i IN 0 .. 9 
    LOOP 
    DBMS_SCHEDULER.create_job (job_name   => 'j_jobname_' || i...

    Но если понадобится ожидать завершения всех и правда удобнее в таком варианте, спасибо за идею с connect by.


    1. AICode Автор
      00.00.0000 00:00

      Берите на вооружение, код рабочий и довольно простой)


  1. UsArin
    00.00.0000 00:00

    Так же смотрел в сторону dbms_parallel_execute и для сохранения логов подламывал пакеты UTP, но ваш способ dbms_output.get_lines элегантнее. Но в итоге не взлетело, остались на стеке Jenkins+Maven+Java+TestNG+Allure. Там и параллельность быстрее и результаты тестов нагляднее. Как вы обрабатываете результаты тестов?