Недавно меня попросили помочь в определении источников утечки трафика в одной из организаций. Задачу усугубляло большое количество устройств в одном широковещательном домене, множество неуправляемых коммутаторов, отсутствие любой карты сети, а также старенький роутер на входе. В общем, это были настоящие "Авгиевы конюшни", но в итоге задача была решена, и данная статья посвящена методам, которые я использовал. Кто оказался виновником, я раскрою в конце статьи, чтобы не портить интригу.

Мой телеграмм канал - сообщество, где делятся опытом

https://t.me/IT_Chuyana

В текущей задаче я придерживался следующих принципов:

Для определения источника аномального трафика я разбил задачу на несколько подзадач:

  1. Круглосуточный мониторинг скорости доступа к интернету в различных частях организации.

  2. Непрерывное сканирование всего трафика с целью выявления хостов, потребляющих его в наибольшем объеме.

  3. Анализ хостов, генерирующих наибольший трафик, с последующим устранением выявленных проблем.

Зачем все это нужно и как этого достичь
Зачем все это нужно и как этого достичь

Задача 1: Круглосуточный мониторинг скорости доступа к интернету в разных уголках организации

На узлы я установил пользовательские экспортеры на Python, которые запускают Speedtest CLI в заданные промежутки времени и отправляют метрики в Prometheus.

Экспортер выдает для Prometheus такие метрики, как:

  • internet_download_speed_mbps # Скорость загрузки из интернета

  • internet_upload_speed_mbps # Скорость выгрузки в интернет

  • internet_ping_ms # Задержка пинга


Код экспортера представлен ниже - speedtest_exporter.py:

Скрытый текст
# metrics prometheus
# Скорость загрузки из интернета
# avg(internet_download_speed_mbps) by (instance)
# Скорость выгрузки в интернет
# avg(internet_upload_speed_mbps) by (instance)
# Задержка пинга
# avg(internet_ping_ms) by (instance)

from flask import Flask, Response
import speedtest
import logging
import time
import threading

app = Flask(__name__)

# Настройка логирования
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Переменные для кеша
metrics_data = {
    "download_speed": 0.0,
    "upload_speed": 0.0,
    "ping": 0.0
}
last_test_time = 0
data_lock = threading.Lock()
cache_duration = 60  # Измеряем раз в минуту

def run_speedtest():
    global metrics_data, last_test_time
    while True:
        try:
            logging.info("Запуск измерений интернет-соединения.")
            st = speedtest.Speedtest()
            st.get_best_server()
            
            download_speed = st.download() / 1e6  # Конвертируем в Мбит/с
            upload_speed = st.upload() / 1e6      # Конвертируем в Мбит/с
            ping = st.results.ping               # Пинг в миллисекундах
            
            with data_lock:
                metrics_data["download_speed"] = download_speed
                metrics_data["upload_speed"] = upload_speed
                metrics_data["ping"] = ping
                last_test_time = time.time()

            logging.info(f"Измерения завершены: Загрузка: {download_speed:.2f} Mbps, "
                         f"Выгрузка: {upload_speed:.2f} Mbps, "
                         f"Пинг: {ping:.2f} ms.")
        except Exception as e:
            logging.error(f"Ошибка при измерениях: {str(e)}")
        
        time.sleep(cache_duration)

@app.route('/metrics')
def metrics():
    with data_lock:
        response = f"""
# HELP internet_download_speed_mbps Download speed in Mbps
# TYPE internet_download_speed_mbps gauge
internet_download_speed_mbps {metrics_data["download_speed"]}
# HELP internet_upload_speed_mbps Upload speed in Mbps
# TYPE internet_upload_speed_mbps gauge
internet_upload_speed_mbps {metrics_data["upload_speed"]}
# HELP internet_ping_ms Ping latency in milliseconds
# TYPE internet_ping_ms gauge
internet_ping_ms {metrics_data["ping"]}
"""
    return Response(response, mimetype="text/plain")

if __name__ == '__main__':
    # Запускаем тест скорости в отдельном потоке
    speedtest_thread = threading.Thread(target=run_speedtest, daemon=True)
    speedtest_thread.start()
    
    # Запускаем Flask
    app.run(host='0.0.0.0', port=9101)

Задача 2: Круглосуточное сканирование всего трафика и определение адресов хостов, наиболее сильно этот трафик расходующих

Мы проводим мониторинг трафика в местах его схождения. Удобнее всего это делать на шлюзе, а в нашем случае роутере, либо настраивая зеркалирование трафика на порт, к которому подключаем анализирующее устройство. Для этих целей я использовал Orange Pi, так как эти дешевые и многофункциональные одноплатные компьютеры просты в использовании и удобны в работе. О применении Orange Pi я писал в предыдущих статьях тут и тут.

Код анализатора трафика представлен ниже - traffic_monitor.py:

Скрытый текст
import time
import logging
from logging.handlers import RotatingFileHandler
from collections import defaultdict
from scapy.all import sniff, IP
from threading import Thread
import datetime
import ipaddress 

# =======================
# Статичные переменные
# =======================

# Задержка между логированиями в секундах (по умолчанию 5 минут)
DELAY_SECONDS = 60  # 300 секунд = 5 минут

# Список префиксов сетевых адресов для широковещательного трафика
NETWORKS = ['192.168.2.0/24', '192.168.10.0/24']  # Добавьте необходимые префиксы
# Вычисляем широковещательные адреса для каждой сети
NETWORK_BROADCAST_ADDRS = [str(ipaddress.IPv4Network(net).broadcast_address) for net in NETWORKS]
# Широковещательный адрес по умолчанию
BROADCAST_IP = '255.255.255.255'

# Хранение статистики трафика
traffic_stats = defaultdict(int)
broadcast_traffic_stats = defaultdict(int)

# =======================
# Настройка логирования с ротацией
# =======================

LOG_FILENAME = 'top_ip_traffic.log'

# Создаем обработчик ротации логов
rotating_handler = RotatingFileHandler(
    LOG_FILENAME,      # Имя файла
    maxBytes=5 * 1024 * 1024,  # Максимальный размер файла (5 MB)
    backupCount=3      # Максимальное количество резервных копий (3 файла)
)

# Настраиваем формат логов
rotating_handler.setFormatter(logging.Formatter('%(message)s'))

# Получаем корневой логгер и устанавливаем ему обработчик ротации
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(rotating_handler)


def packet_callback(packet):
    """Callback функция для обработки пакетов."""
    if IP in packet:
        src_ip = packet[IP].src
        dst_ip = packet[IP].dst
        packet_length = len(packet)

        # Проверяем, является ли адрес назначения широковещательным
        is_broadcast = False
        if dst_ip == BROADCAST_IP or dst_ip in NETWORK_BROADCAST_ADDRS:
            is_broadcast = True
        # else:
        #     for prefix in NETWORK_PREFIXES:
        #         if dst_ip.startswith(prefix):
        #             is_broadcast = True
        #             break

        if is_broadcast:
            # Увеличиваем трафик для источника и назначения для широковещательного трафика
            broadcast_traffic_stats[src_ip] += packet_length
            broadcast_traffic_stats[dst_ip] += packet_length
        else:
            # Увеличиваем трафик для обычного трафика
            traffic_stats[src_ip] += packet_length
            traffic_stats[dst_ip] += packet_length

def log_top_ips():
    """Логирует IP-адреса с наибольшим трафиком."""
    while True:
        time.sleep(DELAY_SECONDS)
        try:
            # Получаем текущую метку времени
            now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')

            # Определяем топ IP для обычного трафика
            top_ips = sorted(traffic_stats.items(), key=lambda x: x[1], reverse=True)[:10]
            # Определяем топ IP для широковещательного трафика
            top_broadcast_ips = sorted(broadcast_traffic_stats.items(), key=lambda x: x[1], reverse=True)[:10]

            # Логируем информацию
            logging.info("." * 37)
            logging.info(f"  Топ 10 IP на {now}")
            logging.info("            Общий трафик ")
            # Добавляем заголовки таблицы
            logging.info(f"{'IP-адрес':<20} {'Байты':>15}")
            logging.info("-" * 35)
            for ip, bytes_used in top_ips:
                logging.info(f"{ip:<20} {bytes_used:>15}")

            logging.info("      Широковещательный трафик ")
            logging.info(f"{'IP-адрес':<20} {'Байты':>15}")
            logging.info("-" * 35)
            for ip, bytes_used in top_broadcast_ips:
                logging.info(f"{ip:<20} {bytes_used:>15}")
            logging.info("." * 37)

            # Очищаем статистику обоих типов трафика
            traffic_stats.clear()
            broadcast_traffic_stats.clear()
        except Exception as e:
            logging.error(f"Произошла ошибка при логировании: {e}")

def main():
    """Основная функция для захвата пакетов и логирования трафика."""
    try:
        # Запускаем поток для логирования
        logger_thread = Thread(target=log_top_ips, daemon=True)
        logger_thread.start()

        # Запускаем захват пакетов с оптимизированным фильтром
        # Фильтр захватывает только IP-пакеты для повышения эффективности
        sniff(iface=None, prn=packet_callback, store=0, filter="ip")

    except KeyboardInterrupt:
        logging.info("Завершение работы программы пользователем...")
    except Exception as e:
        logging.error(f"Произошла ошибка в основной функции: {e}")

# Запуск программы
if __name__ == "__main__":
    main()

Созданием виртуального окружения, установкой зависимостей и настройкой служб для указанных выше скриптов займется Ansible. Плейбук, который я применял, приведен ниже: ansible_speedtest.yml.

Скрытый текст
---
- name: Установка Python и настройка Speedtest Exporter
  hosts: localhost
  tasks:
    - name: Обновление списка пакетов
      apt:
        update_cache: true
      become: true

    - name: Установка необходимых пакетов
      apt:
        name:
          - curl
          - python3-venv
          - python3.11-dev
          - libffi-dev
          - libssl-dev
          - nmap
        state: present
      become: true

    - name: Проверка существования виртуального окружения
      stat:
        path: "/etc/apt/sources.list.d/ookla_speedtest-cli.list"
      register: speedtest_repo

    - name: Добавление репозитория speedtest-cli
      shell: curl -s https://packagecloud.io/install/repositories/ookla/speedtest-cli/script.deb.sh | bash
      args:
        executable: /bin/bash
      become: true
      when: not speedtest_repo.stat.exists

    - name: Установка speedtest
      apt:
        name: speedtest
        state: present
      become: true

    - name: Проверка существования виртуального окружения
      stat:
        path: "{{ playbook_dir }}/.venv"
      register: venv_directory

    - name: Создание виртуального окружения
      command: python3 -m venv .venv
      args:
        chdir: "{{ playbook_dir }}"
      when: not venv_directory.stat.exists

    - name: Обновить pip и setuptools в виртуальном окружении
      pip:
        name:
          - pip
          - setuptools
        state: latest
        virtualenv: "{{ playbook_dir }}/.venv"

    - name: Установка зависимостей из requirements.txt
      pip:
        requirements: "{{ playbook_dir }}/requirements.txt"
        virtualenv: "{{ playbook_dir }}/.venv"
    
    # службы
    # speedtest exporter
    - name: Создание файла службы для speedtest exporter
      become: true
      copy:
        dest: /etc/systemd/system/speedtest_exporter.service
        content: |
          [Unit]
          Description=Служба Speedtest Exporter
          After=network.target

          [Service]
          User={{ ansible_env.USER }}
          WorkingDirectory={{ playbook_dir }}
          ExecStart={{ playbook_dir }}/.venv/bin/python3 {{ playbook_dir }}/speedtest_exporter.py
          Restart=always
          Environment=PYTHONUNBUFFERED=1

          [Install]
          WantedBy=multi-user.target

    - name: Перезагрузка системных служб
      become: true
      command: systemctl daemon-reload

    - name: Запуск службы speedtest_exporter
      become: true
      systemd:
        name: speedtest_exporter
        state: started
        enabled: yes

    ## traffic monitor
    - name: Создание файла службы для traffic monitor (привилегированный режим)
      become: true
      copy:
        dest: /etc/systemd/system/traffic_monitor.service
        content: |
          [Unit]
          Description=Служба Traffic Monitor
          After=network.target

          [Service]
          User=root
          WorkingDirectory={{ playbook_dir }}
          ExecStart={{ playbook_dir }}/.venv/bin/python3 {{ playbook_dir }}/traffic_monitor.py
          Restart=always
          RestartSec=5s
          TimeoutSec=30
          StandardOutput=journal
          StandardError=journal
          Environment=PYTHONUNBUFFERED=1

          [Install]
          WantedBy=multi-user.target

    - name: Перезагрузка системных служб
      become: true
      command: systemctl daemon-reload

    - name: Запуск службы traffic_monitor
      become: true
      systemd:
        name: traffic_monitor
        state: restarted
        enabled: yes

Задача 3: Изучение хостов, генерирующего наибольших трафик, устранение проблем

Далее подключаем Grafana к Prometheus и изучаем периоды падения трафика на примере тестовой среды.

Среда тестовая, на одном их хостов запущено обновление зеркала репозиториев, для иллюстрации падения общего трафика
Среда тестовая, на одном их хостов запущено обновление зеркала репозиториев, для иллюстрации падения общего трафика

Также Alertmanager позволяет настроить предупреждения по пороговым значениям метрик. В моем случае, при падении трафика ниже допустимого, приходит оповещение, например, в телеграмм.

Теперь в моменты падения графика мы изучаем лог нашего анализатора трафика и определяем ip адреса узлов, наиболее активно использующих трафик:

На первом месте узел, потребляющий больше всего трафика
На первом месте узел, потребляющий больше всего трафика

После того, как подозрительный ip найден, мы должны определиться, что мы хотим узнать о нем:

Из инструментария, мы используем утилиту nmap, с помощью которой определим местоположение, выполним обычное, а затем агрессивное сканирование. Напишем нужный скрипт на python:

Скрытый текст
import sys
import socket
import requests
from pythonping import ping
import re
import nmap
import datetime
import time
import subprocess

def validate_ip(ip):
    pattern = re.compile(r"^(?:(?:25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(?:25[0-5]|2[0-4]\d|[01]?\d\d?)$")
    return pattern.match(ip) is not None

def reverse_dns(ip):
    try:
        return socket.gethostbyaddr(ip)[0]
    except socket.herror:
        return "Не удалось найти доменное имя (reverse DNS lookup)."
      
def check_ping(ip):
    """
    Проверяет доступность хоста по IP адресу с помощью команды ping.

    :param ip: IP-адрес хоста
    :return: Среднее время отклика в миллисекундах (float) если доступен, иначе False
    """
    try:
        # Выполняем команду ping с 1 пакетом
        output = subprocess.check_output(
            ["ping", "-c", "1", "-W", "1", ip],
            stderr=subprocess.STDOUT,
            universal_newlines=True
        )
        # Парсим вывод ping для получения среднего времени отклика
        for line in output.splitlines():
            if "time=" in line:
                # Пример строки: "64 bytes from 8.8.8.8: icmp_seq=1 ttl=117 time=14.2 ms"
                time_ms = float(line.split("time=")[1].split(" ")[0])
                return time_ms
        return False
    except subprocess.CalledProcessError:
        return False

def wait_for_host(ip, timeout=3600, interval=60):
    """
    Ждет, пока хост станет доступен или истечет таймаут.

    :param ip: IP-адрес хоста
    :param timeout: Таймаут в секундах (по умолчанию 1 час)
    :param interval: Интервал между проверками в секундах (по умолчанию 60 секунд)
    :return: True если хост стал доступен, False если таймаут истек
    """
    start_time = datetime.datetime.now()
    end_time = start_time + datetime.timedelta(seconds=timeout)
    while datetime.datetime.now() < end_time:
        ping_result = check_ping(ip)
        if isinstance(ping_result, float):
            print(f"Хост {ip} доступен. Среднее время отклика: {ping_result} ms")
            return True
        else:
            remaining = end_time - datetime.datetime.now()
            remaining_seconds = int(remaining.total_seconds())
            hours, remainder = divmod(remaining_seconds, 3600)
            minutes, seconds = divmod(remainder, 60)
            print(f"Хост {ip} недоступен. Ожидание... Осталось времени: {hours}ч {minutes}мин {seconds}с")
            time.sleep(interval)
    print(f"Таймаут ожидания хоста {ip} истек после {timeout / 3600} часа(ов).")
    return False

def geolocation(ip):
    try:
        response = requests.get(
            f"http://ip-api.com/json/{ip}?fields=status,country,regionName,city,zip,lat,lon,timezone,isp,org,as",
            timeout=10
        )
        data = response.json()
        if data.get('status') == 'success':
            return data
        else:
            return {"error": "Не удалось получить геолокацию."}
    except Exception as e:
        return {"error": str(e)}

def print_geolocation(geo):
    if 'error' in geo:
        print(f"Геолокация: {geo['error']}")
    else:
        print("Геолокация:")
        print(f"  Страна: {geo.get('country')}")
        print(f"  Регион: {geo.get('regionName')}")
        print(f"  Город: {geo.get('city')}")
        print(f"  Почтовый индекс: {geo.get('zip')}")
        print(f"  Широта: {geo.get('lat')}")
        print(f"  Долгота: {geo.get('lon')}")
        print(f"  Часовой пояс: {geo.get('timezone')}")
        print(f"  Провайдер: {geo.get('isp')}")
        print(f"  Организация: {geo.get('org')}")
        print(f"  Автономная система: {geo.get('as')}")


def scan_ip(ip):
    """
    Сканирует указанный IP-адрес и выводит результаты сканирования.
    
    Параметры:
    ip (str): IP-адрес для сканирования.
    """
    scanner = nmap.PortScanner()
    try:
        # Проведение сканирования с опциями:
        # -sS : TCP SYN scan
        # -sV : Определение версий сервисов
        # -O   : Определение операционной системы
        scanner.scan(ip, arguments='-sS -sV -O')

        result = {}
        host_info = scanner[ip]
        
        # Инициализация
        result['hostname'] = "Не определено"
        result['mac_address'] = "Не доступен"
        result['open_ports'] = []
        result['services'] = {}
        result['os'] = "Не определена"

        # Получение имени хоста
        if 'hostnames' in host_info and host_info['hostnames']:
            hostname = host_info['hostnames'][0]['name']
            result['hostname'] = hostname if hostname else "Не определено"

        # Получение MAC-адреса
        if 'addresses' in host_info:
            if 'mac' in host_info['addresses']:
                result['mac_address'] = host_info['addresses']['mac']

        # Обработка TCP-портов
        if 'tcp' in host_info:
            for port in host_info['tcp']:
                state = host_info['tcp'][port]['state']
                if state == 'open':
                    result['open_ports'].append(port)
                    service_name = host_info['tcp'][port]['name']
                    service_product = host_info['tcp'][port].get('product', 'Неизвестно')
                    service_version = host_info['tcp'][port].get('version', 'Неизвестно')
                    result['services'][port] = f"{service_name} ({service_product} {service_version})"

        # Обработка UDP-портов (опционально)
        if 'udp' in host_info:
            for port in host_info['udp']:
                state = host_info['udp'][port]['state']
                if state == 'open':
                    result['open_ports'].append(port)
                    service_name = host_info['udp'][port]['name']
                    service_product = host_info['udp'][port].get('product', 'Неизвестно')
                    service_version = host_info['udp'][port].get('version', 'Неизвестно')
                    result['services'][port] = f"{service_name} ({service_product} {service_version})"

        # Определение операционной системы
        if 'osmatch' in host_info and host_info['osmatch']:
            result['os'] = host_info['osmatch'][0]['name']

        # Вывод результатов
        print(f"Результаты сканирования для {ip}:")
        print(f"  Имя хоста: {result['hostname']}")
        print(f"  MAC-адрес: {result['mac_address']}")
        print(f"  Операционная система: {result['os']}")
        
        if result["open_ports"]:
        # Сортировка портов для удобства
            open_ports_sorted = sorted(result["open_ports"])
            print("  Открытые порты:", ", ".join(map(str, open_ports_sorted)))
            print("  Детали сервисов:")
            for port in open_ports_sorted:
                service_info = result["services"].get(port, "Информация недоступна")
                print(f"    Порт {port}: {service_info}")
        else:
            print("  Открытых портов не обнаружено.")

    except Exception as e:
        print(f"Результаты сканирования для {ip}:")
        print(f"  Ошибка: {str(e)}")

        
def aggressive_scan(ip):
    """
    Выполняет "агрессивное" сканирование ip с помощью Nmap.
    
    :param ip: IP-адрес (или диапазон), который необходимо просканировать.
    :return: Словарь с информацией об узле:
        {
            'hostname': str (имя хоста или None),
            'mac_address': str (MAC-адрес или None),
            'open_ports': список открытых портов,
            'services': словарь {порт: {протокол, сервис, продукт, версия}},
            'os': список возможных ОС (с их баллами совпадения) или None
        }
    """
    scanner = nmap.PortScanner()
    
    # Аргументы:
    #   -A  — агрессивное сканирование (включает проверку версий и ОС)
    #   -T4 — скорость сканирования (T4 обычно достаточно быстрая)
    #   1-65535 — диапазон всех портов (можно сузить при необходимости)
    scanner.scan(hosts=ip, ports="1-65535", arguments="-A -T4")

    results = {
        'hostname': None,
        'mac_address': None,
        'open_ports': [],
        'services': {},
        'os': None
    }

    if ip not in scanner.all_hosts():
        return results  # Хост не найден или не ответил
    
    host_info = scanner[ip]

    # Получаем имя хоста (если удалось определить)
    hostname = host_info.hostname()
    if hostname:
        results['hostname'] = hostname
    
    # MAC-адрес (если удалось определить)
    if 'addresses' in host_info and 'mac' in host_info['addresses']:
        results['mac_address'] = host_info['addresses']['mac']

    # Определение ОС
    if 'osmatch' in host_info:
        results['os'] = [
            {
                'name': os_item['name'],
                'accuracy': os_item['accuracy'],
                'os_family': os_item.get('osclass', [{}])[0].get('osfamily')
            }
            for os_item in host_info['osmatch']
        ]
    
    # Список протоколов (tcp, udp и т.д.)
    for proto in host_info.all_protocols():
        ports = host_info[proto].keys()
        for port in ports:
            port_state = host_info[proto][port]['state']
            if port_state == 'open':
                results['open_ports'].append(port)
                
                # Информация о сервисе
                service_name = host_info[proto][port].get('name', '')
                service_product = host_info[proto][port].get('product', '')
                service_version = host_info[proto][port].get('version', '')
                
                results['services'][port] = {
                    'protocol': proto,
                    'service': service_name,
                    'product': service_product,
                    'version': service_version
                }

    return results


def print_scan_results(scan_data):
    """
    Печатает результаты агрессивного сканирования в консоль.
    """
    print("\nРезультаты агрессивного сканирования:")
    if not scan_data['hostname'] and not scan_data['open_ports']:
        print("Хост не ответил или не найден.")
        return

    print(f"Имя хоста: {scan_data['hostname']}")
    print(f"MAC-адрес: {scan_data['mac_address']}")
    
    if scan_data['os']:
        print("Возможные ОС:")
        for os_info in scan_data['os']:
            print(f"  - {os_info['name']} (точность: {os_info['accuracy']}%), семейство: {os_info['os_family']}")
    else:
        print("Не удалось определить ОС.")

    if scan_data['open_ports']:
        print("\nОткрытые порты и сервисы:")
        for port in sorted(scan_data['open_ports']):
            service_info = scan_data['services'][port]
            print(f"  Порт {port}/{service_info['protocol']}: {service_info['service']} "
                  f"(продукт: {service_info['product']}, версия: {service_info['version']})")
    else:
        print("Открытых портов не найдено.")
   

        
def main():
    if len(sys.argv) != 2:
        print("Использование: python py_ip_monitor.py <IP-адрес>")
        sys.exit(1)
    
    ip = sys.argv[1]
    
    if not validate_ip(ip):
        print("Некорректный IP-адрес.")
        sys.exit(1)
    
    print(f"\nИнформация для IP: {ip}\n{'='*40}")
    
    # Обратный DNS
    dns = reverse_dns(ip)
    print(f"Обратный DNS: {dns}")
    
    # Геолокация
    geo = geolocation(ip)
    print_geolocation(geo)
    
    # Ping
    ping_result = check_ping(ip)
    if isinstance(ping_result, float):
        print(f"Среднее время отклика (ping): {ping_result} ms")
    else:
        print(f"Ping: {ping_result}")
    
    # Сканирование портов
    scan_ip(ip)
    
    # 4. Расширенное сканирование портов (aggressive_scan + вывод)
    scan_result = aggressive_scan(ip)
    print_scan_results(scan_result)

if __name__ == "__main__":
    main()

Запустим сканирование и получим следующие данные:

mac-адрес

тип операционной системы

открытые порты

Все это позволит нам понять какие сервисы запущены на узле и определить его предназначение.

Если доступ к аномальным узлам есть, и мы можем на них авторизоваться, то изучаем запущенные процессы и ищем те, что активно используют сетевое соединение. Например c помощью утилиты nethogs.

Итог

Что теперь с этим делать? Зависит от ваших задач.

Если узел расходует трафик который не должен расходовать, то проще всего заблокировать MAC-адрес на роутере, добавив соответствующее правило, или внести необходимые изменения в файрвол (при его наличии).

Также изучая arp таблицы можно найти местоположение устройства.

В моем же случае виновником был смарт-телевизор, применение которого весьма чувствительно для сети, поставляемой через VPN. Хозяин устройства был найден, а телевизор изъят.

В данной статье мы рассмотрели простые способы создания пользовательских экспортеров для prometheus, способы мониторинга трафика в сети, а также методы быстрого развертывания наших скриптов на узлах с применением системы управления конфигурациями Ansible.

Всем спасибо за внимание!

Комментарии (10)


  1. PereslavlFoto
    31.12.2024 00:33

    виновником был смарт-телевизор

    Человек принёс из дома телевизор и сам повесил его на стену в цеху?


    1. andrey_chuyan Автор
      31.12.2024 00:33

      Да, в комнату. Обычно у них телевизоры ставят с коаксиальным кабелем, поэтому на этот факт изначально никто не обратил внимания.


  1. peacemakerv
    31.12.2024 00:33

    Так это был какой-то паразитный трафик телека (тогда модель и марку - просьба, в студию) или киношки тунеядца ?


    1. andrey_chuyan Автор
      31.12.2024 00:33

      Второе, телевизор работал фоном.


  1. sharptop
    31.12.2024 00:33

    Интересно, почему не стали использовать типичный сценарий, когда трафик с роутера снимается через netflow и потом обрабатывается коллекторами? Как вариант - связка ELK для хранения, сбора/обработки и отображения.


    1. andrey_chuyan Автор
      31.12.2024 00:33

      Проблема, что в качестве роутера выступал старенький D-Link, поэтому зеркалирование трафика было единственным быстрым решением. А ELK это хорошо, лишним точно не будет.


      1. HardWrMan
        31.12.2024 00:33

        В далёком 2005м у меня был D-Link DSL-524T:

        Уже тогда на форуме D-Link была доступна альтернативная прошивка, которая умела вот так:

        И это без очевидных датчиков/счётчиков, которые там 100% доступны через SNMP, знаю потому что сам активно пользовался. Поэтому присоединяюсь к недоумению sharptop.


        1. andrey_chuyan Автор
          31.12.2024 00:33

          тогда определенно стоит посмотреть, может ли это текущая прошивка, либо альтернативная. Благодарю.

          Если этот вариант сработает, тогда децентрализованные экспортеры будут нужны для проверки качества сети на различных участках, и, как следствие, поиска "узких мест", где трафик замедляется из-за некачественного сетевого оборудования.


    1. ejik_kg
      31.12.2024 00:33

      Как вариант если роутер тупой.

      Ставим в разрыв Комп с 2-мя сетевухами с линуксом в бридже снимаем трафик(fprobe) и отправляем на анализ по нетфлоу в софт типа netflow analyzer. Там и графики есть и статистика и всякое всякое и по портам по адресам и вообще как захочешь.


      1. HardWrMan
        31.12.2024 00:33

        Как вариант если роутер тупой.

        А можно услышать ваши критерии тупости роутера?