Кадр из к/ф «Грязь»
Кадр из к/ф «Грязь»

Представьте, что вам нужно внедрить файрвол в инфраструктуре предприятия N — крупной компании с сотнями филиалов, несколькими ЦОДами и большим количеством взаимодействий с внешними организациями. Заказчик разводит руками и сам не знает, что с чем у него взаимодействует, по каким адресам и протоколам. И хотя в компании уже есть межсетевые экраны, но настроены они так, что разрешено практически всё. Как учесть всё это при проектировании и не сойти с ума?

Меня зовут Паша, я работаю в подразделении «Solar Интеграция». Вместе с коллегами мы делаем десятки комплексных проектов по кибербезопасности в год и часто решаем подобные головоломки, когда в инфраструктуре заказчика нужно внедрить новый или заменить старый файрвол. В этой статье я поделюсь одним из способов решения задачи по составлению матрицы сетевых взаимодействий: расскажу, как мы проводим анализ реального трафика в месте предполагаемой установки файрвола.

Другие способы сегодня останутся за бортом: можно также собрать информацию по рабочим подсистемам и их взаимодействию между собой, собрать конфигурацию с сетевого оборудования и межсетевых экранов, проанализировать политики сетевого взаимодействия или же логи трафика на оборудовании. Можно даже использовать все перечисленные способы сразу — результат будет ещё точнее, но в этом посте я подробно остановлюсь только на анализе трафика.

Из чего состоит процесс анализа

Если кратко, то процесс решения задачи по анализу реального трафика может выглядеть вот так:

  • в месте предполагаемой установки межсетевого экрана на коммутаторах настраиваем зеркалирование проходящего трафика;

  • монтируем Linux-машину с несколькими сетевыми картами и подаем на неё зеркалированный трафик;

  • производим захват и запись в pcap заголовков L2, L3 и L4;

  • парсим pcap-файлы и записываем в sqlite соединения с уникальными src_ip, dst_ip, protocol L4, dst_port;

  • на выходе получаем файл с базой уникальных соединений.

В результате вместо миллиона соединений мы получаем всего несколько тысяч, которые потом вручную или скриптами легко группируются по подсетям и протоколам. Остаётся только проанализировать необходимость этих взаимодействий и перенести их в проектируемую матрицу доступа. Обычно за пару недель работы мы собираем достаточно данных для этого.

Разберём более подробно пункты, которые касаются захвата трафика и парсинга pcap.

Захват трафика

Для захвата и записи заголовков мы используем утилиту tcpdump. Запуск tcpdump происходит из Python-скрипта.

tcpdump  -i enp4s0 ip -nn -q -c 1000000 -s 58 -w rab1.pcap
  • i – интерфейс для захвата трафика;

  • ip – только IP-протокол;

  • nn – выводить IP-адреса и номера портов;

  • q – сокращённый вывод;

  • с – количество принятых пакетов, после чего выход;

  • s 58 – запишем только заголовки;

  • w – запись в файл.

Сам скрипт для запуска будет выглядеть так:

#! /usr/bin/python3
import os
interface = 'enp4s0' #интерфейс для захвата пакетов
count = 1000000 #количество захваченных пакетов
for i in range (1000):
    result = os.system('tcpdump  -i {} ip -nn -q -c {} -s 58 -w rab_{}_{}.pcap'.format(interface, count, interface , i))
    if result == 0:
        os.replace('rab_{}_{}.pcap'.format(interface, i), 'finaly_{}_{}.pcap'.format(interface, i))

Результат работы этого скрипта — 1000 файлов примерно по 70 мегабайтов каждый. В каждом из них будут заголовки до L4 включительно. Количество файлов и захваченных пакетов можно менять, адаптируя под реальные условия. Если интерфейсов несколько, то запускаем несколько версий скрипта — каждую со своим интерфейсом.

Парсинг pcap

Параллельно с работой скриптов захвата трафика запускаем скрипт для парсинга pcap-файлов. Он проверяет, есть ли в данном каталоге файлы, которые начинаются на «finaly». Это файлы, которые были созданы в результате работы скриптов по захвату трафика. Если таковые нашлись, начинается их построчная обработка. Для работы используется модуль dpkt.

Затем происходит наполнение базы данных. Для UDP- и TCP-протоколов записываются src_ip, dst_ip, номер протокола (17 или 6), src_port, dst_port. Для остальных протоколов записываются src_ip, dst_ip, номер протокола (1, 47, 50, 51 и т. д.).

Для UDP запись и проверка происходят только в том случае, если src_port больше, чем dst_port. Так мы проверяем, кто инициировал соединение. А для TCP запись и проверка происходят, если в заголовке есть флаг SYN+ACK. Так мы проверяем, что сессия установилась, и не проверяем другие взаимодействия в рамках этой сессии.

Проверка на уникальность идёт средствами базы данных, что, конечно, не оптимально. Уникальность записи проверяется по полям src_ip, dst_ip, protocol, dst_port. Уникальность srt_port не проверяется, так как он уникальный только в рамках сессии.

#! /usr/bin/python3
#парсим pcap-файл
#скрипт работает, файлы БД и pcap должны лежать в каталоге с ним

import dpkt, socket, sqlite3, os, time

#функция записи в базу данных
def WRDB (file_name):
    f = open(file_name, 'rb')
    pcap = dpkt.pcap.Reader(f)

    connection = sqlite3.connect(bd_name) 
    cursor = connection.cursor()

    for ts, buf in pcap:
        eth = dpkt.ethernet.Ethernet(buf)
        #проверим, что у нас IP
        if isinstance(eth.data, dpkt.ip.IP):
            ip = eth.data
            #протокол на 4 уровне
            protocol = ip.p
            #UDP
            if isinstance(ip.data, dpkt.udp.UDP):
                #print ('UDP')
                udp = ip.data
                src_ip = socket.inet_ntoa(ip.src)
                dst_ip = socket.inet_ntoa(ip.dst)
                src_port = udp.sport
                dst_port = udp.dport
                if int(src_port) < int(dst_port):
                    protocol = 0 
                else:
                    src_ip = socket.inet_ntoa(ip.src)
                    dst_ip = socket.inet_ntoa(ip.dst)
                                   
            #TCP
            elif isinstance(ip.data, dpkt.tcp.TCP):
                tcp = ip.data
                flag = tcp.flags
                if flag == 18: # 18 - SYN+ACK; 22 - SYN; 16 - ACK; 17 - FIN+ACK 
                    #меняем местами src и dst, потому что мы смотрим fin+ack, то есть пакет не от того, кто инициировал сессию
                    src_ip = socket.inet_ntoa(ip.dst)
                    dst_ip = socket.inet_ntoa(ip.src)
                    src_port = tcp.dport
                    dst_port = tcp.sport
                    
                else:
                    protocol = 0 
            #any L4
            else:
                src_ip = socket.inet_ntoa(ip.src)
                dst_ip = socket.inet_ntoa(ip.dst)    
                src_port = ''
                dst_port = ''
            
            if protocol:             
                #тут пишем в БД, проверка на уникальность идет средствами БД, не проверяем source-порт, однако пишем его
                list_keys=[src_ip, dst_ip, protocol, src_port, dst_port]
                tupl = tuple(list_keys)
                cursor.execute("SELECT * FROM LOG WHERE src_ip='{}' and dst_ip='{}' and protocol='{}' and dst_port='{}';".format(src_ip, dst_ip, protocol, dst_port))
                if cursor.fetchone() == None:
                    query = "REPLACE into LOG values (?, ?, ?, ?, ?)"
                    #Пишем в БД обновленные данные 
                    cursor.execute(query,tupl)  
                    
    connection.commit()
    cursor.close()

#оставим или обнулим таблицу, файл с БД должен существовать
bd_name = 'u_log.db'
aaa = input ('Удалить старую и создать новую таблицу в БД (yes/no, default: no): ')
connection = sqlite3.connect(bd_name) 
cursor = connection.cursor()
if aaa == 'yes':
    cursor.execute("DROP table LOG;")
    cursor.execute("CREATE TABLE LOG (src_ip text not NULL,dst_ip text not NULL,\
    protocol text int(11) DEFAULT 0, src_port text int(11) DEFAULT 0, dst_port text int(11) DEFAULT 0);")
    print ("Созданы новые таблицы. Идет наполнение.")
else:
    print ("Производится анализ лог-файла и дозапись данных в таблицы")

connection.commit()
cursor.close()
while True:
    list_files = os.listdir()
    time.sleep(1)
    for fl in list_files:
        if fl.count('finaly'):
            time.sleep(1)
            WRDB (fl)
            print ('Обработан файл: ', fl)
            os.remove(fl)

Для запуска всех скриптов используем tmux или screen.

При желании приведенные скрипты можно оптимизировать. Как вариант — развить идею в техническом плане, например распараллелить обработку, тем самым увеличив быстродействие.

Заключение

Полученные данные нельзя сразу использовать для написания политик: они требуют анализа человеком. Но мы получаем информацию, с которой можно дальше работать, чтобы создавать правила. Прежде всего, нужно проверить данные на предмет легитимности: действительно ли все эти соединения должны присутствовать. Кроме того, их надо сгруппировать и упорядочить.

Анализ трафика и составление матрицы сетевых взаимодействий на подготовительном этапе даёт возможность внедрять файрвол с уже работающими правилами. Если межсетевой экран устанавливается у заказчика до создания политик, то сначала он работает в режиме обычного роутера с логированием и не фильтрует трафик. То есть по факту не защищает инфраструктуру до тех пор, пока правила не будут написаны.

Автор: Павел Пахомов, ведущий инженер отдела внедрения «Solar Интеграция» компании «Ростелеком-Солар»

Комментарии (9)


  1. AlexeyK77
    31.01.2022 11:17

    я бы netflow направил в какую-нибуль лог-менеджмент систему с развитыми инстурментами аналитики и уже там делал бы итоговый анализ. Заодно по реультатам внедрения фаервола в организации и миониторинг появится ;)


    1. olegtsss
      31.01.2022 11:41

      А почему не поставить устройство с wireshark в разрез и разбираться во всем детально?


      1. Mnemonic0
        31.01.2022 11:59

        А что вы увидите в Wireshark для SSL/TLS трафика? Насколько детально это будет выглядеть?

        Ответ. Ничего. Сорс-дестинейшн-порт. А в текущих реалиях нужны апликейшны.


        1. olegtsss
          31.01.2022 12:10

          На самом деле чуть больше. На этапе установления сессии. Представленное решение дает еще меньше информации, как раз ограничиваясь адресами, портами и флагами. И статья совсем не об MiTM, кстати говоря, а о L3 Firewall, т.е. о адресах, портах…


    1. Dzzzen
      31.01.2022 12:40

      Тогда уж лучше nbar


  1. Mnemonic0
    31.01.2022 11:44
    +1

    Да, расскажите чем поможет ваш дамп трафика в работающей штатно системе в которой 3 плеча резервных LAN каналов и 5 WAN. Всё это счастье с OSPF и VRRP. Будете ломать каждый канал и собирать дампы для каждой ситуации?

    Зачастую, фаервол уже есть и миграция идёт на другого вендора. Но в любом случае подход работаем как роутер не правильный.

    Если маленькая компания, то такие методы с дампами не нужны, поскольку админ знает куда и что должно ходить.

    Если средне-большая, где требуется интегратор, то дамп трафика уже не понадобится, поскольку невозможно во всём трафике разобраться. И зачастую новая система устанавливается сбоку, куда зеркалируется трафик и на этом основании делаются правила (точнее они делается из требований безопасности и бизнеса, а применяются они для материализации данных запросов)

    Сугубо побаловаться, да, интересное решение. Но в заголовок "Внедрение межсетевого экрана без боли" не вписывается. По своему опыту, скажу, Внедрение межсетевого экрана - всегда боль.


  1. apkotelnikov
    31.01.2022 15:33

    Солар вроде как позиционируется как компания, в том числе, решающая вопрос ИБ. Очень странно видеть такой подход "опишем как разрешено, все что сейчас ходит", а дальше разберёмся. Во первых не будет никакого дальше, во вторых межсетевой экран предназначен для разграничения ИС с разным уровнем защищённости и все взаимодействия между такими ИС определяется, в первую очередь, матрицей доступа. Не спорю, предложенный способ очень хорошо закрывает задачу "внедрить межсетевой экран и закрыть акты", но совсем не решает задачу по защите ресурсов заказчика.


    1. rk0se
      31.01.2022 16:53
      +2

      Так там же в заключении написано:

      Полученные данные нельзя сразу использовать для написания политик: они
      требуют анализа человеком. Но мы получаем информацию, с которой можно
      дальше работать, чтобы создавать правила. Прежде всего, нужно проверить
      данные на предмет легитимности: действительно ли все эти соединения
      должны присутствовать.

      Иначе можно было напрямую pcap-ы конвертировать в политики и ни каких проблем :)


  1. apkotelnikov
    31.01.2022 21:08

    Единственный верный ответ на вопрос "должны ли эти соединения присутствовать" содержится в матрице доступа. Особенно в случае наличия сервисов потребность в которых нерегулярна.

    Ну получили вы какие то там потоки данных и что? Ответ на вопрос "должны ли такие соединения иметь возможность установиться или нет" эти данные не содержат. Я понимаю - нормальное обследование делать и лень и денег не приносит, только проблемы ибо заказчик, как правило, не очень то жаждет глобальных исследований и не готов платить за это соответвующие деньги. Но и Ваш подход не имеет никакого отношения к ИБ. Так по быстрому влепить фв и гордо заявить "теперь все защищено" :-(:-(:-(