Всем привет, случалось такое, что вам надо поставить кучу агентов битрикса на крон, а потом сидеть и разбираться - сколько они отрабатывают, отрабатывают ли вообще, когда падают или зависают?
Ну конечно случалось. Так вот, чтобы получить визуальное представление о том, что там происходит, было принято решение, вынести агенты даже не на крон, а на apache airflow. Поведаю вам, как это было реализовано.
Агенты
По факту, агенты - это выполнение php команд по расписанию. Но мы пойдем дальше и выделим их в отдельную сущность, в классы с расширением Agents.php
Создадим интерфейс для агентов
<?php
namespace App\Infrastructure\Contracts\Cli\Agents;
interface AgentInterface
{
/**
* @var string Возвращаемое значение для дагов в Airflow в случае успешного исполнения.
*/
const AIRFLOW_DONE = 'DONE FOR AIRFLOW';
/**
* Метод выполняет код на агентах
* @return string Возвращает строку вызова текущего метода.
*/
public static function execute(string $param = ''): string;
/**
* Метод для запуска из консоли.
* @param string $param
* @return string
*/
public static function console(string $param = ''): string;
}
Далее создаем общий родитель для всех агентов
<?php
namespace App\Infrastructure\Cli\Agents;
use App\Infrastructure\Contracts\Cli\Agents\AgentInterface;
class BaseAgent implements AgentInterface
{
/**
* @inheritDoc
*/
public static function execute(string $param = ''): string
{
return static::execute($param);
}
/**
* @inheritDoc
*/
public static function console(string $param = ''): string
{
$return = static::execute($param);
echo self::AIRFLOW_DONE; // Тут добавляем вывод, если вызов прошел успешно
return $return;
}
}
Ну и пример самого агента
<?php
namespace App\Infrastructure\Cli\Agents;
use App\Infrastructure\Contracts\Cli\Agents\AgentInterface;
use App\Infrastructure\DTO\Cli\Agents\MethodDTO;
use App\Infrastructure\Schemas\Logger\Critical\CatchSchema;
use Bitrix\Main\Loader;
use Bitrix\Main\LoaderException;
use Exception;
class TestImportAgent extends BaseAgent implements AgentInterface
{
/**
* @param string $time
* @return string
* @throws Exception
*/
public static function execute(string $param = ''): string
{
try {
if (Loader::includeModule("...")) {
// Сам запрос
(new TestImport())->moveToStage();
}
} catch (LoaderException|Exception $e) {
LoggerFacade::elk()->critical('Move import error', (new CatchSchema($e))->toArray());
}
return 'It`s done, dude!';
}
}
Что имеем: когда мы вызываем из агентов в админ-панели запись TestImportAgent::execute()
, то нам выводится сообщение 'It`s done, dude!'
а если мы вызываем из терминала TestImportAgent::console(),
то нам возвращается
'It`s done, dude!
DONE FOR AIRFLOW'
и это важно, чтобы AIRFLOW мог понять, что все выполнено корректно.
Консоль: Битрикс внедрили себе красивенькую, рабочую консольку, и можно было бы даже с ней поиграться, если бы она не была жестко завязана на модули, и не стояла устаревшая версия symfony/console
Для того, чтобы нам иметь свою красивую консольку, установим ее отдельно
Добавим local/console.php
<?php
$_SERVER["DOCUMENT_ROOT"] = realpath(dirname(__FILE__) . "/../../../..");
require_once($_SERVER["DOCUMENT_ROOT"] . '/bitrix/modules/main/cli/bootstrap.php');
require_once($_SERVER["DOCUMENT_ROOT"] . '/bitrix/vendor/autoload.php');
use Symfony\Component\Console\Application;
$obApplication = new Application();
$iterator = new RecursiveIteratorIterator(new RecursiveDirectoryIterator($_SERVER["DOCUMENT_ROOT"] . '/local/'));
$regex = new RegexIterator($iterator, '/^.+Command\.php$/i', RegexIterator::GET_MATCH);
function my_autoloader($class): void
{
include $class;
}
foreach ($regex as $file => $value) {
my_autoloader($file);
}
spl_autoload_register('my_autoloader');
foreach (get_declared_classes() as $item) {
if (is_subclass_of($item, '\Symfony\Component\Console\Command\Command')) {
$obApplication->add(new $item);
}
}
try {
$obApplication->run();
} catch (Exception $e) {
}
Что мы тут делаем:
1 Итеративно проходим по всем директориям и ищем файлы, которые заканчиваются на *Command.php
2 Берем найденные файлы, и если, классы этих файлов наследуют '\Symfony\Component\Console\Command\Command'
, то регистрируем их в симфони консоли.
3 Далее файлы с маской Command можно использовать также как в обычном symfony/console
Ну и пишем команду, которая находит, по аналогии, все файлы с маской *Agent.php, наследующие базовый аент, и выполняет <AgentName>Agent::console($params);
<?php
namespace App\Infrastructure\Cli\Airflow;
use App\Shared\Services\Management\ManagementService;
use Bitrix\Main\Loader;
use Bitrix\Main\LoaderException;
use Bitrix\Main\Localization\Loc;
use Exception;
use Symfony\Component\Console;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
#[AsCommand(name: 'agents:run')]
class RunAgentsCommand extends Console\Command\Command
{
/**
* Configures the current command.
* @return void
*/
protected function configure(): void
{
$this
// the command description shown when running "php bin/console list"
->setDescription('Запустить агент"')
// the command help shown when running the command with the "--help" option
->setHelp('Запустить агент по имени класса')
->addArgument('className', InputArgument::OPTIONAL, 'Имя класса агента' )
;
}
/**
* Executes the current command.
* @param InputInterface $input Console input steam.
* @param OutputInterface $output Console output steam.
* @return int
*/
protected function execute(
Console\Input\InputInterface $input,
Console\Output\OutputInterface $output
): int
{
try {
if (!Loader::includeModule(moduleName: "...")) {
throw new \RuntimeException(message:
Loc::getMessage(code: '..._CRM_MODULE_NOT_INSTALLED')
);
}
$className = $input->getArgument('className');
// тут вынес сканирование по маске в отдельный метод
$arItems = ManagementService::create()->filesystem()->modules()->psr()->actions()->registerSplByMask('Agent');
foreach ($arItems as $item) {
if (is_subclass_of($item, 'App\Infrastructure\Cli\Agents\BaseAgent')) {
if(strstr($item, $className)){
$findClass = $item;
}
}
}
if(!empty($findClass)) {
// тут идет вызов метода
(new $findClass)->console();
} else {
throw new \InvalidArgumentException($className . ' is not exists');
}
} catch (LoaderException|Exception $e) {
$output->writeln($e->getMessage());
return Command::FAILURE;
}
return Command::SUCCESS;
}
}
Теперь нам доступен вызов любого агента, с помощью консоли
cd ./local && php ./console.php agents:run TestImportAgent
Вывод в эйрфлоу
Далее мы просто пишем даг для эйрфлоу
"""Запустить агент тестовый"""
import datetime
import pendulum
from airflow import DAG
from functions import create_task_ssh_command, dag_success_callback, container_root, php_command
agent_name = "TestAgent"
dag = DAG(
dag_id=agent_name,
schedule="0 */1 * * *",
start_date=pendulum.datetime(2024, 11, 15, tz="Europe/Moscow"),
catchup=False,
doc_md=__doc__,
max_active_runs=1,
dagrun_timeout=datetime.timedelta(minutes=6),
on_success_callback=dag_success_callback,
tags=[ 'agent', 'test'],
)
test_dag = create_task_ssh_command(dag, agent_name, f"""
cd /path/ && docker compose exec -u user_name php sh -c \"{php_command} -f {container_root}/local/console.php agents:run {agent_name} 2>&1 | tee /var/log/airflow/{agent_name}.log\"
""")
if __name__ == "__main__":
dag.run()
Скрытый текст
Ну и в functions.py пишем обработчик для результатов выполнения
from __future__ import annotations
from airflow.exceptions import AirflowFailException
from pprint import pprint
import logging
import datetime
from airflow.providers.ssh.hooks.ssh import SSHHook
from airflow.operators.python import PythonOperator
from airflow.models import Variable
tz = datetime.timezone(datetime.timedelta(hours=3))
container_root = Variable.get("container_root")
php_command = Variable.get("php_command")
sshHook = SSHHook(ssh_conn_id='...', banner_timeout=60.0)
output = ''
DONE_STRING = 'DONE FOR AIRFLOW'
FLAG_IS_EMPTY = 'EMPTY'
FLAG_IS_DONE = 'DONE'
FLAG_IS_NOT_EMPTY = 'NOT_EMPTY'
def create_task_ssh_command(dag, task_id, task_command, task_success_callback=None):
return PythonOperator(
task_id=task_id,
op_kwargs={"command": task_command},
python_callable=task_method,
# on_success_callback=task_success_callback,
dag=dag
)
def task_method(**kwargs):
run_ssh_command(kwargs['command'])
def dag_success_callback(context):
# logging.info("pprint(vars(context)) dag_success_callback")
# logging.info(pprint(vars(context)))
# send_to_telegram(f"""
# \ud83d\ude00 Задача выполнена успешно.
# ID группы задач: {context.get('dag').safe_dag_id}
# Название группы задач: {context.get('dag').doc_md}
# Время запуска: {context.get('dag_run').start_date.astimezone(tz)}
# Время завершения: {context.get('dag_run').end_date.astimezone(tz)}
# Тип запуска: {context.get('dag_run').run_type}
# Время исполнения: {context.get('dag_run').end_date - context.get('dag_run').start_date}
# """)
return True
def run_ssh_command(command):
ssh_client = None
try:
ssh_client = sshHook.get_conn()
ssh_client.load_system_host_keys()
stdin, stdout, stderr = ssh_client.exec_command(command)
return_result = stdout.read().decode('utf-8')
logging.info(pprint(return_result))
check_output(return_result)
return return_result
finally:
if ssh_client:
ssh_client.close()
def check_output(return_result_arg):
flag = check_done_output(return_result_arg)
if flag == FLAG_IS_NOT_EMPTY:
raise AirflowFailException('Неожиданный ответ команды в терминале: '+"\n"+(output))
elif flag == FLAG_IS_EMPTY:
raise AirflowFailException('Пустой ответ команды в терминале')
def check_line(arg_line):
return arg_line.strip() == DONE_STRING
def check_done_output(arg_output):
is_empty = True
global output
for line in arg_output.splitlines():
str = line.strip();
if check_line(str):
return FLAG_IS_DONE
elif str != '':
is_empty = False
output += str + "\n"
flag = FLAG_IS_EMPTY if is_empty else FLAG_IS_NOT_EMPTY
return flag
И наслаждаемся полученной картинкой
fo_otman
Годнота, кто минусы ставит? Подскажи плиз, как ты подсвечиваешь кусочки кода, как на вон том скриншоте с composer.json?
DVZakusilo Автор
joxy использую для скриншотов и там есть затемнение области.
Минусы ставят - это норма, выкладываю статьи для того, чтобы профи могли посоветовать - как сделать более хардкорно, но с балансом, так чтобы и джуны и мидлы понимали - как работает. Как рук разработки мне важны не супер технологии а такие, которые легко подойдут для всех. Вот то, что минусы без комментариев, это удручает))