Одноранговая сеть или проще P2P сеть — это сеть в которой все пользователи равны и имеют равные права. Отличительная особенность таких сетей от обычных в том, что в ней нет единого сервера, к которому подключаются пользователи, вместо этого они подключаются друг к другу. Существуют гибридные варианты таких сетей, в котором присутствует сервер, выполняющий только координирующую работу.
Сегодня я хочу предложить простой вариант реализации P2P сервера для такой сети на языке python.
Предыстория
На 1-ом курсе обучения в вузе мне преподаватель по программированию предложил написать мне децентрализованный чат. Язык и модули я выбирал сам. Такое предложение меня сразу заинтересовало, тем более я давно хотел начать изучать python, да и мог не посещать её пары. Недолго думая я согласился и принялся к работе.
Самым трудным для меня оказалось написать серверную часть. Если интерфейс я написал практически сразу, на него я потратил порядка 2 дней, то над серверной частью мне пришлось подзадуматься. Думал я неделю, но зато на следующий день за пару часов написал рабочий P2P сервер для своего чата.
Сервер
Исходный код всего сервера расположен ниже.
import socket
import rsa
from threading import Thread
from time import sleep
import datetime
# P2P сервер
class P2P:
def __init__(self, _port: int, _max_clients: int = 1):
# Индикатор работы сервера
self.running = True
# Порт сервера
self.port = _port
# Максимальное кол-во подключений
self.max_clients = _max_clients
# Подключённые пользователи
self.clients_ip = ["" for i in range(self.max_clients)]
# Словарь с входящими сообщениями
self.incoming_requests = {}
# Логи клиентов
self.clients_logs = [Log for i in range(self.max_clients)]
# Клиентские сокеты
self.client_sockets = [socket.socket() for i in range(self.max_clients)]
# Таймауты клиентов
for i in self.client_sockets:
i.settimeout(0.2)
# Ключи для шифрования исходящих сообщений
self.keys = [rsa.key.PublicKey for i in range(self.max_clients)]
# Ключи для дешифрования входящих сообщений
self.my_keys = [rsa.key.PrivateKey for i in range(self.max_clients)]
# Информация загруженности сокетов
self.socket_busy = [False for i in range(self.max_clients)]
# Чёрный список
self.blacklist = ["127.0.0.1"] + Log.read_and_return_list("blacklist.txt")
# Серверный сокет
self.server_socket = socket.socket()
# Таймаут сервера
self.server_socket.settimeout(0.2)
# Бинд сервера
self.server_socket.bind(('localhost', _port))
self.server_socket.listen(self.max_clients)
self.log = Log("server.log")
self.log.save_data("Server initialized")
# server control
# Создаёт сессию с этим пользователем
def create_session(self, _address: str):
self.log.save_data("Creating session with {}".format(_address))
ind = self.__get_free_socket()
if _address in self.blacklist:
self.log.save_data("{} in blacklist".format(_address))
return
if ind is None:
self.log.save_data("All sockets are busy, can`t connect to {}".format(_address))
return
try:
self.__add_user(_address)
thread = Thread(target=self.__connect, args=(_address, 1))
thread.start()
thread.join(0)
connection, address = self.server_socket.accept()
connection.settimeout(0.2)
except OSError:
self.log.save_data("Failed to create session with {}".format(_address))
self.__del_user(_address)
return
my_key = rsa.newkeys(512)
self.raw_send(_address, my_key[0].save_pkcs1())
key = connection.recv(162).decode()
self.clients_logs[ind].save_data("from {}: {}".format(_address, key))
key = rsa.PublicKey.load_pkcs1(key)
self.__add_keys(_address, key, my_key[1])
while self.running and self.socket_busy[ind]:
try:
data = connection.recv(2048)
except socket.timeout:
continue
except OSError:
self.close_connection(_address)
return
if data:
data = rsa.decrypt(data, self.my_keys[ind])
self.__add_request(_address, data)
try:
self.close_connection(_address)
except TypeError or KeyError:
pass
# Подключается к пользователю
def __connect(self, _address: str, *args):
ind = self.__get_ind_by_address(_address)
try:
self.client_sockets[ind].connect((_address, self.port))
self.socket_busy[ind] = True
return True
except OSError:
return False
# Перезагружает сокет
def __reload_socket(self, _ind: int):
self.client_sockets[_ind].close()
self.client_sockets[_ind] = socket.socket()
self.socket_busy[_ind] = False
# Закрывает соединение
def close_connection(self, _address: str):
ind = self.__get_ind_by_address(_address)
self.__del_key(_address)
self.__reload_socket(ind)
self.__del_user(_address)
# Останавливает сервер
def kill_server(self):
self.running = False
sleep(1)
self.server_socket.close()
self.log.kill_log()
for i in self.client_sockets:
i.close()
for i in self.clients_logs:
try:
i.kill_log()
except TypeError:
pass
# Отправляет сообщение с шифрованием
def send(self, _address: str, _message: str):
ind = self.__get_ind_by_address(_address)
try:
self.clients_logs[ind].save_data("to {}: {}".format(_address, _message))
self.client_sockets[ind].send(rsa.encrypt(_message.encode(), self.keys[ind]))
self.log.save_data("Send message to {}".format(_address))
except OSError:
self.log.save_data("Can`t send message to {}".format(_address))
# Отправляет сообщение без шифрования
def raw_send(self, _address: str, _message: bytes):
ind = self.__get_ind_by_address(_address)
try:
self.client_sockets[ind].send(_message)
self.clients_logs[ind].save_data("to {}: {}".format(_address, _message))
self.log.save_data("Raw send message to {}".format(_address))
except OSError:
self.log.save_data("Raw send to {} Failed".format(_address))
# add
# Добавляет пользователя
def __add_user(self, _address: str):
ind = self.__get_free_socket()
self.clients_logs[ind] = Log("{}.log".format(_address))
self.clients_ip[ind] = _address
self.incoming_requests[_address] = []
self.log.save_data("Added user {}".format(_address))
# Добавляет ключ для шифрования и дешифрования адресу
def __add_keys(self, _address: str, _key: rsa.key.PublicKey, _my_key: rsa.key.PrivateKey):
ind = self.__get_ind_by_address(_address)
try:
self.keys[ind] = _key
self.my_keys[ind] = _my_key
except TypeError:
return
# Добавляет входящее сообщение от адреса
def __add_request(self, _address: str, _message: bytes):
self.incoming_requests[_address].append(_message.decode())
self.clients_logs[self.__get_ind_by_address(_address)].save_data("from {}: {}".format(_address, str(_message)))
self.log.save_data("Get incoming message from {}".format(_address))
# get
# Возвращает индекс первого свободного соккета
# if self.__get_free_socket() is not None: *
def __get_free_socket(self):
for i in range(len(self.socket_busy)):
if not self.socket_busy[i]:
return i
return None
# Возвращает номер индекса, к которому подключён адрес
def __get_ind_by_address(self, _address: str):
for i in range(len(self.clients_ip)):
if self.clients_ip[i] == _address:
return i
else:
return None
# Возвращает входящее сообщение от адреса
def get_request(self, _address: str):
data = self.incoming_requests[_address][0]
self.incoming_requests[_address] = [self.incoming_requests[_address][i]
for i in range(1, len(self.incoming_requests[_address]))]
return data
# check
# Проверяет наличие входящих сообщения от пользователя
# if self.check_request(_address): *
def check_request(self, _address: str):
return bool(self.incoming_requests.get(_address))
# return True if you already connected to _address else False
def check_address(self, _address: str):
return True if _address in self.clients_ip else False
# del
# Удаляет пользователя
def __del_user(self, _address: str):
ind = self.__get_ind_by_address(_address)
self.clients_logs[ind].kill_log()
self.clients_logs[ind] = Log
self.clients_ip[ind] = ""
self.incoming_requests.pop(_address)
self.log.save_data("Deleted user {}".format(_address))
# Удаляет пользователя
def __del_key(self, _address: str):
ind = self.__get_ind_by_address(_address)
self.keys[ind] = rsa.key.PublicKey
self.my_keys[ind] = rsa.key.PrivateKey
# others
# Возвращает число подключённых пользователей
def __len__(self):
num = 0
for i in self.clients_ip:
if i != "":
num += 1
return num
# возвращает Правду если есть хотя бы одно подключение
def __bool__(self):
for i in self.clients_ip:
if i != "":
return True
return False
class Log:
def __init__(self, _name: str):
self.name = _name
try:
self.file = open(_name, "a")
except FileNotFoundError:
self.file = open(_name, "w")
self.save_data("Log started at " + str(datetime.datetime.now()))
self.file.close()
# Сохраняет информацию в файл
def save_data(self, _data: str):
self.file = open(self.name, "a")
self.file.write("{}\n".format(_data))
self.file.close()
# Возвращает данные из файла в виде листа
@staticmethod
def read_and_return_list(_name: str):
try:
file = open(_name, "r")
except FileNotFoundError:
return []
data = file.read()
return data.split("\n")
# Останавливает лог
def kill_log(self):
self.file = open(self.name, "a")
self.save_data("Log stopped at {}\n".format(datetime.datetime.now()))
self.file.close()
А теперь приступим к разбору и объяснению. Все серверные функции мы разделим на условные категории в зависимости от того, что они делают:
- инициализация
- add функции
- del функции
- check функции
- get функции
- server control функции
- Другие функции
Инициализация
def __init__(self, _port: int, _max_clients: int = 1):
# Индикатор работы сервера
self.running = True
# Порт сервера
self.port = _port
# Максимальное кол-во подключений
self.max_clients = _max_clients
# Подключённые пользователи
self.clients_ip = ["" for i in range(self.max_clients)]
# Словарь с входящими сообщениями
self.incoming_requests = {}
# Логи клиентов
self.clients_logs = [Log for i in range(self.max_clients)]
# Клиентские сокеты
self.client_sockets = [socket.socket() for i in range(self.max_clients)]
# Таймауты клиентов
for i in self.client_sockets:
i.settimeout(0.2)
# Ключи для шифрования исходящих сообщений
self.keys = [rsa.key.PublicKey for i in range(self.max_clients)]
# Ключи для дешифрования входящих сообщений
self.my_keys = [rsa.key.PrivateKey for i in range(self.max_clients)]
# Информация загруженности сокетов
self.socket_busy = [False for i in range(self.max_clients)]
# Чёрный список
self.blacklist = ["127.0.0.1"] + Log.read_and_return_list("blacklist.txt")
# Серверный сокет
self.server_socket = socket.socket()
# Таймаут сервера
self.server_socket.settimeout(0.2)
# Бинд сервера
self.server_socket.bind(('localhost', _port))
self.server_socket.listen(self.max_clients)
self.log = Log("server.log")
self.log.save_data("Server initialized")
Для инициализации сервера запросим порт, на котором будем запускать сервер и максимальное кол-во подключений, по умолчанию 1. Сам сервер будет хранить такие данные:
- Индикатор работы
- Порт
- Максимальное кол-во соединений
Листы длиной равной максимальному количеству пользователей:
- Ip подключённых клиентов
- Клиентские сокеты
- Ключи для шифрования
- Ключи для дешифрования
- Индикатор загруженности сокетов
Также объявим чёрный список адресов, который будет загружаться из файла и постоянно содержать адрес "127.0.0.1" во избежание "двойного подключения" к себе самому ( localhost всё ещё доступен), и словарь, который будет хранить входящие сообщения. И нужно установить серверному сокету максимальное кол-во подключений командой listen().
add функции
Все функции в этой категории будут работать только внутри класса. Неправильное обращение с ними может вызвать неправильную работу сервера.
Функция add_user добавляет указанный адрес в рабочие листы сервера, а также запускает лог диалога с пользователем.
def __add_user(self, _address: str):
ind = self.__get_free_socket()
self.clients_logs[ind] = Log("{}.log".format(_address))
self.clients_ip[ind] = _address
self.incoming_requests[_address] = []
self.log.save_data("Added user {}".format(_address))
Функция add_keys добавляет ключи для шифрования и дешифрования указанному адресу.
def __add_keys(self, _address: str, _key: rsa.key.PublicKey, _my_key: rsa.key.PrivateKey):
ind = self.__get_ind_by_address(_address)
try:
self.keys[ind] = _key
self.my_keys[ind] = _my_key
except TypeError:
return
И последняя функция add_request добавляет в словарь входящих сообщений сообщение от указанного адреса.
def __add_request(self, _address: str, _message: bytes):
self.incoming_requests[_address].append(_message.decode())
self.clients_logs[self.__get_ind_by_address(_address)].save_data("from {}: {}".format(_address, str(_message)))
self.log.save_data("Get incoming message from {}".format(_address))
del функции
Эти функции также как и прошлые работают с внутренними данными сервера. Но они, в отличии от прошлых, удаляют данные а не добавляют.
Функция del_user противоположна функции add_user. Она удаляет всё, что связано с указанным адресом с сервером, а также закрывает лог.
def __del_user(self, _address: str):
ind = self.__get_ind_by_address(_address)
self.clients_logs[ind].kill_log()
self.clients_logs[ind] = Log
self.clients_ip[ind] = ""
self.incoming_requests.pop(_address)
self.log.save_data("Deleted user {}".format(_address))
Функция del_key удаляет ключи для шифрования и дешифрования указанного адреса.
def __del_key(self, _address: str):
ind = self.__get_ind_by_address(_address)
self.keys[ind] = rsa.key.PublicKey
self.my_keys[ind] = rsa.key.PrivateKey
check функции
Эти функции направлены на получение информации о данных на сервере.
Функция check_request проверяет наличие входящих сообщение от указанного адреса и возвращает его в виде True при наличии или False при отсутствии.
def check_request(self, _address: str):
return bool(self.incoming_requests.get(_address))
Функции check_address проверяет есть ли указанный адрес среди подключённых пользователей или нет и возвращает True, если он есть или False, если его нет.
def check_address(self, _address: str):
return True if _address in self.clients_ip else False
get функции
Функция get_free_socket только для внутренней работы сервера и возвращает индекс свободного соккета, если такие есть, иначе ничего.
def __get_free_socket(self):
for i in range(len(self.socket_busy)):
if not self.socket_busy[i]:
return i
return None
Функция get_ind_by_address тоже только для внутренней работы, она возвращает номер соккета, к которому подключён данный адрес или ничего, если адрес никуда не подключён.
def __get_ind_by_address(self, _address: str):
for i in range(len(self.clients_ip)):
if self.clients_ip[i] == _address:
return i
else:
return None
И последняя функция get_request возвращает первое сообщение от указанного адреса и удаляет его из сервера. Она выкинет ошибку, если сообщений нет вообще.
def get_request(self, _address: str):
data = self.incoming_requests[_address][0]
self.incoming_requests[_address] = [self.incoming_requests[_address][i]
for i in range(1, len(self.incoming_requests[_address]))]
return data
server control функции
Это основные функции работы сервера, в них заключается логика работы сервера.
Одна из самых важных функций — create_session — она устанавливает соединение с указанным адресом. Здесь осуществляется проверка наличия адреса в в чёрном списке, загруженность соккетов, осуществляется обмен ключами шифрования при успешном подключении и запускается цикл прослушивания соккета, который получает сообщени я и работает с ними.
def create_session(self, _address: str):
self.log.save_data("Creating session with {}".format(_address))
ind = self.__get_free_socket()
if _address in self.blacklist:
self.log.save_data("{} in blacklist".format(_address))
return
if ind is None:
self.log.save_data("All sockets are busy, can`t connect to {}".format(_address))
return
try:
self.__add_user(_address)
thread = Thread(target=self.__connect, args=(_address, 1))
thread.start()
thread.join(0)
connection, address = self.server_socket.accept()
connection.settimeout(0.2)
except OSError:
self.log.save_data("Failed to create session with {}".format(_address))
self.__del_user(_address)
return
my_key = rsa.newkeys(512)
self.raw_send(_address, my_key[0].save_pkcs1())
key = connection.recv(162).decode()
self.clients_logs[ind].save_data("from {}: {}".format(_address, key))
key = rsa.PublicKey.load_pkcs1(key)
self.__add_keys(_address, key, my_key[1])
while self.running and self.socket_busy[ind]:
try:
data = connection.recv(2048)
except socket.timeout:
continue
except OSError:
self.close_connection(_address)
return
if data:
data = rsa.decrypt(data, self.my_keys[ind])
self.__add_request(_address, data)
try:
self.close_connection(_address)
except TypeError or KeyError:
pass
Функция connect осуществляет подключение к пользователю с указанным адресом и возвращает True при успехе или False при неудаче. Использовать её стоит только внутри сервера.
def __connect(self, _address: str, *args):
ind = self.__get_ind_by_address(_address)
try:
self.client_sockets[ind].connect((_address, self.port))
self.socket_busy[ind] = True
return True
except OSError:
return False
Функция close_connection закрывает соединение с указанным адресом.
def close_connection(self, _address: str):
ind = self.__get_ind_by_address(_address)
self.__del_key(_address)
self.__reload_socket(ind)
self.__del_user(_address)
Функция kill_server полностью выключает сервер.
def kill_server(self):
self.running = False
sleep(1)
self.server_socket.close()
self.log.kill_log()
for i in self.client_sockets:
i.close()
for i in self.clients_logs:
try:
i.kill_log()
except TypeError:
pass
И последняя функция reload_socket, предназначенная для использования внутри самого сервера, перезагружает сокет с указанным индексом.
def __reload_socket(self, _ind: int):
self.client_sockets[_ind].close()
self.client_sockets[_ind] = socket.socket()
self.socket_busy[_ind] = False
Другие функции
Функция bool возвращает True, если есть хоть какое-нибудь подключение, или False, если таких нет.
def __bool__(self):
for i in self.clients_ip:
if i != "":
return True
return False
Функция len возвращает количество подключённых к серверу клиентов.
def __len__(self):
num = 0
for i in self.clients_ip:
if i != "":
num += 1
return num
Лог
Также стоить написать небольшой лог для сервера, который будет документировать процесс работы сервера и процесс обмена сообщения между пользователями. Стоит сказать, что открытие и закрытие файла при каждой записи необходимо в данном случае. Так как при вылете сервера или программы, где он задействован может потребоваться проверить лог работы сервера и данные сохранятся только в таком случае.
А теперь разберём функции на пальцах, тем более здесь ничего сложного нет.
Для инициализации сервера потребуется только имя файла. Сначала мы попробуем открыть файл на до запись, но если такого файла нет, то создадим его. Сразу же в файл запишем время старта лога.
def __init__(self, _name: str):
self.name = _name
try:
self.file = open(_name, "a")
except FileNotFoundError:
self.file = open(_name, "w")
self.save_data("Log started at " + str(datetime.datetime.now()))
self.file.close()
Функция save_data сохраняет в файл указанное сообщение.
def save_data(self, _data: str):
self.file = open(self.name, "a")
self.file.write("{}\n".format(_data))
self.file.close()
Статическая функция read_and_return_list не требует объекта класса для использования, но требует для своей работы имя файла из которого будет взята вся информация и возвращена в виде листа.
@staticmethod
def read_and_return_list(_name: str):
try:
file = open(_name, "r")
except FileNotFoundError:
return []
data = file.read()
return data.split("\n")
И последняя функция kill_log записывает в файл время остановки лога и закрывает файл.
def kill_log(self):
self.file = open(self.name, "a")
self.save_data("Log stopped at {}\n".format(datetime.datetime.now()))
self.file.close()
Заключение
Написать сервер для одноранговой сети не сложно, но, с другой стороны, есть куда двигаться. Можно реализовать отправку файлов и изображений, скачивание их по частям сразу от нескольких пользователей. Можно усовершенствовать лог или добавить шифрование на отправку ключей для шифрования и дешифрования.
Если есть какие-либо предложения по этому поводу или варианты улучшения кода буду рад почитать в комментариях.
mrevgenx
Проделана отличная работа, поддерживаю способ обучения через деланье, при наличии хотя бы минимальной теоретической базы. Препод попался, походу, крутой — это хорошо.
Что касается статьи, мне кажется, гораздо ценнее было бы обрисовать общую архитектуру, схему взаимодействия участников сети. Скриншотики работы программы? А также, почему было самым трудным написать серверную часть, в чем возникли самые большие сложности?
Не лучше ли приведенный код положить на GitHub и оставить ссылку? Класс сервера написан, а где сама логика, которая реализует работу сервера? На чем реализован клиент, про него ни слова, а жаль? Могу ли я у себя взять и запустить, придется до этого еще додуматься?
Слова нисколько не дополняют код и не помогают в нем разобраться. Кажется, словесное описание работы функций лучше положить в python-docstring и хранить прямо в репозитории, а статью полностью освободить от этого. Тем более, что код некоторых функций говорит за себя не менее красноречиво, чем любые слова. Как считаете?
По поводу кода из очевидного я бы отметил, что в классе Log функция read_and_return_list не к месту, она больше относится к конфигурации, чем к логгированию. А также, рекомендую обратить внимание на встроенный модуль logging, который умеет очень много чего и явно сюда просится.
player7004 Автор
Спасибо за отклик! В процессе написания сейчас находится статья на тему разработки всего чата, там будет и ссылка на репозиторий github с исходным кодом всего проекта. Функцию read_and_return_list отнёс в класс лог для собственного удобства. Модуль logging обязательно посмотрю.