Всем привет, случалось такое, что вам надо поставить кучу агентов битрикса на крон, а потом сидеть и разбираться - сколько они отрабатывают, отрабатывают ли вообще, когда падают или зависают?

Ну конечно случалось. Так вот, чтобы получить визуальное представление о том, что там происходит, было принято решение, вынести агенты даже не на крон, а на apache airflow. Поведаю вам, как это было реализовано.

Агенты

По факту, агенты - это выполнение php команд по расписанию. Но мы пойдем дальше и выделим их в отдельную сущность, в классы с расширением Agents.php

Создадим интерфейс для агентов

<?php

namespace App\Infrastructure\Contracts\Cli\Agents;

interface AgentInterface
{
    /**
     * @var string Возвращаемое значение для дагов в Airflow в случае успешного исполнения.
     */
    const AIRFLOW_DONE = 'DONE FOR AIRFLOW';

    /**
     * Метод выполняет код на агентах
     * @return string Возвращает строку вызова текущего метода.
     */
    public static function execute(string $param = ''): string;

    /**
     * Метод для запуска из консоли.
     * @param string $param
     * @return string
     */
    public static function console(string $param = ''): string;
}

Далее создаем общий родитель для всех агентов

<?php

namespace App\Infrastructure\Cli\Agents;

use App\Infrastructure\Contracts\Cli\Agents\AgentInterface;

class BaseAgent implements AgentInterface
{

    /**
     * @inheritDoc
     */
    public static function execute(string $param = ''): string
    {
        return static::execute($param);
    }

    /**
     * @inheritDoc
     */
    public static function console(string $param = ''): string
    {
        $return = static::execute($param);
        echo self::AIRFLOW_DONE; // Тут добавляем вывод, если вызов прошел успешно
        return $return;
    }
}

Ну и пример самого агента

<?php

namespace App\Infrastructure\Cli\Agents;

use App\Infrastructure\Contracts\Cli\Agents\AgentInterface;
use App\Infrastructure\DTO\Cli\Agents\MethodDTO;
use App\Infrastructure\Schemas\Logger\Critical\CatchSchema;
use Bitrix\Main\Loader;
use Bitrix\Main\LoaderException;
use Exception;

class TestImportAgent extends BaseAgent implements AgentInterface
{
    /**
     * @param string $time
     * @return string
     * @throws Exception
     */
    public static function execute(string $param = ''): string
    {
        try {
            if (Loader::includeModule("...")) {
                // Сам запрос
                (new TestImport())->moveToStage();
            }
        } catch (LoaderException|Exception $e) {
            LoggerFacade::elk()->critical('Move import error', (new CatchSchema($e))->toArray());
        }

        return 'It`s done, dude!';
    }
}

Что имеем: когда мы вызываем из агентов в админ-панели запись TestImportAgent::execute(), то нам выводится сообщение 'It`s done, dude!'
а если мы вызываем из терминала
TestImportAgent::console(), то нам возвращается
'It`s done, dude!
DONE FOR AIRFLOW'
и это важно, чтобы AIRFLOW мог понять, что все выполнено корректно.

Консоль: Битрикс внедрили себе красивенькую, рабочую консольку, и можно было бы даже с ней поиграться, если бы она не была жестко завязана на модули, и не стояла устаревшая версия symfony/console

Для того, чтобы нам иметь свою красивую консольку, установим ее отдельно

Добавим local/console.php

<?php
$_SERVER["DOCUMENT_ROOT"] = realpath(dirname(__FILE__) . "/../../../..");
require_once($_SERVER["DOCUMENT_ROOT"] . '/bitrix/modules/main/cli/bootstrap.php');
require_once($_SERVER["DOCUMENT_ROOT"] . '/bitrix/vendor/autoload.php');

use Symfony\Component\Console\Application;

$obApplication = new Application();

$iterator = new RecursiveIteratorIterator(new RecursiveDirectoryIterator($_SERVER["DOCUMENT_ROOT"] . '/local/'));
$regex = new RegexIterator($iterator, '/^.+Command\.php$/i', RegexIterator::GET_MATCH);

function my_autoloader($class): void
{
    include $class;
}

foreach ($regex as $file => $value) {
    my_autoloader($file);
}

spl_autoload_register('my_autoloader');

foreach (get_declared_classes() as $item) {
    if (is_subclass_of($item, '\Symfony\Component\Console\Command\Command')) {
        $obApplication->add(new $item);
    }
}

try {
    $obApplication->run();
} catch (Exception $e) {
}

Что мы тут делаем:
1 Итеративно проходим по всем директориям и ищем файлы, которые заканчиваются на *Command.php

2 Берем найденные файлы, и если, классы этих файлов наследуют '\Symfony\Component\Console\Command\Command', то регистрируем их в симфони консоли.

3 Далее файлы с маской Command можно использовать также как в обычном symfony/console

Ну и пишем команду, которая находит, по аналогии, все файлы с маской *Agent.php, наследующие базовый аент, и выполняет <AgentName>Agent::console($params);

<?php

namespace App\Infrastructure\Cli\Airflow;

use App\Shared\Services\Management\ManagementService;
use Bitrix\Main\Loader;
use Bitrix\Main\LoaderException;
use Bitrix\Main\Localization\Loc;
use Exception;
use Symfony\Component\Console;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

#[AsCommand(name: 'agents:run')]
class RunAgentsCommand extends Console\Command\Command
{
    /**
     * Configures the current command.
     * @return void
     */
    protected function configure(): void
    {
        $this
            // the command description shown when running "php bin/console list"
            ->setDescription('Запустить агент"')
            // the command help shown when running the command with the "--help" option
            ->setHelp('Запустить агент по имени класса')
            ->addArgument('className', InputArgument::OPTIONAL, 'Имя класса агента' )
        ;
    }

    /**
     * Executes the current command.
     * @param InputInterface $input Console input steam.
     * @param OutputInterface $output Console output steam.
     * @return int
     */
    protected function execute(
        Console\Input\InputInterface $input,
        Console\Output\OutputInterface $output
    ): int
    {
        try {
            if (!Loader::includeModule(moduleName: "...")) {
                throw new \RuntimeException(message:
                    Loc::getMessage(code: '..._CRM_MODULE_NOT_INSTALLED')
                );
            }

            $className = $input->getArgument('className');
          // тут вынес сканирование по маске в отдельный метод
            $arItems = ManagementService::create()->filesystem()->modules()->psr()->actions()->registerSplByMask('Agent');

            foreach ($arItems as $item) {
                if (is_subclass_of($item, 'App\Infrastructure\Cli\Agents\BaseAgent')) {
                    if(strstr($item, $className)){
                        $findClass = $item;
                    }
                }
            }
            if(!empty($findClass)) {
              // тут идет вызов метода
                (new $findClass)->console();
            } else {
                throw new \InvalidArgumentException($className . ' is not exists');
            }

        } catch (LoaderException|Exception $e) {
            $output->writeln($e->getMessage());
            return Command::FAILURE;
        }

        return Command::SUCCESS;
    }
}

Теперь нам доступен вызов любого агента, с помощью консоли

cd ./local && php ./console.php agents:run TestImportAgent

Вывод в эйрфлоу

Устанавливаем систему

Знакомство с Apache Airflow: установка и запуск первого DAGа
Привет! Меня зовут Алексей Карпов, я прикладной администратор (MLOps) отдела сопровождения моделей м...
habr.com

Далее мы просто пишем даг для эйрфлоу

"""Запустить агент тестовый"""
import datetime
import pendulum
from airflow import DAG
from functions import create_task_ssh_command, dag_success_callback, container_root, php_command

agent_name = "TestAgent"

dag = DAG(
    dag_id=agent_name,
    schedule="0 */1 * * *",
    start_date=pendulum.datetime(2024, 11, 15, tz="Europe/Moscow"),
    catchup=False,
    doc_md=__doc__,
    max_active_runs=1,
    dagrun_timeout=datetime.timedelta(minutes=6),
    on_success_callback=dag_success_callback,
    tags=[ 'agent', 'test'],
)

test_dag = create_task_ssh_command(dag, agent_name, f"""
cd /path/ && docker compose exec -u user_name php sh -c \"{php_command} -f {container_root}/local/console.php agents:run {agent_name} 2>&1 | tee /var/log/airflow/{agent_name}.log\"
""")

if __name__ == "__main__":
    dag.run()
Скрытый текст

Ну и в functions.py пишем обработчик для результатов выполнения

from __future__ import annotations
from airflow.exceptions import AirflowFailException

from pprint import pprint
import logging
import datetime
from airflow.providers.ssh.hooks.ssh import SSHHook
from airflow.operators.python import PythonOperator
from airflow.models import Variable

tz = datetime.timezone(datetime.timedelta(hours=3))
container_root = Variable.get("container_root")
php_command = Variable.get("php_command")

sshHook = SSHHook(ssh_conn_id='...', banner_timeout=60.0)
output = ''
DONE_STRING = 'DONE FOR AIRFLOW'
FLAG_IS_EMPTY = 'EMPTY'
FLAG_IS_DONE = 'DONE'
FLAG_IS_NOT_EMPTY = 'NOT_EMPTY'

def create_task_ssh_command(dag, task_id, task_command, task_success_callback=None):
    return PythonOperator(
        task_id=task_id,
        op_kwargs={"command": task_command},
        python_callable=task_method,
        # on_success_callback=task_success_callback,
        dag=dag
    )


def task_method(**kwargs):
    run_ssh_command(kwargs['command'])

def dag_success_callback(context):
    # logging.info("pprint(vars(context)) dag_success_callback")
    # logging.info(pprint(vars(context)))
	#     send_to_telegram(f"""
	#         \ud83d\ude00 Задача выполнена успешно.
	#         ID группы задач: {context.get('dag').safe_dag_id}
	#         Название группы задач: {context.get('dag').doc_md}
	#         Время запуска: {context.get('dag_run').start_date.astimezone(tz)}
	#         Время завершения: {context.get('dag_run').end_date.astimezone(tz)}
	#         Тип запуска: {context.get('dag_run').run_type}
	#         Время исполнения: {context.get('dag_run').end_date - context.get('dag_run').start_date}
	#     """)
	return True

def run_ssh_command(command):
    ssh_client = None
    try:
        ssh_client = sshHook.get_conn()
        ssh_client.load_system_host_keys()
        stdin, stdout, stderr = ssh_client.exec_command(command)
        return_result = stdout.read().decode('utf-8')
        logging.info(pprint(return_result))
        check_output(return_result)
        return return_result
    finally:
        if ssh_client:
            ssh_client.close()


def check_output(return_result_arg):
    flag = check_done_output(return_result_arg)
    if flag == FLAG_IS_NOT_EMPTY:
        raise AirflowFailException('Неожиданный ответ команды в терминале: '+"\n"+(output))
    elif flag == FLAG_IS_EMPTY:
        raise AirflowFailException('Пустой ответ команды в терминале')


def check_line(arg_line):
    return arg_line.strip() == DONE_STRING


def check_done_output(arg_output):
    is_empty = True
    global output
    for line in arg_output.splitlines():
        str = line.strip();
        if check_line(str):
            return FLAG_IS_DONE
        elif str != '':
            is_empty = False
            output += str + "\n"
    flag = FLAG_IS_EMPTY if is_empty else FLAG_IS_NOT_EMPTY
    return flag

И наслаждаемся полученной картинкой

Комментарии (2)


  1. fo_otman
    20.12.2024 06:32

    Годнота, кто минусы ставит? Подскажи плиз, как ты подсвечиваешь кусочки кода, как на вон том скриншоте с composer.json?


    1. DVZakusilo Автор
      20.12.2024 06:32

      joxy использую для скриншотов и там есть затемнение области.

      Минусы ставят - это норма, выкладываю статьи для того, чтобы профи могли посоветовать - как сделать более хардкорно, но с балансом, так чтобы и джуны и мидлы понимали - как работает. Как рук разработки мне важны не супер технологии а такие, которые легко подойдут для всех. Вот то, что минусы без комментариев, это удручает))