Вам необходимо перенести 1000 объектов и 600 правил из excel-таблички в UserGate? Или, наоборот, экспортировать из него все политики в таблицы для дальнейшего документирования и анализа? А может быть вы просто хотите погрузиться в API UserGate, но не понимаете с чего начать? Тогда эта статья для вас!

Всем привет! Меня зовут Данила Лопатин, я системный инженер в К2 Кибербезопасность. Если у вас стоит NGFW от UserGate, то вы знаете, что у него есть API-интерфейс, который позволяет автоматизировать многие рутинные процессы с помощью скриптов. В этой статье я расскажу про принципы его работы и приведу примеры написания скриптов. 

Все приведенные в статье скрипты я выполняю с помощью python3.12 на WSL с ОС Ubuntu 24.04 через Visual Studio Code.

Предварительно на WSL были установлены пакеты с библиотеками xlrd и xlwt для взаимодействия с xls файлами следующими командами:

  • $ sudo apt install python3-xlrd

  • $ sudo apt install python3-xlwt

Все скрипты продублированы на GitHub.

API в UserGate и принципы взаимодействия с ним

По API вы можете взаимодействовать с UserGate NGFW, MC и LogAn на версиях 6.1.7 и выше, а также на версии 5.0.6. На более старых версиях работоспособность API вендором не гарантируется.

У UserGate есть свой официальный инструмент в открытом доступе на GitHub. В целом он довольно многофункционален: умеет конвертировать и импортировать конфигурацию с большого количества зарубежных NGFW, а также между версиями UserGate. 

Однако он предназначен для работы только с конфигурациями в формате JSON. В случае, если необходима работа с xls, csv и т.д., он не подойдёт и придется разбираться своими силами. Тем не менее, выложенный на GitHub код часто можно переиспользовать в собственных скриптах или смотреть на него как на пример использования интересующего метода.

Именно такой случай мы и разберём ниже.

У UserGate есть 2 способа взаимодействия с его API:

  1. Через точку входа http://<IP-адрес>:4040/rpc.

  2. Через точку входа http://<IP-адрес>:4040/web_api.

В данной статье мы рассмотрим первый вариант для UserGate NGFW версии 7.2.0.76784R.

По умолчанию API разрешен только в зоне «Management». Если требуется включить его в других зонах, то необходимо перейти в веб-консоль по специальному URL: https://<IP-адрес UG>:8001/?features=zone-xml-rpc. Для MC и LogAn используется порт 8080.

Далее открыть настройки «Контроль доступа» в разделе «Зоны» и поставить флажок напротив «XML-RPC для управления».

Взаимодействие с API происходит через компоненты и методы из библиотеки xmlrpc. Каждый компонент соответствует своему разделу настроек в UserGate. Например, компонент firewall отвечает за правила межсетевого экранирования (и вместе с этим за инспекцию нешифрованных туннелей).

Все объекты, с которыми взаимодействуют методы, обладают своей структурой, состоящей из полей разных типов. Например, при использовании метода v1.libraries.services.fetch из компонента libraries будет возвращен список объектов, обладающих структурой NetworkServiceInfo, состоящей из полей:

  • id — идентификатор объекта (readonly, тип UID).

  • guid — устаревший идентификатор объекта (readonly, тип UID). Необходим для обратной совместимости с версией 6.1.9 и ниже.

  • name — название сервиса (тип String).

  • description — описание сервиса (тип String).

  • readonly — характеристика, показывающая можно ли изменять объект (readonly, тип Boolean).

  • protocols — перечень используемых протоколов (тип List<NetworkServiceProtocolInfo>).

  • cc — характеристика, показывающая наследуется ли объект с MC (readonly, тип Boolean).

Поля в свою очередь могут быть следующих типов:

  • Integer — целочисленный;

  • String — строковый (в кодировке UTF8);

  • Double — значения с двойной плавающей точкой;

  • Date — дата/время в формате ISO8601: YYYY-MM-DDTHH:MMZ;

  • Boolean — логический;

  • Base64 — двоичные данные, закодированные в Base64;

  • List — массив элементов;

  • UID — числовой идентификатор;

  • Variant — значение смешанного типа, может быть одним из любых описанных выше;

  • Null — специальный тип с единственным значением null.

Поля, помеченные как readonly, игнорируются при записи объектов. Например, все UID доступны только для чтения.

Поля, помеченные как writeonly, исключаются при чтении объектов. Например, пароли доступны только для записи.

Поля, помеченные как createonly, игнорируются при обновлении объекта и принимаются только при создании нового объекта в соответствующем методе.

Важно отметить, что пароли, которые содержатся внутри некоторых объектов и правил не выгружаются никакими методами.

Все структуры и описания их полей приведены здесь.

Закрепим теорию на практике

Экспорт и парсинг данных из правил

Данный скрипт приводится в качестве примера в документации по API UserGate:

#!/usr/bin/env python3

import xmlrpc.client

from pprint import pprint

SERVER = '192.168.84.26'

USER = 'API_Admin'

PASSWORD = 'Sup3rS#cretP@ssword!'

server = xmlrpc.client.ServerProxy('http://' + SERVER + ':4040/rpc', verbose=False)

res = server.v2.core.login(USER, PASSWORD, {})

auth_token = res['auth_token']

fw_rules = server.v1.firewall.rules.list(auth_token, 0, 100, {})

pprint(fw_rules)

server.v2.core.logout(auth_token)

Разберём этот пример

В начале файла объявляется, что это python-скрипт и импортируются все необходимые библиотеки:

#!/usr/bin/env python3

import xmlrpc.client

from pprint import pprint

Далее вписывается IP-адрес интерфейса UserGate, на который назначена зона с разрешением XML-RPC в настройках контроля доступа, а также логин и пароль администратора с правами доступа к API:

SERVER = '192.168.84.26'

USER = 'API_Admin'

PASSWORD = 'Sup3rS#cretP@ssword!'

Рекомендуем выделить для этого отдельную УЗ, т.к. учетные данные передаются в открытом виде по http. Лучшим вариантом будет построить RA VPN от АРМ администратора до UserGate, даже в случае, если вы подключаетесь из внутреннего сегмента сети, выделенного под управление и администрирование СЗИ.

Запись пути до входной точки API в переменную server:

server = xmlrpc.client.ServerProxy('http://' + SERVER + ':4040/rpc', verbose=False)

Авторизация в UserGate с помощью логина и пароля администратора:

res = server.v2.core.login(USER, PASSWORD, {})

В res содержатся следующие данные:

Нам необходимо извлечь данные только из поля auth_token и сохранить их в отдельную переменную:

auth_token = res['auth_token']

По такому же принципу можно обращаться к любым полям внутри объекта и выносить их в отдельные переменные.

Запись списка правил межсетевого экранирования в переменную fw_rules и вывод этой переменной:

fw_rules = server.v1.firewall.rules.list(auth_token, 0, 100, {})

pprint(fw_rules)

Завершение сессии:

server.v2.core.logout(auth_token)

В результате выполнения скрипта получаем выгрузку до 100 правил межсетевого экранирования в следующем виде:

Можно заметить, что в полученной структуре для ключей: dst_ips, dst_zones, src_ips, src_zones, services — в качестве значений используются списки уникальных идентификаторов объектов. В качестве примера добавим следующий код к прошлому скрипту, чтобы обратиться по одному из ID, и посмотрим, к какой конкретно информации он нас адресует: 

service_id = fw_rules['items'][0]['services'][0][1]

service = server.v1.libraries.service.fetch(auth_token, service_id)

pprint(service)

В первой строке обращаемся к первому объекту (правилу) содержащемуся в переменной fw_rules, в правиле переходим в поле services, в котором находится один список с двумя элементами: типом вложенного списка и ID сервиса, который мы копируем в переменную service_id.

Далее с помощью метода v1.libraries.service.fetch по ID сервиса получаем его и записываем в переменную service.

Результат выполнения скрипта:

Далее можно вынести имя сервиса в отдельную переменную, обратившись к полю name в объекте service:

service_name = service['name']

Таким способом можно парсить все объекты и формировать внушительные таблицы, которые в дальнейшем можно использовать для оптимизации существующих политик безопасности и библиотек объектов. Но перед этим необходимо разобраться в работе циклов. 

Наиболее удобным методом перебора объектов и полей является цикл вида:

for <объект> in <переменная>['<ключ>']:

Например, в переменной fw_rules находится словарь со списком правил, которые доступны по ключу ‘items’:

for item in fw_rules['items']:

В этом случае вместо переменной item можно использовать любую другую, но я предпочитаю называть их в соответствии с ключом. 

Часто вам придется обращаться к словарям внутри другого словаря или списка. В нашем примере это будет выглядеть следующим образом:

for item in fw_rules['items']:

for service in item['services']:

В результате будут перечислены все сервисы в каждом правиле.

Теперь можно соединить циклы с методами выше и получить следующий код:

#!/usr/bin/env python3

import xmlrpc.client

from pprint import pprint

SERVER = '192.168.84.26'

USER = 'API_Admin'

PASSWORD = 'Sup3rS#cretP@ssword!'

server = xmlrpc.client.ServerProxy('http://' + SERVER + ':4040/rpc', verbose=False)

res = server.v2.core.login(USER, PASSWORD, {})

auth_token = res['auth_token']

fw_rules = server.v1.firewall.rules.list(auth_token, 0, 100, {})

service_names = []

for item in fw_rules['items']:

    if len(item['services']) >= 1:

        for service in item['services']:

            service_name = server.v1.libraries.service.fetch(auth_token, service[1])['name']

            service_names.append(service_name)

    else:

        service_name = 'Any'

        service_names.append(service_name)  

print(service_names)

server.v2.core.logout(auth_token)

В переменную service_names поочередно добавляются имена сервисов из каждого правила с помощью метода server.v1.libraries.service.fetch(auth_token, service[1])['name']

В результате выполнения скрипта выводится список со всеми используемыми сервисами в каждом правиле:

Важно обратить внимание на следующую строку:

if len(item['services']) >= 1:

С помощью этого условия выбираются только те правила, в которых есть сервисы. Все правила с пустым полем сервисов обрабатываются через оператор else:.

Почему вместо этого не использовать следующую часть кода?

service_names = []

for item in fw_rules['items']:

    for service in item['services']:

        service_name = server.v1.libraries.service.fetch(auth_token, service[1])['name']

        if service_name == '':

            service_name = 'Any'

        service_names.append(service_name)

Дело в том, что если в правиле нет сервисов, то цикл с условием for service in item['services']: не выполняет итерацию, т.к. под ключом ‘services’ нет значений. Подобный нюанс важно учитывать при обработке любых объектов.

Под ключом ‘services’ находится обычный список, потому в нем необходимо обращаться к индексу элемента, а не к ключу. Обычно ID объектов (IP-списков, сервисов и т.д.) находится под индексом 1, однако ID зон выводятся иначе.

Вернемся к коду. Сейчас все сервисы выводятся в списке подряд без группировки по тем правилам, в которых они используются. Изменим условие if len(item['services']) >= 1: на  if len(item['services']) == 1: и elif len(item['services']) > 1:. Таким образом, код примет следующий вид:

#!/usr/bin/env python3

import xmlrpc.client

from pprint import pprint

SERVER = '192.168.84.26'

USER = 'API_Admin'

PASSWORD = 'Sup3rS#cretP@ssword!

server = xmlrpc.client.ServerProxy('http://' + SERVER + ':4040/rpc', verbose=False)

res = server.v2.core.login(USER, PASSWORD, {})

auth_token = res['auth_token']

fw_rules = server.v1.firewall.rules.list(auth_token, 0, 100, {})

service_names = []

for item in fw_rules['items']:

    if len(item['services']) == 1:

        for service in item['services']:

            service_name = server.v1.libraries.service.fetch(auth_token, service[1])['name']

            service_names.append(service_name)

    elif len(item['services']) > 1:

        service_name_arr = []

        for service in item['services']:

            service_name = server.v1.libraries.service.fetch(auth_token, service[1])['name']

            service_name_arr.append(service_name)

        service_names.append(service_name_arr)

    else:

        service_name = 'Any'

        service_names.append(service_name)

print(service_names)

server.v2.core.logout(auth_token)

Результат выполнения скрипта будет следующим:

Теперь сервисы сгруппированы по правилами и количество элементов в списке соответствует количеству правил.

Импортируем несколько дополнительных библиотек: os, sys, xlwt (необходима установка библиотеки через pip3), xlrd (необходима установка библиотеки через pip3). Вместо вывода в терминал через оператор print() будем делать запись сервисов в ячейки таблицы с помощью следующего кода:

#!/usr/bin/env python3

import os

import xmlrpc.client

import xlwt

from xlwt import Workbook

SERVER = '192.168.84.26'

USER = 'API_Admin'

PASSWORD = 'Sup3rS#cretP@ssword!

server = xmlrpc.client.ServerProxy('http://' + SERVER + ':4040/rpc', verbose=False)

res = server.v2.core.login(USER, PASSWORD, {})

auth_token = res['auth_token']

fw_rules = server.v1.firewall.rules.list(auth_token, 0, 100, {})

wb = Workbook()

style_1 = xlwt.XFStyle()

style_1.alignment.wrap = 1

sheet1 = wb.add_sheet('Services')

style_2 = xlwt.easyxf('font: bold 1')

sheet1.write(0,0,'Services',style_2)

col = 1

service_names = []

for item in fw_rules['items']:

    if len(item['services']) == 1:

        for service in item['services']:

            sheet1.write(col,0,server.v1.libraries.service.fetch(auth_token, service[1])['name'],style_1)

        col += 1

    elif len(item['services']) > 1:

        service_name_arr = []

        for service in item['services']:

            service_name = server.v1.libraries.service.fetch(auth_token, service[1])['name'] + '\n'

            service_name_arr.append(service_name)

        service_name_arr[-1] = service_name_arr[-1].replace('\n', '')

        sheet1.write(col,0,service_name_arr,style_1)

        col+=1

    else:

        service_name = 'Any'

        sheet1.write(col,0,service_name,style_1)

        col+=1

file_path = os.path.join('Абсолютный', 'путь', 'до', 'создаваемого', 'файла.xls')

wb.save(file_path)

server.v2.core.logout(auth_token)

Добавление символа \n используется для переноса строки внутри ячейки.

В результате получаем файл в формате xls с сервисами сгруппированными в ячейках таблицы в соответствии с правилами, в которых они используются:

Аналогичным образом можно обработать остальные столбцы правил межсетевого экранирования и получить их полный перечень для дальнейшей обработки, например, для харденинга политик безопасности.

Экспорт и парсинг данных из библиотеки объектов

Для выгрузки самих сервисов и их содержимого необходимо использовать метод server.v1.libraries.services.list(auth_token, 0, 100, {}, []). После выгрузки всего списка в переменную остается только снова распарсить полученные данные по ячейкам таблицы. Скрипт будет выглядеть следующим образом:

#!/usr/bin/env python3

import os

import xmlrpc.client

import xlwt

from xlwt import Workbook

SERVER = '192.168.84.26'

USER = 'API_Admin'

PASSWORD = 'Sup3rS#cretP@ssword!

server = xmlrpc.client.ServerProxy('http://' + SERVER + ':4040/rpc', verbose=False)

res = server.v2.core.login(USER, PASSWORD, {})

auth_token = res['auth_token']

services = server.v1.libraries.services.list(auth_token, 0, 100, {}, [])

wb = Workbook()

style_1 = xlwt.XFStyle()

style_1.alignment.wrap = 1

sheet1 = wb.add_sheet('Services')

style_2 = xlwt.easyxf('font: bold 1')

sheet1.write(0,0,'Services',style_2)

sheet1.write(0,1,'Ports',style_2)

row = 1

service_names = []

for item in services['items']:

    col = 0

    protocol_arr = []

    sheet1.write(row,col,item['name'],style_1)

    col += 1

    for protocol in item['protocols']:

        protocol_type = protocol['proto'] + '/'

        protocol_port = protocol['port'] + '\n'

        protocol_arr.append(protocol_type + protocol_port)

    protocol_arr[-1] = protocol_arr[-1].replace('\n', '')

    sheet1.write(row,col,protocol_arr,style_1)

    row += 1

file_path = os.path.join('Абсолютный', 'путь', 'до', 'создаваемого', 'файла.xls')

wb.save(file_path)

server.v2.core.logout(auth_token)

В результате выполнения скрипта получится файл со следующим наполнением:

Импорт данных из таблиц

Допустим, после аналитики и харденинга политик были разработаны новые и обновлены прежние объекты и правила. Теперь необходимо внести эти изменения в UserGate. Делать подобное вручную долго и муторно, а также всегда есть шанс ошибиться при вводе очередного правила. Эффективнее и безопаснее сделать для этого еще один скрипт для загрузки объектов или правил, который в дальнейшем можно будет переиспользовать в подобных задачах.

Разберем этот сценарий на примере загрузки в UserGate следующего перечня сервисов:

К списку сервисов добавилось несколько новых объектов (TEST1, TEST2, IPSec), а также изменились некоторые старые (RDP, POP3, HTTPS, HTTP).

Для переноса объектов воспользуемся скриптом ниже и далее разберем важные части:

#!/usr/bin/env python3

import os

import xmlrpc.client

import xlrd

SERVER = '192.168.84.26'

USER = 'API_Admin'

PASSWORD = 'Sup3rS#cretP@ssword!

server = xmlrpc.client.ServerProxy('http://' + SERVER + ':4040/rpc', verbose=False)

res = server.v2.core.login(USER, PASSWORD, {})

auth_token = res['auth_token']

dict_services = {}

all_services = server.v1.libraries.services.list(auth_token, 0, 1000, {}, [])

for item in all_services["items"]:

    dict_services.update({item["name"]:item["id"]})

file_path = os.path.join('Абсолютный', 'путь', 'до', 'файла.xls')

book = xlrd.open_workbook(file_path)

sheet1 = book.sheet_by_index(0)

for row in range(sheet1.nrows):

    if row > 0:

        protocols = []

        col = 0

        service_name = sheet1.cell_value(row, col)

        col += 1

        service_dst_ports = sheet1.cell_value(row, col).split('\n')

        service_dst_ports_stripped = [s.strip() for s in service_dst_ports]

        for service_dst_port in service_dst_ports_stripped:

            protocol = {'proto':'','port':'','source_port':''}

            service_dst_port_type = str(service_dst_port)[:service_dst_port.find('/')]

            service_dst_port_numb = str(service_dst_port)[service_dst_port.find('/')+1:]

            protocol['proto'] = service_dst_port_type

            protocol['port'] = service_dst_port_numb

            protocols.append(protocol)

        service_structure = {'name': service_name, 'description': '', 'protocols': protocols}

        try:

            server.v1.libraries.service.add(auth_token, service_structure)

        except xmlrpc.client.Fault as err:

            if err.faultCode == 409:

                server.v1.libraries.service.update(auth_token, dict_services[service_name], service_structure)

server.v2.core.logout(auth_token)

Начало скрипта не отличается от предыдущих, кроме импортированной библиотеки xlrd.

После аутентификации и получения токена первым действием собираются текущие данные о сервисах на UserGate, которые заносятся в словарь dict_services

dict_services = {}

all_services = server.v1.libraries.services.list(auth_token, 0, 1000, {}, [])

for item in all_services["items"]:

    dict_services.update({item["name"]:item["id"]})

В конце он понадобится для обновления существующих объектов.

Далее подгружается файл, в котором находится нужный список с сервисами.

file_path = os.path.join('Абсолютный', 'путь', 'до', 'файла.xls')

book = xlrd.open_workbook(file_path)

sheet1 = book.sheet_by_index(0)

Далее с помощью цикла идет перебор всех строк таблицы:

for row in range(sheet1.nrows): 

Последующим условием if row > 0: пропускается строка с заголовками таблицы.

Через переменную row перебираются строки таблицы, через col – столбцы.

Сначала скрипт вытягивает из первого столбца название сервиса.

service_name = sheet1.cell_value(row, col)

Далее скрипт смотрит на второй столбец и забирает из ячейки весь перечень портов.

В список protocols записываются словари protocol, в которых указываются тип и номер порта.

Предварительно перечень портов разделяется на пары <тип порта>/<номер порта> и записывается в промежуточный список для последующей обработки:

service_dst_ports = sheet1.cell_value(row, col).split('\n')

service_dst_ports_stripped = [s.strip() for s in service_dst_ports]

Далее вложенный цикл перебирает каждую пару <тип порта>/<номер порта> и записывает их в словарь protocol, который последовательно добавляется в список protocols. Когда список protocols полностью сформирован, он и название соответствующего сервиса записываются в структуру:

service_structure = {'name': service_name, 'description': '', 'protocols': protocols}

Получается, что ранее мы парсили эту структуру по ячейкам таблицы, теперь же наоборот поэтапно собрали.

Вспомним выгруженный ранее сервис и убедимся в этом:

Через оператор try: проводится попытка с помощью метода API добавить сервис в библиотеку UserGate.

server.v1.libraries.service.add(auth_token, service_structure)

В случае, если сервис с таким названием уже есть в библиотеке, то метод возвращает ошибку с кодом 409.

С помощью следующей конструкции мы переопределяем обработку этой ошибки: 

except xmlrpc.client.Fault as err:

    if err.faultCode == 409:

После неуспешной попытки добавить новый объект, вместо вывода кода ошибки в терминал, скрипт обновляет уже существующий в библиотеке UserGate следующим методом:

server.v1.libraries.service.update(auth_token, dict_services[service_name], service_structure)

Аналогично можно обработать и другие ошибки так, как это необходимо вам.

Заключение

Таким образом, с помощью данных скриптов мы автоматизируем рутинные задачи, сокращаем трудозатраты и уменьшаем влияние человеческого фактора при переносе политик и объектов в UserGate.

Уже готовые скрипты вы можете объединять между собой, переиспользовать и адаптировать для новых задач.

Всем оптимизации и автоматизации!

Комментарии (0)