Всех приветствую!
Послушал обзор курса Мониторинг высоконагруженных систем от OTUS, в котором упоминалось, что используется асинхронный подход в мониторинге и решил реализовать его в установленном по гайду WG. Готового легковесного в интернете ранее не нашел, а вопрос назрел в ввиду оперативного понимания, кто забивает канал, да и логи хотелось бы увидеть о событиях. Если читатель надеется, что тут я опишу изменение исходника, не тратьте время, будем обрабатывать вывод команды wg в консоли Python-ом, который уже есть в ubuntu 20.04 и содержит asincio. Можете по

читать как писался код

Напишем первую функция которая будет создавать корутины с блоками клиентов из вывода `STDOUT` в бесконечном цикле и то ради чего мы это делаем - писать в лог и мониторить. Добавим обработку исключений в процессе создание корутин для остановки по ctrl+c и по ошибкам asyncio плюс паузу 5 минут в цикле по сбору вывода wg. Для мониторинга создадим http сервер, который при задержках можно мгновенно открыть и определить кто забил канал и эту страницу можно опрашивать в панели grafan-ы на будущее.

async def main_loop() -> None:
	'''Основной цикл'''
		await log_event("Wireguard Logging Service Started")
		asyncio.create_task(start_http_server())
		while True:
			try:
				wg_output: str = await read_wg_output()
				peer_blocks = re.split(r"\n(?=peer: )", wg_output.strip())
				tasks = [process_peer(block) for block in peer_blocks]
				if tasks:
					await asyncio.gather(*tasks)
					await asyncio.sleep(300)  # Ожидание перед итерацией 5 минут
			except (KeyboardInterrupt, asyncio.CancelledError):	
				await stop()
				break

Здесь определился с названиями функции, отправил первую запись в лог, определил, что захваченное делится на куски начинающиеся с peer и убрал с вывода пустые символы. Собранные куски переберем с помощью list comprehension и обработаем функцией в которой будет вся логика. Результат отправим исполняться стандартной asyncio.gather(), которая соберет корутины из блоков и отправит в цикл asincio исполняться конкурентно.---

Пишем все те функции название которые придумали.
Первая логирование, получаем текст и через контекстный менеджер добавляем в файл.

import aiofiles
	import os
	from datetime import datetime
	async def log_event(message) -> None:
		'''Функция логирование события в файл с временной меткой'''
		async with aiofiles.open(LOG_FILE, mode='a') as f:	
			await f.write(f"{datetime.now()} - {message}\n")

Во второй опишем веб сервер, добавив в лог событие запуска

from aiohttp import web
 	async def start_http_server() -> None:
	'''Запуск HTTP сервера для отдачи состояния подключений'''
	    app = web.Application()
	    app.router.add_get(handle_uniq_record)
	    runner = web.AppRunner(app)
	    await runner.setup()
	    site = web.TCPSite(runner, http_host, http_port)
 	    await site.start()
	    await log_event(f"HTTP сервер запущен на http://{http_host}:{http_port}")

Плюс обработчик с сортировкой ставить активных клиентов в начало.

async def handle_uniq_record(request) -> web.Response:
	'''Обработка HTTP запроса и возврат текущего состояния подключений'''
		sorted_items = sorted(uniq_record.items(), key=lambda x: x[1]['connected'], reverse=True)
		uniq_record = dict(sorted_items)
		return web.json_response(uniq_record)

Следующей заберем вывод записав ошибки в лог с исключением если вывода не будет.

from asyncio.subprocess import Process
    async def read_wg_output() -> str:
	'''Чтение вывода команды wg'''
	proc: Process = await asyncio.create_subprocess_shell(
		"wg",
		stdout=asyncio.subprocess.PIPE,
		stderr=asyncio.subprocess.PIPE,)
	stdout, stderr = await proc.communicate()
	if stderr or not stdout.decode().strip():
		await log_event(f"Error running, check wg: {stderr.decode()}")
		await stop()
		raise RuntimeError("Stopping service due to empty or error output from wg")
	return stdout.decode()

В следующей функции основная логика, в которой я изрядно поломал голову исключив пустые события возникающие при каждой итерации. Придумал писать состояния клиентов в словарь и вести объем трафика по ним по принципу лоад оверейдж, 3 значения разделенные временем итерации, 5 минут. Пришлось скрупулезно с оглядкой на статью, что дали мне при обучении в skillbox обработать приходящий вывод и добавить условие проверки в tasks = [process_peer(block) for block in peer_blocks if block.strip()] функции main_loop().

async def process_peer(peer_block) -> None:
	'''Функция основной логики: запись событий подключения/отключения, обновление состояния в uniq_record'''
	THRESHOLD = 5 # временной порог за который обсчитывать активность или неактивность
	key_map = ['transfer', 'transfer 5m', 'transfer 15m']
	pattern = re.compile( r'peer:\s*(?P<peer>\S+)'
		r'(?:\s*endpoint:\s*(?P<endpoint>[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+:[0-9]+))?
		r'(?:.*?latest handshake:\s*'
		r'(?:(?P<days>\d{1,2})\s*days?,\s*)?'
		r'(?:(?P<hours>\d{1,2})\s*hours?,\s*)?'
		r'(?:(?P<minutes>\d{1,2})\s*minutes?,\s*)?'
		r'(?P<seconds>\d{1,2})\s*seconds ago)?'
		r'(?:.*?transfer:\s*(?P<transfer>.*))?',
		re.IGNORECASE | re.DOTALL
			)
	match = pattern.search(peer_block)
	if match:
		peer = match.group('peer')
		endpoint = match.group('endpoint') or ''
		days = int(match.group('days') or 0)
		hours = int(match.group('hours') or 0) 
		minutes = int(match.group('minutes') or 0)
		seconds = int(match.group('seconds') or 0)
		transfer = match.group('transfer').strip() if match.group('transfer') else ''
		
	user = await get_user_from_peer(peer)
		uniq_record.setdefault(user, {
		'connected': False,
		'transfer': '',
		'transfer 5m': '',
		'transfer 15m': '',
		'current_index': 0  
		})
	if hours == 0 and seconds and minutes <= THRESHOLD and transfer and uniq_record[user]['connected'] != True:
		await log_event(f"User {user} connected from {endpoint}")
		uniq_record[user]['connected'] = True
	elif (hours > 0 or minutes > THRESHOLD) and uniq_record[user]['connected'] != False:
		await log_event(f"User {user} {endpoint} disconnected: {transfer}")
		uniq_record[user]['connected'] = False
		
	idx = uniq_record[user]['current_index']
	uniq_record[user][key_map[idx]] = transfer #записывает текущее значение объема потока в состояние.
	uniq_record[user]['current_index'] = (idx + 1) % 3 #циклично перезаписывает индекс 0->1->2->0 и т.д. 

В следующей функции get_user_from_peer(peer) привязываем найденного значения peer к реальному имени клиента, сначала подумал взять имена файлов ключей которые создавал именую именем клиента за основу, но решил, что лучше брать имя папки.

async def get_user_from_peer(peer) -> str:
	'''Поиск пользователя по peer в файлах клиентов, возврат имени папки пользователя или unknown'''
	CLIENTS_PATH = путь до папки с клиентами
	for folder, _, files in os.walk(CLIENTS_PATH):
		for file in files:
			file_path = os.path.join(folder, file)
			try:
				async with aiofiles.open(file_path, mode='r') as f:
					content = await f.read()
				if peer in content:
					user_folder = os.path.basename(folder)
					return user_folder
			except Exception:
				continue
	return "unknown"

Вложенным циклом перебираем файлы в папке ища peer, при наличии возвращаем имя папки в которой он нашелся.

Так, осталось написать функцию для корректного завершения корутин, кстати отличная статья здесь на хабре про асинхронность в пайтоне, я там нашел команду, asyncio.all_tasks(), которая собирает все корутины в переменную и в цикле уже завершаю каждую.

async def stop() -> None:
	'''Остановка всех задач и логирование остановки сервиса'''

	await log_event("Wireguard Logging Service Stopped")
	tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
	for task in tasks:
		task.cancel()
	await asyncio.gather(*tasks, return_exceptions=True)

Вроде все, осталось обернуть все в класс используя на полную ООП и создать условия запуска.

if __name__ == "__main__":
	monitor = WireguardMonitor()
	try:
	    asyncio.run(monitor.main_loop())
	except KeyboardInterrupt:
                print("Программа остановлена пользователем.")

в которой создаём объект класса и запускаем цикл asincio.run в которой и будет асинхронно выполняться задачи.---

---

или перейти к результату в котором получились приемлемые логи по событию не связанного со служебным циклом и вывод на http страницу объема проходящего трафика разделенного временем с состоянием активности клиентов.

"wg-external": {
"connected": true,
"transfer": "15.55 GiB received, 3.05 GiB sent",
"transfer 5m": "15.52 GiB received, 3.2 GiB sent",
"transfer 15m": "15.50 GiB received, 3.0 GiB sent",
"current_index": 2
},
"guest": {
"connected": false,
"transfer": "",
"transfer 5m": "",
"transfer 15m": "",
"current_index": 2
},

Весь код находится в репе или

в этом блоке

import asyncio
from asyncio.subprocess import Process
import aiofiles
import re
import os
from datetime import datetime
from aiohttp import web

class WireguardMonitor:
def init(self, threshold=30,
log_file="/var/log/wireguard/wireguard.log",
clients_path="/etc/wireguard/clients") -> None:
'''Инициализация параметров и создание необходимых директорий'''

    os.makedirs("/var/log/wireguard", exist_ok=True)
    self.log_file: str=log_file
    self.clients_path: str=clients_path
    self.THRESHOLD: int = threshold
    self.LOG_FILE: str = log_file
    self.CLIENTS_PATH: str = clients_path
    self.uniq_record = dict()
    self.http_host = '0.0.0.0'
    self.http_port = 58105
    self.key_map = ['transfer', 'transfer 5m', 'transfer 15m']
    self.pattern = re.compile(
        r'peer:\s*(?P<peer>\S+)'
        r'(?:\s*endpoint:\s*(?P<endpoint>[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+:[0-9]+))?'
        r'(?:.*?latest handshake:\s*'
        r'(?:(?P<days>\d{1,2})\s*days?,\s*)?'
        r'(?:(?P<hours>\d{1,2})\s*hours?,\s*)?'
        r'(?:(?P<minutes>\d{1,2})\s*minutes?,\s*)?'
        r'(?P<seconds>\d{1,2})\s*seconds ago)?'
        r'(?:.*?transfer:\s*(?P<transfer>.*))?',
        re.IGNORECASE | re.DOTALL
    )


async def stop(self) -> None:
    '''Остановка всех задач и логирование остановки сервиса'''

    await self.log_event("Wireguard Logging Service stopped")
    tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)


async def read_wg_output(self) -> str:
    '''Чтение вывода команды wg'''

    proc: Process = await asyncio.create_subprocess_shell(
        "wg",
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await proc.communicate()
    if stderr or not stdout.decode().strip():
        await self.log_event(f"Error running, check wg: {stderr.decode()}")
        await self.stop()
        raise RuntimeError("Stopping service due to empty or error output from wg")
    return stdout.decode()


async def get_user_from_peer(self, peer) -> str:
    '''Поиск пользователя по peer в файлах клиентов, возврат имени папки пользователя или "unknown"
    peer - публичный ключ клиента взятый из вывода wg
    '''

    for folder, _, files in os.walk(self.CLIENTS_PATH):
        for file in files:
            file_path = os.path.join(folder, file)
            try:
                async with aiofiles.open(file_path, mode='r') as f:
                    content = await f.read()
                if peer in content:
                    user_folder = os.path.basename(folder)
                    return user_folder
            except Exception:
                continue
    return "unknown"


async def log_event(self, message) -> None:
    '''Функция логирование события в файл с временной меткой
    message - текс для записи в файл лога
    '''

    async with aiofiles.open(self.LOG_FILE, mode='a') as f:
        await f.write(f"{datetime.now()} - {message}\n")


async def process_peer(self, peer_block) -> None:
    '''Функция основной логики: запись событий подключения/отключения, обновление состояния клиентов в словаре
    peer_block - текстовый блок вывода команды wg 
    '''

    match = self.pattern.search(peer_block)
    if match:
        peer = match.group('peer')
        endpoint = match.group('endpoint') or ''
        days = int(match.group('days') or 0)
        hours = int(match.group('hours') or 0)
        minutes = int(match.group('minutes') or 0)
        seconds = int(match.group('seconds') or 0)
        transfer = match.group('transfer').strip() if match.group('transfer') else ''

        user = await self.get_user_from_peer(peer)

        self.uniq_record.setdefault(user, {
                                        'connected': False,
                                        'transfer': '',
                                        'transfer 5m': '',
                                        'transfer 15m': '',
                                        'current_index': 0
                                    })

        if hours == 0 and seconds and minutes <= self.THRESHOLD and transfer and self.uniq_record[user]['connected'] != True:
            await self.log_event(f"User {user} connected from {endpoint}")
            self.uniq_record[user]['connected'] = True
        elif (hours > 0 or minutes > self.THRESHOLD) and self.uniq_record[user]['connected'] != False:
            await self.log_event(f"User {user} {endpoint} disconnected: {transfer}")
            self.uniq_record[user]['connected'] = False

        idx = self.uniq_record[user]['current_index']
        self.uniq_record[user][self.key_map[idx]] = transfer
        self.uniq_record[user]['current_index'] = (idx + 1) % 3


async def handle_uniq_record(self, request) -> web.Response:
    '''Обработка HTTP запроса и возврат текущего состояния подключений клиентов с сортировкой
     активных подключений, ставит на первые позициии.
    request - запрос формируемый при открытии или обновления веб страницы на HTTP сервер.
     '''

    sorted_items = sorted(self.uniq_record.items(), key=lambda x: x[1]['connected'], reverse=True)
    self.uniq_record = dict(sorted_items)
    return web.json_response(self.uniq_record)


async def start_http_server(self) -> None:
    '''Запуск HTTP сервера для отдачи состояния подключений'''

    app = web.Application()
    app.router.add_get('/uniq_record', self.handle_uniq_record)
    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, self.http_host, self.http_port)
    await site.start()
    await self.log_event(f"HTTP сервер запущен на http://{self.http_host}:{self.http_port}")


async def main_loop(self) -> None:
    '''Основной цикл'''

    await self.log_event("Wireguard Logging Service Started")
    asyncio.create_task(self.start_http_server())
    while True:
        try:
            wg_output: str = await self.read_wg_output()
            peer_blocks = re.split(r"\n(?=peer: )", wg_output.strip())
            tasks = [self.process_peer(block) for block in peer_blocks if block.strip()]
            if tasks:
                await asyncio.gather(*tasks)
            await asyncio.sleep(300)  # Ожидание перед итерацией 5 минут
        except (KeyboardInterrupt, asyncio.CancelledError):
            await self.stop()
            break

if name == "main":
monitor = WireguardMonitor()
try:
asyncio.run(monitor.main_loop())
except KeyboardInterrupt:
print("Программа остановлена пользователем.")

Комментарии (0)