
Привет, Хабр! Это Антон Дятлов, инженер по защите информации в Selectel. В одной из предыдущих статей я рассказывал, как настроить скрипт, который через API «Сканер-ВС 6» запустит сканирование, создаст отчеты и отправит уведомление в Telegram. Мы научились запускать сканер по расписанию через cron, импортировать IP-адреса и подсети, получать отчеты об уязвимостях. Такой подход хорошо работал на небольших объемах, но в реальных задачах — особенно в инфраструктурах с десятками и сотнями хостов — быстро всплыли ограничения: скрипт требовал ручного контроля на многих этапах, переход между стадиями (сетевой скан → скан уязвимостей → отчет) приходилось отслеживать вручную, проявилась ошибка в сканере, которая не позволяла корректно удалять ассеты.
В этой статье разберемся во второй версии скрипта — с переосмысленным пайплайном, расширенной поддержкой входных данных (IP и подсети), минимизацией ручных действий и автоматическим контролем всех этапов, а также удалением ассетов через SQL.
Требования к коду
Для начала рассмотрим, что должно уметь решение.
Регулярно сканировать множество хостов и подсетей.
Объединять адреса по группам с помощью имен и ID, чтобы оптимизировать сканирование и экономить ресурсы.
Работать в фоне: от запуска по cron до получения отчетов, без вмешательства со стороны человека.
Как это выглядело в первой версии
Исходный формат входных данных:
[
{"id": "1", "name": "zone1", "ip": "10.1.1.1"},
{"id": "2", "name": "zone1", "ip": "10.1.1.2"}
]
Базовый цикл работы:
for client in clients:
asset_id = create_asset(client['ip'])
scan_id = add_netscan_task(client['ip'])
run_task(scan_id)
while not is_completed(scan_id):
time.sleep(3600)
vulnscan_id = add_vulnscan_task(asset_id)
run_task(vulnscan_id)
while not is_completed(vulnscan_id):
time.sleep(3600)
report_id = add_report(asset_id)
Однако тут видим несколько нюансов.
Нельзя обрабатывать IP (один адрес) и net (подсети) одновременно. Это затрудняет гибкость кода: нельзя сканировать подсети, приходится находить активные адреса и записывать их.
Переход между этапами сканирования завязан на cron и требует расчета времени, за которое просканируются адреса. Если не вычислить, за какое время просканируются конкретные адреса, есть риск перезапуска кода после завершения процесса. Это чревато лишними ручными действиями — придется удалять задачи и активы вручную.
Cron запускает пайплайн по расписанию — без проверки, завершился ли предыдущий этап, и с риском запуститься «в холостую» — когда не все задачи были завершены, а количество запусков cron ограничено.
Что еще не устраивало
Переходы между стадиями: если какой-то этап задержался, следующий откладывается и просто ждет следующего запуска cron.
Нет контроля состояния: пайплайн может «застрять», не досканировать или повторно запустить то, что уже просканировано.
Нет самовосстановления: если скрипт/сервер перезапустили, нужно начинать все заново или вручную перебирать промежуточные файлы и разбираться, где он остановился.

Игровой сервер с криперами и порталом в Незер. Добывайте ресурсы, стройте объекты, исследуйте мир Selectel в Minecraft и получайте призы.
Архитектура v2.0
Ключевые идеи новой версии — универсальность, надежность и автоматизация.
Автоматическое определение типа входных данных (IP/net), динамическое формирование всех адресов.
Группировка по name, чтобы на каждого клиента, отдел или внутреннюю структуру запускалось по одному netscan, vulnscan и отчету.
Строгая стадийность: netscan → vulnscan → report. Все управление — с помощью внутреннего флага и JSON-файла, этапы идут друг за другом через at, а не cron.
Один JSON-файл хранит все: ID, прогресс, статусы. После завершения все возвращается к исходному виду.
Флаг done и защита от повторных запусков: cron может запускать скрипт хоть каждый час, но лишней работы не будет.
Возможность жесткой очистки ресурсов SQL-командой.
Пример кода v2.0 — главный цикл
Одно из ключевых преимуществ обновленной версии — гибкий запуск через cron. Скрипт можно запускать сколько угодно раз, но цикл выполнится только единожды в месяц — пока не будут созданы отчеты. Это избавляет от дублирования задач и активов, а также позволяет не следить вручную за количеством запусков:
не нужно «высчитывать», сколько раз запускать скрипт;
нет дублирующихся задач и сканов;
весь контроль происходит автоматически на уровне кода.
def main():
original_data = load_json(client_list_file)
yyyymm = get_current_yyyymm()
# 1. Если есть флаг done — ничего не делаем
if isinstance(original_data, dict) and original_data.get("done") and original_data.get("month") == yyyymm:
print("Pipeline уже завершен в этом месяце. Повторный запуск не требуется.")
return
# 2. Стадия (netscan/vulnscan/report) и восстановление состояния
if isinstance(original_data, dict) and "stage" in original_data:
stage = original_data["stage"]
orig_clients = original_data.get("original_clients", [])
clients = original_data.get("clients", [])
else:
stage = "netscan"
orig_clients = list(original_data)
clients = list(original_data)
# 3. Группировка по name, авторизация, запуск
groups = group_clients_by_name(clients)
auth_cookies = authorization()
base_update(auth_cookies)
Обработка сетей и IP
В версии 1.0 скрипт умел обрабатывать только конкретные IP-адреса, а подсети игнорировались:
for client in clients:
if "ip" in client:
# Работать с адресом
# Если net — не обрабатывали
В версии 2.0 добавлена поддержка подсетей (net). Скрипт пингует все адреса в заданной сети и выбирает только «живые»:
def get_alive_ips(network):
net = ipaddress.ip_network(network, strict=False)
alive = []
for ip in net.hosts():
ip_str = str(ip)
try:
result = subprocess.run(['ping', '-c', '1', '-W', '1', ip_str],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
if result.returncode == 0:
alive.append(ip_str)
except Exception:
continue
return alive
Затем все клиенты группируются по имени (name). В результате формируются группы с адресами и идентификаторами:
def group_clients_by_name(clients):
groups = {}
for client in clients:
name = client["name"]
if name not in groups:
groups[name] = {"name": name, "ips": [], "ids": [], "assets": []}
if "ip" in client:
groups[name]["ips"].append(client["ip"])
groups[name]["ids"].append(client["id"])
elif "net" in client:
alive_ips = get_alive_ips(client["net"])
groups[name]["ips"].extend(alive_ips)
groups[name]["ids"].append(client["id"])
return groups
Если в клиентском списке указана подсеть (net), скрипт сам разбивает ее на IP, определяет, какие адреса доступны, и связывает их с конкретным ID клиента — по сути, каждому активному IP из подсети назначается тот же идентификатор, что и у источника сети. Это позволяет точно отслеживать, какие адреса относятся к какому клиенту.
Группировка по name
v1.0: каждая задача сканирования запускалась для одного IP. Один адрес — одна задача. Это быстро приводило к спаму задачами: сканер мог перегружаться, а интерфейс начинал тормозить. Кроме того, это было просто неудобно — особенно для больших клиентов с несколькими адресами: вместо одной задачи рождались десятки.
v2.0: все адреса, относящиеся к одному клиенту или внутреннему отделу, объединяются в одну задачу. Это упрощает управление, снижает нагрузку на сканер, а также ускоряет выполнение сканов:
for name, group in groups.items():
if not group.get("id_netscan"):
task_id = add_netscan_task(auth_cookies, name, group["ips"])
if task_id:
group["id_netscan"] = task_id
Итог: меньше задач, проще контролировать и быстрее обрабатывать.
Переход между этапами
v1.0: тесная связка с cron, а ожидание — по расписанию. Если netscan закончился быстро — допустим, за 10 минут, то следующий этап придется ожидать еще шесть часов.
В v2.0 каждый этап по завершению сам «назначает» запуск следующего:
def schedule_next_run(delay_seconds=120):
cmd = f'echo "python3 {os.path.abspath(__file__)} --clientlist {client_list_file}" | at now + {delay_seconds//60} minutes'
subprocess.run(cmd, shell=True)
Итог: скорость работы максимальная, время простоя между стадиями — минимальное.
Флаг done и защита от лишних запусков
v1.0: можно нечаянно перезапустить пайплайн и получить дублирование.
v2.0: в файл после завершения записывается флаг на этот месяц. Все последующие cron-запуски ничего не делают до следующего месяца:
{"done": true, "month": "202507"}
Итог: защита от повторов, не нужно проверять вручную.
Автоматическая чистка ресурсов
v1.0: API-удаление ресурсов иногда не срабатывает.
v2.0: в конце пайплайна вызывается SQL-скрипт, который удаляет все «зависшие» ресурсы:
def force_delete_all_assets_psql():
cmd = [
"psql",
"postgres://scannervs:scannervs@localhost:5435/scanner-asset",
"-c",
"DELETE FROM scanner.assets"
]
subprocess.run(cmd, ...)
Итог: база не захламляется, ресурсы не висят «грузом».
Реальный сценарий запуска: cron + at
Пример cron для «первой среды месяца», четырех стартов в сутки (каждые шесть часов) в течение трех дней подряд:
0 1,9,15,21 2,3,4 7 * python3 /way/to/script.py --clientlist /way/to/clientlist.json >> /way/to/scan1.log 2>&1
Если пайплайн занимает около 20 часов, cron может запустить его хоть 12 раз — но результат будет лишь один. Каждый этап запускает следующий через at
.
Итоги и преимущества v2.0
Минимум ручных действий: один JSON и cron — все, дальше пайплайн работает сам. Если у вас больше одного JSON, можно настроить их обработку поочередно.
Есть поддержка любого сценария: IP и сети. Это позволяет производить сканирование как точечно, так и массово.
Восстановление после сбоев. Все стадии хранятся в JSON, запуск продолжается с нужного места. Не страшно, если ВМ со сканером упадет — прогресс сохранится.
Оптимизация. Никакого дублирования задач и отчетов, ручная работа сведена к минимуму.
Максимальная скорость. Следующий этап стартует сразу после завершения предыдущего.
Гибкость. Интеграция с внешними системами, уведомления приходят в Telegram.
Ниже — небольшая шпаргалка для интеграции своего пайплайна.
1. Добавьте свой список JSON в следующем формате:
[
{"id": "1", "name": "имя сети 1", "net": "X.X.X.X/X"},
{"id": "2", "name": "имя сети 2", "ip": "Y.Y.Y.Y"}
]
2. Пропишите cron с нужной частотой — обычно сканирование занимает от 6 до 48 часов в зависимости от размера подсети или количества открытых портов для IP-адресов.
3. Добавьте уведомления о завершении. Это поможет всегда быть в курсе готовности сканирований. Уведомление можно отправлять Telegram, чтобы оперативно выгружать отчеты и приступать к принятию мер по устранению уязвимостей.
Почему стоит мигрировать на новую версию
Ресурсы используются эффективнее.
Нет рисков пропустить скан или получить дубли.
Код стал проще, модульнее и надежнее.
Автоматизация процесса сканирования — вам нужно только все настроить и выгружать отчеты из сканера.
В будущем изучу возможность выгрузки отчетов в Telegram или почту, для еще более простого использования сканера. Если у вас есть опыт в этом направлении — делитесь в комментариях, будет интересно обсудить!