Всех приветствую!
Послушал обзор курса Мониторинг высоконагруженных систем от OTUS, в котором упоминалось, что используется асинхронный подход в мониторинге и решил реализовать его в установленном по гайду WG. Готового легковесного в интернете ранее не нашел, а вопрос назрел в ввиду оперативного понимания, кто забивает канал, да и логи хотелось бы увидеть о событиях. Если читатель надеется, что тут я опишу изменение исходника, не тратьте время, будем обрабатывать вывод команды wg в консоли Python-ом, который уже есть в ubuntu 20.04 и содержит asincio. Можете по
читать как писался код
Напишем первую функция которая будет создавать корутины с блоками клиентов из вывода `STDOUT` в бесконечном цикле и то ради чего мы это делаем - писать в лог и мониторить. Добавим обработку исключений в процессе создание корутин для остановки по ctrl+c и по ошибкам asyncio плюс паузу 5 минут в цикле по сбору вывода wg. Для мониторинга создадим http сервер, который при задержках можно мгновенно открыть и определить кто забил канал и эту страницу можно опрашивать в панели grafan-ы на будущее.
async def main_loop() -> None:
'''Основной цикл'''
await log_event("Wireguard Logging Service Started")
asyncio.create_task(start_http_server())
while True:
try:
wg_output: str = await read_wg_output()
peer_blocks = re.split(r"\n(?=peer: )", wg_output.strip())
tasks = [process_peer(block) for block in peer_blocks]
if tasks:
await asyncio.gather(*tasks)
await asyncio.sleep(300) # Ожидание перед итерацией 5 минут
except (KeyboardInterrupt, asyncio.CancelledError):
await stop()
break
Здесь определился с названиями функции, отправил первую запись в лог, определил, что захваченное делится на куски начинающиеся с peer и убрал с вывода пустые символы. Собранные куски переберем с помощью list comprehension и обработаем функцией в которой будет вся логика. Результат отправим исполняться стандартной asyncio.gather(), которая соберет корутины из блоков и отправит в цикл asincio исполняться конкурентно.---
Пишем все те функции название которые придумали.
Первая логирование, получаем текст и через контекстный менеджер добавляем в файл.
import aiofiles
import os
from datetime import datetime
async def log_event(message) -> None:
'''Функция логирование события в файл с временной меткой'''
async with aiofiles.open(LOG_FILE, mode='a') as f:
await f.write(f"{datetime.now()} - {message}\n")
Во второй опишем веб сервер, добавив в лог событие запуска
from aiohttp import web
async def start_http_server() -> None:
'''Запуск HTTP сервера для отдачи состояния подключений'''
app = web.Application()
app.router.add_get(handle_uniq_record)
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, http_host, http_port)
await site.start()
await log_event(f"HTTP сервер запущен на http://{http_host}:{http_port}")
Плюс обработчик с сортировкой ставить активных клиентов в начало.
async def handle_uniq_record(request) -> web.Response:
'''Обработка HTTP запроса и возврат текущего состояния подключений'''
sorted_items = sorted(uniq_record.items(), key=lambda x: x[1]['connected'], reverse=True)
uniq_record = dict(sorted_items)
return web.json_response(uniq_record)
Следующей заберем вывод записав ошибки в лог с исключением если вывода не будет.
from asyncio.subprocess import Process
async def read_wg_output() -> str:
'''Чтение вывода команды wg'''
proc: Process = await asyncio.create_subprocess_shell(
"wg",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,)
stdout, stderr = await proc.communicate()
if stderr or not stdout.decode().strip():
await log_event(f"Error running, check wg: {stderr.decode()}")
await stop()
raise RuntimeError("Stopping service due to empty or error output from wg")
return stdout.decode()
В следующей функции основная логика, в которой я изрядно поломал голову исключив пустые события возникающие при каждой итерации. Придумал писать состояния клиентов в словарь и вести объем трафика по ним по принципу лоад оверейдж, 3 значения разделенные временем итерации, 5 минут. Пришлось скрупулезно с оглядкой на статью, что дали мне при обучении в skillbox обработать приходящий вывод и добавить условие проверки в tasks = [process_peer(block) for block in peer_blocks if block.strip()]
функции main_loop().
async def process_peer(peer_block) -> None:
'''Функция основной логики: запись событий подключения/отключения, обновление состояния в uniq_record'''
THRESHOLD = 5 # временной порог за который обсчитывать активность или неактивность
key_map = ['transfer', 'transfer 5m', 'transfer 15m']
pattern = re.compile( r'peer:\s*(?P<peer>\S+)'
r'(?:\s*endpoint:\s*(?P<endpoint>[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+:[0-9]+))?
r'(?:.*?latest handshake:\s*'
r'(?:(?P<days>\d{1,2})\s*days?,\s*)?'
r'(?:(?P<hours>\d{1,2})\s*hours?,\s*)?'
r'(?:(?P<minutes>\d{1,2})\s*minutes?,\s*)?'
r'(?P<seconds>\d{1,2})\s*seconds ago)?'
r'(?:.*?transfer:\s*(?P<transfer>.*))?',
re.IGNORECASE | re.DOTALL
)
match = pattern.search(peer_block)
if match:
peer = match.group('peer')
endpoint = match.group('endpoint') or ''
days = int(match.group('days') or 0)
hours = int(match.group('hours') or 0)
minutes = int(match.group('minutes') or 0)
seconds = int(match.group('seconds') or 0)
transfer = match.group('transfer').strip() if match.group('transfer') else ''
user = await get_user_from_peer(peer)
uniq_record.setdefault(user, {
'connected': False,
'transfer': '',
'transfer 5m': '',
'transfer 15m': '',
'current_index': 0
})
if hours == 0 and seconds and minutes <= THRESHOLD and transfer and uniq_record[user]['connected'] != True:
await log_event(f"User {user} connected from {endpoint}")
uniq_record[user]['connected'] = True
elif (hours > 0 or minutes > THRESHOLD) and uniq_record[user]['connected'] != False:
await log_event(f"User {user} {endpoint} disconnected: {transfer}")
uniq_record[user]['connected'] = False
idx = uniq_record[user]['current_index']
uniq_record[user][key_map[idx]] = transfer #записывает текущее значение объема потока в состояние.
uniq_record[user]['current_index'] = (idx + 1) % 3 #циклично перезаписывает индекс 0->1->2->0 и т.д.
В следующей функции get_user_from_peer(peer) привязываем найденного значения peer к реальному имени клиента, сначала подумал взять имена файлов ключей которые создавал именую именем клиента за основу, но решил, что лучше брать имя папки.
async def get_user_from_peer(peer) -> str:
'''Поиск пользователя по peer в файлах клиентов, возврат имени папки пользователя или unknown'''
CLIENTS_PATH = путь до папки с клиентами
for folder, _, files in os.walk(CLIENTS_PATH):
for file in files:
file_path = os.path.join(folder, file)
try:
async with aiofiles.open(file_path, mode='r') as f:
content = await f.read()
if peer in content:
user_folder = os.path.basename(folder)
return user_folder
except Exception:
continue
return "unknown"
Вложенным циклом перебираем файлы в папке ища peer, при наличии возвращаем имя папки в которой он нашелся.
Так, осталось написать функцию для корректного завершения корутин, кстати отличная статья здесь на хабре про асинхронность в пайтоне, я там нашел команду, asyncio.all_tasks()
, которая собирает все корутины в переменную и в цикле уже завершаю каждую.
async def stop() -> None:
'''Остановка всех задач и логирование остановки сервиса'''
await log_event("Wireguard Logging Service Stopped")
tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
Вроде все, осталось обернуть все в класс используя на полную ООП и создать условия запуска.
if __name__ == "__main__":
monitor = WireguardMonitor()
try:
asyncio.run(monitor.main_loop())
except KeyboardInterrupt:
print("Программа остановлена пользователем.")
в которой создаём объект класса и запускаем цикл asincio.run в которой и будет асинхронно выполняться задачи.---
---
или перейти к результату в котором получились приемлемые логи по событию не связанного со служебным циклом и вывод на http страницу объема проходящего трафика разделенного временем с состоянием активности клиентов.
"wg-external": {
"connected": true,
"transfer": "15.55 GiB received, 3.05 GiB sent",
"transfer 5m": "15.52 GiB received, 3.2 GiB sent",
"transfer 15m": "15.50 GiB received, 3.0 GiB sent",
"current_index": 2
},
"guest": {
"connected": false,
"transfer": "",
"transfer 5m": "",
"transfer 15m": "",
"current_index": 2
},
Весь код находится в репе или
в этом блоке
import asyncio
from asyncio.subprocess import Process
import aiofiles
import re
import os
from datetime import datetime
from aiohttp import web
class WireguardMonitor:
def init(self, threshold=30,
log_file="/var/log/wireguard/wireguard.log",
clients_path="/etc/wireguard/clients") -> None:
'''Инициализация параметров и создание необходимых директорий'''
os.makedirs("/var/log/wireguard", exist_ok=True)
self.log_file: str=log_file
self.clients_path: str=clients_path
self.THRESHOLD: int = threshold
self.LOG_FILE: str = log_file
self.CLIENTS_PATH: str = clients_path
self.uniq_record = dict()
self.http_host = '0.0.0.0'
self.http_port = 58105
self.key_map = ['transfer', 'transfer 5m', 'transfer 15m']
self.pattern = re.compile(
r'peer:\s*(?P<peer>\S+)'
r'(?:\s*endpoint:\s*(?P<endpoint>[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+:[0-9]+))?'
r'(?:.*?latest handshake:\s*'
r'(?:(?P<days>\d{1,2})\s*days?,\s*)?'
r'(?:(?P<hours>\d{1,2})\s*hours?,\s*)?'
r'(?:(?P<minutes>\d{1,2})\s*minutes?,\s*)?'
r'(?P<seconds>\d{1,2})\s*seconds ago)?'
r'(?:.*?transfer:\s*(?P<transfer>.*))?',
re.IGNORECASE | re.DOTALL
)
async def stop(self) -> None:
'''Остановка всех задач и логирование остановки сервиса'''
await self.log_event("Wireguard Logging Service stopped")
tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
async def read_wg_output(self) -> str:
'''Чтение вывода команды wg'''
proc: Process = await asyncio.create_subprocess_shell(
"wg",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await proc.communicate()
if stderr or not stdout.decode().strip():
await self.log_event(f"Error running, check wg: {stderr.decode()}")
await self.stop()
raise RuntimeError("Stopping service due to empty or error output from wg")
return stdout.decode()
async def get_user_from_peer(self, peer) -> str:
'''Поиск пользователя по peer в файлах клиентов, возврат имени папки пользователя или "unknown"
peer - публичный ключ клиента взятый из вывода wg
'''
for folder, _, files in os.walk(self.CLIENTS_PATH):
for file in files:
file_path = os.path.join(folder, file)
try:
async with aiofiles.open(file_path, mode='r') as f:
content = await f.read()
if peer in content:
user_folder = os.path.basename(folder)
return user_folder
except Exception:
continue
return "unknown"
async def log_event(self, message) -> None:
'''Функция логирование события в файл с временной меткой
message - текс для записи в файл лога
'''
async with aiofiles.open(self.LOG_FILE, mode='a') as f:
await f.write(f"{datetime.now()} - {message}\n")
async def process_peer(self, peer_block) -> None:
'''Функция основной логики: запись событий подключения/отключения, обновление состояния клиентов в словаре
peer_block - текстовый блок вывода команды wg
'''
match = self.pattern.search(peer_block)
if match:
peer = match.group('peer')
endpoint = match.group('endpoint') or ''
days = int(match.group('days') or 0)
hours = int(match.group('hours') or 0)
minutes = int(match.group('minutes') or 0)
seconds = int(match.group('seconds') or 0)
transfer = match.group('transfer').strip() if match.group('transfer') else ''
user = await self.get_user_from_peer(peer)
self.uniq_record.setdefault(user, {
'connected': False,
'transfer': '',
'transfer 5m': '',
'transfer 15m': '',
'current_index': 0
})
if hours == 0 and seconds and minutes <= self.THRESHOLD and transfer and self.uniq_record[user]['connected'] != True:
await self.log_event(f"User {user} connected from {endpoint}")
self.uniq_record[user]['connected'] = True
elif (hours > 0 or minutes > self.THRESHOLD) and self.uniq_record[user]['connected'] != False:
await self.log_event(f"User {user} {endpoint} disconnected: {transfer}")
self.uniq_record[user]['connected'] = False
idx = self.uniq_record[user]['current_index']
self.uniq_record[user][self.key_map[idx]] = transfer
self.uniq_record[user]['current_index'] = (idx + 1) % 3
async def handle_uniq_record(self, request) -> web.Response:
'''Обработка HTTP запроса и возврат текущего состояния подключений клиентов с сортировкой
активных подключений, ставит на первые позициии.
request - запрос формируемый при открытии или обновления веб страницы на HTTP сервер.
'''
sorted_items = sorted(self.uniq_record.items(), key=lambda x: x[1]['connected'], reverse=True)
self.uniq_record = dict(sorted_items)
return web.json_response(self.uniq_record)
async def start_http_server(self) -> None:
'''Запуск HTTP сервера для отдачи состояния подключений'''
app = web.Application()
app.router.add_get('/uniq_record', self.handle_uniq_record)
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, self.http_host, self.http_port)
await site.start()
await self.log_event(f"HTTP сервер запущен на http://{self.http_host}:{self.http_port}")
async def main_loop(self) -> None:
'''Основной цикл'''
await self.log_event("Wireguard Logging Service Started")
asyncio.create_task(self.start_http_server())
while True:
try:
wg_output: str = await self.read_wg_output()
peer_blocks = re.split(r"\n(?=peer: )", wg_output.strip())
tasks = [self.process_peer(block) for block in peer_blocks if block.strip()]
if tasks:
await asyncio.gather(*tasks)
await asyncio.sleep(300) # Ожидание перед итерацией 5 минут
except (KeyboardInterrupt, asyncio.CancelledError):
await self.stop()
break
if name == "main":
monitor = WireguardMonitor()
try:
asyncio.run(monitor.main_loop())
except KeyboardInterrupt:
print("Программа остановлена пользователем.")