Когда горишь своим делом!
Когда горишь своим делом!

Приветствую! Меня зовут Евгений, я DevOps в команде сопровождения инфраструктуры. Мои прошлые статьи были посвящены протоколу BGP, но в этот раз я приготовил нечто поинтереснее. В прошлом году мне захотелось расширить своё портфолио навыков в сторону автоматизации задач. Не последнюю роль в этом желании сыграла книга «Автоматизация программируемых сетей», выпущенная известным в IT-кругах издательским домом O'Reilly. В ней примеры скриптов написаны на Python. Позже я в учебном центре прошёл курс повышения квалификации по программе «Python для сетевых инженеров. Автоматизация сетевых задач», а затем продолжил самостоятельно постигать это искусство. Недавно подвернулась интересная задача, о решении которой при помощи Python я вам сегодня и расскажу. Усаживаетесь поудобнее, мы отправляемся.

Как выглядит задача

Был у нас сервер OpenVPN, на котором скриптом собирались логи активности пользователей. Сервер заменили на новый, улучшенный, более безопасный и с несколько другой архитектурой работы. Сначала логи активности пользователей предполагали собирать тем же скриптом, но он наотрез отказался работать в новой инсталляции из-за ряда изменений в конфигурации сервера. Потребовалось новое решение, в пользу которого было и то, что изначальный скрипт формировал данные для отправки в базу InfluxDB, которая также планируется к выводу из эксплуатации. В связи с этим я решил разработать новый механизм, который будет готовить данные для нашей системы мониторинга — Zabbix.

Архитектура решения

В качестве источника данных решили использовать встроенный в OpenVPN механизм журналирования, а при помощи скрипта — формировать в формате JSON данные, которые будет принимать и обрабатывать Zabbix при помощи UserParameter. Преимущество встроенного механизма ещё и в том, что сервер постоянно обновляет информацию в журнале, с которым мы будем работать.

Подготовка сервера

Приведенные далее пути файлов и их названия у вас могут отличаться.

Для активации сбора логов на сервере необходимо в конфигурационный файл, например /etc/openvpn/ovpn.conf, добавить следующие строки:

status /var/log/openvpn/user-openvpn-status.log
status-version 2

Первая строчка включает журналирование статуса подключённых клиентов в инстансе OpenVPN и путь к файлу, в который необходимо записывать данные. Вторая определяет степень насыщения лога заголовками и значениями. Мы получим примерно такое содержимое файла user-openvpn-status.log, часть информации из которого нам необходима для формирования выходных данных:

TITLE,OpenVPN 2.4
TIME,Wed May 24 15:31:01 2023,1684931461
HEADER,CLIENT_LIST,Common Name,Real Address,Virtual Address,Virtual IPv6 Address,Bytes Received,Bytes Sent,Connected Since,Connected Since (time_t),Username,Client ID,Peer ID
CLIENT_LIST,user1,109.173.2.43:58512,192.168.0.100,,13812222,32527491,Tue May 23 23:53:04 2023,1684875184,user1,10,15
CLIENT_LIST,user2,84.252.147.251:58658,192.168.1.110,,1700717,3740537,Wed May 24 15:03:38 2023,1684929818,user2,35,0
CLIENT_LIST,user3,185.35.14.25:61677,192.168.2.120,,106197150,1811955768,Wed May 24 09:44:05 2023,1684910645,user3,45,6
HEADER,ROUTING_TABLE,Virtual Address,Common Name,Real Address,Last Ref,Last Ref (time_t)
ROUTING_TABLE,8a:f5:14:91:08:f3,user1,109.173.2.43:58512,Wed May 24 15:31:00 2023,1684931460
ROUTING_TABLE,c2:74:56:f9:28:4c,user2,185.35.14.25:61677,Wed May 24 15:30:58 2023,1684931458
ROUTING_TABLE,16:f7:3c:4a:1d:ed,user3,84.252.147.251:58658,Wed May 24 15:31:00 2023,1684931460
GLOBAL_STATS,Max bcast/mcast queue length,17
END

Из всего лог-файла я решил использовать значения таких заголовков:

  • Common Name — имя пользователя;

  • Real Address — внешний IP, с которого идёт подключение к серверу;

  • Virtual Address — полученный пользователем в VPN IPv4-адрес;

  • Bytes Received — объём полученного от пользователя трафика в рамках VPN-сессии;

  • Bytes Sent — объём отправленного пользователю трафика в рамках VPN-сессии;

  • Connected Since (time_t) — время подключения к серверу, в формате unix time.

Приступим к написанию скрипта

Начинается скрипт с shebang, в котором мы указываем интерпретатором Python 3, в моём случае это 3.6.8, и набор импортируемых модулей:

#!/usr/bin/python3

import json
import os
import time
import sys
import re
import pygeoip
from datetime import datetime

Для начала нам нужно найти в системе лог-файл, с которым мы будем работать. Зададим переменную с директорией поиска; вспоминаем, куда указали серверу писать журнал, а также переменную с путём до файла базы данных, это понадобится нам потом для реализации GeoIP:

path = "/var/log/openvpn/"
geoip_db_path = "/usr/bin/GeoIP.dat"

У меня на сервере несколько инстансов OpenVPN, и изначально скрипт за один запуск собирал данные из лог-файлов каждого инстанса, но я узнал об имеющемся у нас ограничении на длину передаваемых в Zabbix данных: они не должны содержать более 65536 символов. Решил собирать данные отдельно по каждому инстансу, передавая в скрипт аргумент, который позволил бы выбирать конкретный лог-файл. При этом я сохранил возможность обрабатывать несколько журналов разом, для этого достаточно изменить условие поиска, о котором мы поговорим дальше.

На случай, если скрипт будет импортирован модулем в другой, в самом конце прописываем условие его запуска:

if __name__ == "__main__":
    result = get_log_json(sys.argv[1])
    print(result)

Подробности о том, как работает и зачем нужна конструкция в первой строчке, можно легко найти в интернете. Во второй строчке мы присваиваем переменной result результат вызываемой функции get_log_json, в которую мы передаём первый аргумент при помощи sys.argv[1].

Создадим функцию и механизм поиска лог-файла:

def get_log_json(vpn=""):
    status_files = []
    name_arg = str(vpn) + "-openvpn-status.log"

    for address, dirs, files in os.walk(path):
        for name in files:
            if name_arg in name:
                status_files.append(os.path.join(address, name))

Внутри функции объявляем переменную status_files с пустым листом, в неё будут в цикле добавляться имена найденных файлов, отвечающих требованию содержать в своём названии строчку из name_arg. Полученный список названий файлов нужно будет проитерировать и совершить с каждым из них ряд манипуляций.

Первоначально надо отфильтровать содержимое лог-файла и исключить лишнее, фильтрация организована следующим образом:

for status_log in status_files:
    log_connection = []
    with open(status_log, "r") as file:
        for line in file:
            if "TITLE" in line:
                continue
            elif "TIME" in line:
                continue
            elif "HEADER" in line:
                continue
            elif "ROUTING_TABLE" in line:
                break

            log_connection.append(line)

Почти все дальнейшие действия будут происходить внутри цикла for status_log in status_files.

  1. Итерируем список файлов из переменной status_files.

  2. Объявляем переменную log_connection с пустым листом.

  3. Открываем итерируемый файл и построчно проверяем по набору условий.

  4. Записываем в лист log_connection строчки, которые удовлетворили условиям.

По итогу работы цикла for line in file мы получим следующее содержимое в переменной log_connection:

[
'CLIENT_LIST,user1,109.173.2.43:58512,192.168.0.100,,13812222,32527491,Tue May 23 23:53:04 2023,1684875184,user1,10,15\n',
'CLIENT_LIST,user2,84.252.147.251:58658,192.168.1.110,,1700717,3740537,Wed May 24 15:03:38 2023,1684929818,user2,35,0\n',
'CLIENT_LIST,user3,185.35.14.25:61677,192.168.2.120,,106197150,1811955768,Wed May 24 09:44:05 2023,1684910645,user3,45,6\n'
]

Перед обработкой получившегося листа нужно задать ещё несколько переменных:

nets = {
    "group1": list(range(0, 1)),
    "group2": list(range(1, 3)),
    "group3": list(range(3, 5))
}

geoip_db = pygeoip.GeoIP(geoip_db_path)
parsing_zabbix = []

Я захотел насыщать формируемый JSON информацией, к какому инстансу OpenVPN подключён пользователь. Будет браться значение из октета IP, получаемого пользователем в VPN, и сравниваться на совпадение с диапазоном в словаре nets. С переходом на работу с одним лог-файлом этот механизм немного потерял смысл и проще определять инстанс по передаваемому аргументу или куску названия лог-файла, но я решил оставить как есть.

Также я захотел насыщать его данными о стране происхождения VPN-подключения, для этого создаём переменную geoip_db, которая будет содержать прочитанную модулем pygeoip базу данных. Напоследок нужна ещё одна переменная с пустым листом, в которую мы будем добавлять формируемый словарь с выходными данными.

Переходим к самому интересному. Мы сформировали данные для дальнейшей обработки и теперь можем делать словарь с выходными данными:

for i in log_connection:
    environment = ""
    parsing = i.split(",")
    connect_time = datetime.fromtimestamp(int(parsing[8]))
    octet = int(parsing[3].split(".")[2])
    time_now = int(time.time())
    seconds = time_now - int(parsing[8])
    real_ip = parsing[2].split(":")[0]
    country = geoip_db.country_code_by_addr(real_ip)
    regex = "^[A-Z][A-Z]$"
    pattern = re.compile(regex)

    if pattern.search(country):
        code = country
    else:
        code = "Error"
        
    nets_key_list = nets.keys()
    
    for key in nets_key_list:
        if octet in nets[key]:
            environment = key
            break

    parsing_zabbix_temp = {
        "user": parsing[1],
        "real_ip": real_ip,
        "vpn_ip": parsing[3],
        "env": environment,
        "bytes_r": parsing[5],
        "bytes_s": parsing[6],
        "con_time": str(connect_time),
        "unix_time": parsing[8],
        "duration": seconds,
        "code": country
    }

    parsing_zabbix.append(parsing_zabbix_temp)

Берем ранее подготовленный лист log_connection и проходимся циклом по каждому индексу. Разделяем по запятой взятую из индекса строчку на новый лист и присваиваем переменной parsing, из этой переменной по индексу будут браться значения для дальнейших манипуляций. Для наглядности вот пример содержимого переменной parsing:

[
'CLIENT_LIST',                # номер индекса [0]
'user1',                      # номер индекса [1]
'109.173.2.43:58512',         # номер индекса [2]
'192.168.0.100',              # номер индекса [3]
'',                           # номер индекса [4]
'13812222',                   # номер индекса [5]
'32527491',                   # номер индекса [6]
'Tue May 23 23:53:04 2023',   # номер индекса [7]
'1684875184',                 # номер индекса [8]
'user1',                      # номер индекса [9]
'10',                         # номер индекса [10]
'15\n'                        # номер индекса [11]
]

Объясню по порядку, сверху вниз, как формируется словарь parsing_zabbix_temp:

  1. Переводим время подключения из unix time в timestamp вида 2023-05-23 23:53:04 и сохраняем значение ключа con_time в виде строки. Мне оно понадобится в Grafana, так как с unix time она работать не умеет.

  2. Определяем третий октет у IP 192.168.0.100 с помощью разделения на список по точке и присваивания его второго индекса переменной octet. Полученное значение будет целочисленным.

  3. Определяем текущее время в формате unix time и вычисляем длительность сессии в секундах. Результат присваиваем переменной seconds, а её ключу duration. Так как лог обновляется частно, для меня такой подход приемлем. За точку отсчёта ещё можно взять unix time из второй строчки изначального лога.

  4. Внешний IP-адрес получаем разделением 109.173.2.43:58512 на лист по двоеточию и присваиваем переменной real_ip значение нулевого индекса получившегося листа. Переменную также присваиваем ключу real_ip.

Для получения значений ключей code и env требуются дополнительные пояснения, начнём с code. Эта та самая задача про GeOIP. Изначально я решал задачу отправкой запросов в API специального сайта, получал оттуда ответ, забирал нужное значение и проверял его через регулярное выражение на соответствие двум заглавным буквам, так как каждая страна имеет международный код, состоящий из двух заглавных букв. Если бы API мне не ответил или прислал нечто другое, то переменной было бы присвоено Error.

Такое решение работало, но при опытной эксплуатации выяснилось, что оно очень сильно замедляет скрипт, он стал работать более минуты, что неприемлемо. Мы нашли дамп базы данных, я переписал скрипт с запроса к API сайта на получение кода из базы данных модулем pygeoip, при этом проверку на корректность решил не убирать.

Переходим к вопросу про ключ env. Как я говорил выше, основой служит октет IP-адреса, полученного пользователем в VPN. Создадим список ключей ранее созданного словаря nets и присвоим его переменной nets_key_list. Итерируем список и сравниваем значение переменной octet со значениями выбранного ключа; если совпадений не найдено, то переходим к следующему ключу из списка и сравниваем с его значениями; после совпадения выходим из цикла и присваиваем переменной environment значение итерируемого в данный момент ключа. Пока печатал, понял, что это может быть не очень понятно, потому приведу пример:

octet = 2
nets_key_list = ['group1', 'group2', 'group3']

Во второй итерации key равен 'group2'
nets['group2'] = [0,1,2]
Если octet содержится в nets['group2'], то environment равен key 

Заполняем оставшиеся поля, используя индексы листа parsing, и добавляем полученный словарь в ранее созданный лист parsing_zabbix.

По завершении каждой итерации ранее описанного цикла for i in log_connection полученные данные будут добавляться в общую переменную:

parsing_zabbix_all += parsing_zabbix

Все циклы завершили свою работу, осталось сделать всего пару вещей. Первое: весь полученный объём данных присвоить новому словарю с единственным ключом data, чтобы удовлетворить требованию Zabbix:

parsing_zabbix_add_data = {"data": parsing_zabbix_all}

Второе: перевести содержимое переменной parsing_zabbix_add_data в JSON и вернуть как ответ вызванной функции в переменную result:

return json.dumps(parsing_zabbix_add_data)

Результат

Мы получим такой вывод (для наглядности я его отформатировал):

{
  "data": [
    {
      "user": "user1",
      "real_ip": "109.173.2.43",
      "vpn_ip": "192.168.0.100",
      "env": "group1",
      "bytes_r": "13812222",
      "bytes_s": "32527491",
      "con_time": "2023-05-23 23:53:04",
      "unix_time": "1684875184",
      "duration": 2488510,
      "code": "RU"
    },
    {
      "user": "user2",
      "real_ip": "84.252.147.251",
      "vpn_ip": "192.168.1.110",
      "env": "group2",
      "bytes_r": "1700717",
      "bytes_s": "3740537",
      "con_time": "2023-05-24 15:03:38",
      "unix_time": "1684929818",
      "duration": 2433876,
      "code": "RU"
    },
    {
      "user": "user3",
      "real_ip": "185.35.14.25",
      "vpn_ip": "192.168.2.120",
      "env": "group2",
      "bytes_r": "106197150",
      "bytes_s": "1811955768",
      "con_time": "2023-05-24 09:44:05",
      "unix_time": "1684910645",
      "duration": 2453049,
      "code": "DE"
    }
  ]
}

Под спойлером оставляю скрипт целиком, надеюсь, кому-то идеи из него помогут в решении задач ;)

Нажимать сюда
#!/usr/bin/python3

import json
import os
import time
import sys
import re
import pygeoip
from datetime import datetime

path = "/var/log/openvpn/"
geoip_db_path = "/usr/bin/GeoIP.dat"


def get_log_json(vpn=""):
    status_files = []
    name_arg = str(vpn) + "-openvpn-status.log"

    for address, dirs, files in os.walk(path):
        for name in files:
            if name_arg in name:
                status_files.append(os.path.join(address, name))

    parsing_zabbix_all = []

    for status_log in status_files:
        log_connection = []
        with open(status_log, "r") as file:
            for line in file:
                if "TITLE" in line:
                    continue
                elif "TIME" in line:
                    continue
                elif "HEADER" in line:
                    continue
                elif "ROUTING_TABLE" in line:
                    break

                log_connection.append(line)

        nets = {
            "group1": list(range(0, 1)),
            "group2": list(range(1, 3)),
            "group3": list(range(3, 5))
        }

        geoip_db = pygeoip.GeoIP(geoip_db_path)
        parsing_zabbix = []
        
        for i in log_connection:
            environment = ""
            parsing = i.split(",")
            connect_time = datetime.fromtimestamp(int(parsing[8]))
            octet = int(parsing[3].split(".")[2])
            nets_key_list = nets.keys()
            time_now = int(time.time())
            seconds = time_now - int(parsing[8])
            real_ip = parsing[2].split(":")[0]
            country = geoip_db.country_code_by_addr(real_ip)
            regex = "^[A-Z][A-Z]$"
            pattern = re.compile(regex)

            if pattern.search(country):
                code = country
            else:
                code = "Error"

            for key in nets_key_list:
                if octet in nets[key]:
                    environment = key
                    break

            parsing_zabbix_temp = {
                "user": parsing[1],
                "real_ip": real_ip,
                "vpn_ip": parsing[3],
                "env": environment,
                "bytes_r": parsing[5],
                "bytes_s": parsing[6],
                "con_time": str(connect_time),
                "unix_time": parsing[8],
                "duration": seconds,
                "code": country
            }

            parsing_zabbix.append(parsing_zabbix_temp)
        parsing_zabbix_all += parsing_zabbix

    parsing_zabbix_add_data = {"data": parsing_zabbix_all}
    return json.dumps(parsing_zabbix_add_data)


if __name__ == "__main__":
    result = get_log_json(sys.argv[1])
    print(result)

Всем спасибо за внимание, до новых встреч!

Комментарии (9)


  1. AcckiyGerman
    27.06.2023 10:05
    +2

    Приветствуем в мире Python и на хабре!

    Вот это

        with open(status_log, "r") as file:
            for line in file:
                if "TITLE" in line:
                    continue
                elif "TIME" in line:
                    continue
                elif "HEADER" in line:
                    continue
                elif "ROUTING_TABLE" in line:
                    break
    
                log_connection.append(line)
    

    можно заменить на

        with open(status_log, "r") as file:
            for line in file:
                if "CLIENT_LIST" in line:
                    log_connection.append(line)
    

    При этом я сохранил возможность обрабатывать несколько журналов разом

    Лучше сразу выкидывать код, который "может быть потом пригодится". Не пригодится в 99% случаев.

    Чтение логов и парсинг красивее и экономичнее по памяти делать в один проход, а не двумя циклами. Парсинг одной строки можно вынести в отдельную функцию.

    Что это за duration у вас? Как получатель этой информации, я бы подумал, что это длительность клиентской сессии, но у вас это время прошедшее от подключения клиента до обработки логов:

    ....
        time_now = int(time.time())
        seconds = time_now - int(parsing[8])
    ...
        "unix_time": parsing[8],
        "duration": seconds,
    

    Обращатся к внешней серверам при обработки логов чревато не только долгой работой, но и баном ввиду слишком частых подключений, так что решение выкачать GeoIP совершенно правильное.

    В целом идёте по граблям но в правильном направлении!


    1. EvgenNet Автор
      27.06.2023 10:05

      Спасибо за подсказку с фильтрацией строк исходного лог файла, учту!)
      Про duration вы правильно поняли, вернее использовать временную метку из лог файла. Лог файл обрабатывается каждые несколько минут, потому погрешность не очень большая, но стоит этот момент переделать, спасибо!


      1. AcckiyGerman
        27.06.2023 10:05

        Извините за мою настойчивость, но получается, что Duration это не время, в течении которого пользователь (был) подключен к VPN, а время, через которое вы логи обработали. А если вы логи через год обработали, то получится что пользователь год от VPN не отключался.
        Смысл этой метрики для мониторинга для меня непонятен.
        Узнать о том, как давно клиент подключился?
        Но что если он уже отключился?


  1. maxx_s
    27.06.2023 10:05
    +2

    Скажите, рассматривался ли mgmt интерфейс OpenVPN для сбора статистики? Если да то почему был выбран статус лог файл?


    1. EvgenNet Автор
      27.06.2023 10:05

      Нет, не рассматривался.


  1. vldmrmlkv
    27.06.2023 10:05
    +1

    для больших файлов иногда полезно использовать yield

    def status_log_reader(status_log):
        with open(status_log, "r") as file:
            for line in file:
                yield line
        for status_log in status_files: 
            log_connection = []
            with open(status_log, "r") as file:
                y_file = status_log_reader(file) # status_log_reader
                for line in y_file:
                    if "CLIENT_LIST" in line:
                        log_connection.append(line)


    1. AcckiyGerman
      27.06.2023 10:05

      У вас две ошибки, одна семантическая, а другая логическая.

      Во-первых вы пытаетесь открыть файл дважды, когда передаёте уже открытый файл (объект TextIOWrapper ) в status_log_reader, которая ожидает путь к файлу и выбросит исключение.

      Во вторых, если все строки в конце концов складываются в List, они будут занимать одинаковое количество памяти независимо от того, читаете вы их обычным перебором или Yield.


      1. vldmrmlkv
        27.06.2023 10:05

            for status_log in status_files:
                log_connection = tuple()
                y_file = status_log_reader(status_log)
                for line in y_file:
                    if "CLIENT_LIST" in line:
                        log_connection += (line,)

        А на счёт памяти, когда все строки попадают в List - отдельный случай, т.е. могут и не все строки + можно использовать tuple.


        1. AcckiyGerman
          27.06.2023 10:05
          +1

          Текущая программа автора выглядит в псевдокоде так

          read and filter all lines
          make big json
          print big json
          

          Я предполагаю, что вы советуете испльзовать Yield, для того чтобы сократить потребление памяти?

          Но у вас все отфильтрованные строки также загружаются в память, и неважно, через FOR они загружаются или через YIELD и не так уж важно в List или в Tuple.
          Размер занимаемой памяти в любом случае будет O(n), а точнее
          N * (StringSize + StructSize)
          То есть количество строк умножить на сумму длинны строки и размера элемента структуры, который эту строку содержит.

          Если вы хотите сократить потребление до 1(n) то вместо складывания строк в память, их нужно фильтровать и сразу передавать дальше.

          В псевдокоде такая программа будет выглядеть как

          read line
          filter line
          make json line
          print json line