Одноранговая сеть или проще P2P сеть — это сеть в которой все пользователи равны и имеют равные права. Отличительная особенность таких сетей от обычных в том, что в ней нет единого сервера, к которому подключаются пользователи, вместо этого они подключаются друг к другу. Существуют гибридные варианты таких сетей, в котором присутствует сервер, выполняющий только координирующую работу.


Сегодня я хочу предложить простой вариант реализации P2P сервера для такой сети на языке python.


Предыстория


На 1-ом курсе обучения в вузе мне преподаватель по программированию предложил написать мне децентрализованный чат. Язык и модули я выбирал сам. Такое предложение меня сразу заинтересовало, тем более я давно хотел начать изучать python, да и мог не посещать её пары. Недолго думая я согласился и принялся к работе.


Самым трудным для меня оказалось написать серверную часть. Если интерфейс я написал практически сразу, на него я потратил порядка 2 дней, то над серверной частью мне пришлось подзадуматься. Думал я неделю, но зато на следующий день за пару часов написал рабочий P2P сервер для своего чата.


Сервер


Исходный код всего сервера расположен ниже.


Импорты
import socket
import rsa
from threading import Thread
from time import sleep
import datetime

Сервер
# P2P сервер
class P2P:
    def __init__(self, _port: int, _max_clients: int = 1):
        # Индикатор работы сервера
        self.running = True
        # Порт сервера
        self.port = _port
        # Максимальное кол-во подключений
        self.max_clients = _max_clients
        # Подключённые пользователи
        self.clients_ip = ["" for i in range(self.max_clients)]
        # Словарь с входящими сообщениями
        self.incoming_requests = {}
        # Логи клиентов
        self.clients_logs = [Log for i in range(self.max_clients)]
        # Клиентские сокеты
        self.client_sockets = [socket.socket() for i in range(self.max_clients)]
        # Таймауты клиентов
        for i in self.client_sockets:
            i.settimeout(0.2)
        # Ключи для шифрования исходящих сообщений
        self.keys = [rsa.key.PublicKey for i in range(self.max_clients)]
        # Ключи для дешифрования входящих сообщений
        self.my_keys = [rsa.key.PrivateKey for i in range(self.max_clients)]
        # Информация загруженности сокетов
        self.socket_busy = [False for i in range(self.max_clients)]
        # Чёрный список
        self.blacklist = ["127.0.0.1"] + Log.read_and_return_list("blacklist.txt")
        # Серверный сокет
        self.server_socket = socket.socket()
        # Таймаут сервера
        self.server_socket.settimeout(0.2)
        # Бинд сервера
        self.server_socket.bind(('localhost', _port))
        self.server_socket.listen(self.max_clients)
        self.log = Log("server.log")
        self.log.save_data("Server initialized")

    # server control

    # Создаёт сессию с этим пользователем
    def create_session(self, _address: str):
        self.log.save_data("Creating session with {}".format(_address))
        ind = self.__get_free_socket()
        if _address in self.blacklist:
            self.log.save_data("{} in blacklist".format(_address))
            return
        if ind is None:
            self.log.save_data("All sockets are busy, can`t connect to {}".format(_address))
            return
        try:
            self.__add_user(_address)
            thread = Thread(target=self.__connect, args=(_address, 1))
            thread.start()
            thread.join(0)
            connection, address = self.server_socket.accept()
            connection.settimeout(0.2)
        except OSError:
            self.log.save_data("Failed to create session with {}".format(_address))
            self.__del_user(_address)
            return
        my_key = rsa.newkeys(512)
        self.raw_send(_address, my_key[0].save_pkcs1())
        key = connection.recv(162).decode()
        self.clients_logs[ind].save_data("from {}: {}".format(_address, key))
        key = rsa.PublicKey.load_pkcs1(key)
        self.__add_keys(_address, key, my_key[1])
        while self.running and self.socket_busy[ind]:
            try:
                data = connection.recv(2048)
            except socket.timeout:
                continue
            except OSError:
                self.close_connection(_address)
                return
            if data:
                data = rsa.decrypt(data, self.my_keys[ind])
                self.__add_request(_address, data)
        try:
            self.close_connection(_address)
        except TypeError or KeyError:
            pass

    # Подключается к пользователю
    def __connect(self, _address: str, *args):
        ind = self.__get_ind_by_address(_address)
        try:
            self.client_sockets[ind].connect((_address, self.port))
            self.socket_busy[ind] = True
            return True
        except OSError:
            return False

    # Перезагружает сокет
    def __reload_socket(self, _ind: int):
        self.client_sockets[_ind].close()
        self.client_sockets[_ind] = socket.socket()
        self.socket_busy[_ind] = False

    # Закрывает соединение
    def close_connection(self, _address: str):
        ind = self.__get_ind_by_address(_address)
        self.__del_key(_address)
        self.__reload_socket(ind)
        self.__del_user(_address)

    # Останавливает сервер
    def kill_server(self):
        self.running = False
        sleep(1)
        self.server_socket.close()
        self.log.kill_log()
        for i in self.client_sockets:
            i.close()
        for i in self.clients_logs:
            try:
                i.kill_log()
            except TypeError:
                pass

    # Отправляет сообщение с шифрованием
    def send(self, _address: str, _message: str):
        ind = self.__get_ind_by_address(_address)
        try:
            self.clients_logs[ind].save_data("to {}: {}".format(_address, _message))
            self.client_sockets[ind].send(rsa.encrypt(_message.encode(), self.keys[ind]))
            self.log.save_data("Send message to {}".format(_address))
        except OSError:
            self.log.save_data("Can`t send message to {}".format(_address))

    # Отправляет сообщение без шифрования
    def raw_send(self, _address: str, _message: bytes):
        ind = self.__get_ind_by_address(_address)
        try:
            self.client_sockets[ind].send(_message)
            self.clients_logs[ind].save_data("to {}: {}".format(_address, _message))
            self.log.save_data("Raw send message to {}".format(_address))
        except OSError:
            self.log.save_data("Raw send to {} Failed".format(_address))
    # add

    # Добавляет пользователя
    def __add_user(self, _address: str):
        ind = self.__get_free_socket()
        self.clients_logs[ind] = Log("{}.log".format(_address))
        self.clients_ip[ind] = _address
        self.incoming_requests[_address] = []
        self.log.save_data("Added user {}".format(_address))

    # Добавляет ключ для шифрования и дешифрования адресу
    def __add_keys(self, _address: str, _key: rsa.key.PublicKey, _my_key: rsa.key.PrivateKey):
        ind = self.__get_ind_by_address(_address)
        try:
            self.keys[ind] = _key
            self.my_keys[ind] = _my_key
        except TypeError:
            return

    # Добавляет входящее сообщение от адреса
    def __add_request(self, _address: str, _message: bytes):
        self.incoming_requests[_address].append(_message.decode())
        self.clients_logs[self.__get_ind_by_address(_address)].save_data("from {}: {}".format(_address, str(_message)))
        self.log.save_data("Get incoming message from {}".format(_address))

    # get

    # Возвращает индекс первого свободного соккета
    # if self.__get_free_socket() is not None: *
    def __get_free_socket(self):
        for i in range(len(self.socket_busy)):
            if not self.socket_busy[i]:
                return i
        return None

    # Возвращает номер индекса, к которому подключён адрес
    def __get_ind_by_address(self, _address: str):
        for i in range(len(self.clients_ip)):
            if self.clients_ip[i] == _address:
                return i
        else:
            return None

    # Возвращает входящее сообщение от адреса
    def get_request(self, _address: str):
        data = self.incoming_requests[_address][0]
        self.incoming_requests[_address] = [self.incoming_requests[_address][i]
                                            for i in range(1, len(self.incoming_requests[_address]))]
        return data

    # check

    # Проверяет наличие входящих сообщения от пользователя
    # if self.check_request(_address): *
    def check_request(self, _address: str):
        return bool(self.incoming_requests.get(_address))

    # return True if you already connected to _address else False
    def check_address(self, _address: str):
        return True if _address in self.clients_ip else False

    # del

    # Удаляет пользователя
    def __del_user(self, _address: str):
        ind = self.__get_ind_by_address(_address)
        self.clients_logs[ind].kill_log()
        self.clients_logs[ind] = Log
        self.clients_ip[ind] = ""
        self.incoming_requests.pop(_address)
        self.log.save_data("Deleted user {}".format(_address))

    # Удаляет пользователя
    def __del_key(self, _address: str):
        ind = self.__get_ind_by_address(_address)
        self.keys[ind] = rsa.key.PublicKey
        self.my_keys[ind] = rsa.key.PrivateKey

    # others

    # Возвращает число подключённых пользователей
    def __len__(self):
        num = 0
        for i in self.clients_ip:
            if i != "":
                num += 1
        return num

    # возвращает Правду если есть хотя бы одно подключение
    def __bool__(self):
        for i in self.clients_ip:
            if i != "":
                return True
        return False

Лог
class Log:
    def __init__(self, _name: str):
        self.name = _name
        try:
            self.file = open(_name, "a")
        except FileNotFoundError:
            self.file = open(_name, "w")
        self.save_data("Log started at " + str(datetime.datetime.now()))
        self.file.close()

    # Сохраняет информацию в файл
    def save_data(self, _data: str):
        self.file = open(self.name, "a")
        self.file.write("{}\n".format(_data))
        self.file.close()

    # Возвращает данные из файла в виде листа
    @staticmethod
    def read_and_return_list(_name: str):
        try:
            file = open(_name, "r")
        except FileNotFoundError:
            return []
        data = file.read()
        return data.split("\n")

    # Останавливает лог
    def kill_log(self):
        self.file = open(self.name, "a")
        self.save_data("Log stopped at {}\n".format(datetime.datetime.now()))
        self.file.close()

А теперь приступим к разбору и объяснению. Все серверные функции мы разделим на условные категории в зависимости от того, что они делают:


  • инициализация
  • add функции
  • del функции
  • check функции
  • get функции
  • server control функции
  • Другие функции

Инициализация


init
    def __init__(self, _port: int, _max_clients: int = 1):
        # Индикатор работы сервера
        self.running = True
        # Порт сервера
        self.port = _port
        # Максимальное кол-во подключений
        self.max_clients = _max_clients
        # Подключённые пользователи
        self.clients_ip = ["" for i in range(self.max_clients)]
        # Словарь с входящими сообщениями
        self.incoming_requests = {}
        # Логи клиентов
        self.clients_logs = [Log for i in range(self.max_clients)]
        # Клиентские сокеты
        self.client_sockets = [socket.socket() for i in range(self.max_clients)]
        # Таймауты клиентов
        for i in self.client_sockets:
            i.settimeout(0.2)
        # Ключи для шифрования исходящих сообщений
        self.keys = [rsa.key.PublicKey for i in range(self.max_clients)]
        # Ключи для дешифрования входящих сообщений
        self.my_keys = [rsa.key.PrivateKey for i in range(self.max_clients)]
        # Информация загруженности сокетов
        self.socket_busy = [False for i in range(self.max_clients)]
        # Чёрный список
        self.blacklist = ["127.0.0.1"] + Log.read_and_return_list("blacklist.txt")
        # Серверный сокет
        self.server_socket = socket.socket()
        # Таймаут сервера
        self.server_socket.settimeout(0.2)
        # Бинд сервера
        self.server_socket.bind(('localhost', _port))
        self.server_socket.listen(self.max_clients)
        self.log = Log("server.log")
        self.log.save_data("Server initialized")

Для инициализации сервера запросим порт, на котором будем запускать сервер и максимальное кол-во подключений, по умолчанию 1. Сам сервер будет хранить такие данные:


  • Индикатор работы
  • Порт
  • Максимальное кол-во соединений

Листы длиной равной максимальному количеству пользователей:


  • Ip подключённых клиентов
  • Клиентские сокеты
  • Ключи для шифрования
  • Ключи для дешифрования
  • Индикатор загруженности сокетов

Также объявим чёрный список адресов, который будет загружаться из файла и постоянно содержать адрес "127.0.0.1" во избежание "двойного подключения" к себе самому ( localhost всё ещё доступен), и словарь, который будет хранить входящие сообщения. И нужно установить серверному сокету максимальное кол-во подключений командой listen().


add функции


Все функции в этой категории будут работать только внутри класса. Неправильное обращение с ними может вызвать неправильную работу сервера.


Функция add_user добавляет указанный адрес в рабочие листы сервера, а также запускает лог диалога с пользователем.


add_user
    def __add_user(self, _address: str):
        ind = self.__get_free_socket()
        self.clients_logs[ind] = Log("{}.log".format(_address))
        self.clients_ip[ind] = _address
        self.incoming_requests[_address] = []
        self.log.save_data("Added user {}".format(_address))

Функция add_keys добавляет ключи для шифрования и дешифрования указанному адресу.


add_keys
    def __add_keys(self, _address: str, _key: rsa.key.PublicKey, _my_key: rsa.key.PrivateKey):
        ind = self.__get_ind_by_address(_address)
        try:
            self.keys[ind] = _key
            self.my_keys[ind] = _my_key
        except TypeError:
            return

И последняя функция add_request добавляет в словарь входящих сообщений сообщение от указанного адреса.


add_request
    def __add_request(self, _address: str, _message: bytes):
        self.incoming_requests[_address].append(_message.decode())
        self.clients_logs[self.__get_ind_by_address(_address)].save_data("from {}: {}".format(_address, str(_message)))
        self.log.save_data("Get incoming message from {}".format(_address))

del функции


Эти функции также как и прошлые работают с внутренними данными сервера. Но они, в отличии от прошлых, удаляют данные а не добавляют.


Функция del_user противоположна функции add_user. Она удаляет всё, что связано с указанным адресом с сервером, а также закрывает лог.


del_user
    def __del_user(self, _address: str):
        ind = self.__get_ind_by_address(_address)
        self.clients_logs[ind].kill_log()
        self.clients_logs[ind] = Log
        self.clients_ip[ind] = ""
        self.incoming_requests.pop(_address)
        self.log.save_data("Deleted user {}".format(_address))

Функция del_key удаляет ключи для шифрования и дешифрования указанного адреса.


del_key
    def __del_key(self, _address: str):
        ind = self.__get_ind_by_address(_address)
        self.keys[ind] = rsa.key.PublicKey
        self.my_keys[ind] = rsa.key.PrivateKey

check функции


Эти функции направлены на получение информации о данных на сервере.


Функция check_request проверяет наличие входящих сообщение от указанного адреса и возвращает его в виде True при наличии или False при отсутствии.


check_request
    def check_request(self, _address: str):
        return bool(self.incoming_requests.get(_address))

Функции check_address проверяет есть ли указанный адрес среди подключённых пользователей или нет и возвращает True, если он есть или False, если его нет.


check_address
    def check_address(self, _address: str):
        return True if _address in self.clients_ip else False

get функции


Функция get_free_socket только для внутренней работы сервера и возвращает индекс свободного соккета, если такие есть, иначе ничего.


get_free_socket
    def __get_free_socket(self):
        for i in range(len(self.socket_busy)):
            if not self.socket_busy[i]:
                return i
        return None

Функция get_ind_by_address тоже только для внутренней работы, она возвращает номер соккета, к которому подключён данный адрес или ничего, если адрес никуда не подключён.


get_ind_by_address
    def __get_ind_by_address(self, _address: str):
        for i in range(len(self.clients_ip)):
            if self.clients_ip[i] == _address:
                return i
        else:
            return None

И последняя функция get_request возвращает первое сообщение от указанного адреса и удаляет его из сервера. Она выкинет ошибку, если сообщений нет вообще.


get_request
    def get_request(self, _address: str):
        data = self.incoming_requests[_address][0]
        self.incoming_requests[_address] = [self.incoming_requests[_address][i]
                                            for i in range(1, len(self.incoming_requests[_address]))]
        return data

server control функции


Это основные функции работы сервера, в них заключается логика работы сервера.


Одна из самых важных функций — create_session — она устанавливает соединение с указанным адресом. Здесь осуществляется проверка наличия адреса в в чёрном списке, загруженность соккетов, осуществляется обмен ключами шифрования при успешном подключении и запускается цикл прослушивания соккета, который получает сообщени я и работает с ними.


create_session
    def create_session(self, _address: str):
        self.log.save_data("Creating session with {}".format(_address))
        ind = self.__get_free_socket()
        if _address in self.blacklist:
            self.log.save_data("{} in blacklist".format(_address))
            return
        if ind is None:
            self.log.save_data("All sockets are busy, can`t connect to {}".format(_address))
            return
        try:
            self.__add_user(_address)
            thread = Thread(target=self.__connect, args=(_address, 1))
            thread.start()
            thread.join(0)
            connection, address = self.server_socket.accept()
            connection.settimeout(0.2)
        except OSError:
            self.log.save_data("Failed to create session with {}".format(_address))
            self.__del_user(_address)
            return
        my_key = rsa.newkeys(512)
        self.raw_send(_address, my_key[0].save_pkcs1())
        key = connection.recv(162).decode()
        self.clients_logs[ind].save_data("from {}: {}".format(_address, key))
        key = rsa.PublicKey.load_pkcs1(key)
        self.__add_keys(_address, key, my_key[1])
        while self.running and self.socket_busy[ind]:
            try:
                data = connection.recv(2048)
            except socket.timeout:
                continue
            except OSError:
                self.close_connection(_address)
                return
            if data:
                data = rsa.decrypt(data, self.my_keys[ind])
                self.__add_request(_address, data)
        try:
            self.close_connection(_address)
        except TypeError or KeyError:
            pass

Функция connect осуществляет подключение к пользователю с указанным адресом и возвращает True при успехе или False при неудаче. Использовать её стоит только внутри сервера.


connect
    def __connect(self, _address: str, *args):
        ind = self.__get_ind_by_address(_address)
        try:
            self.client_sockets[ind].connect((_address, self.port))
            self.socket_busy[ind] = True
            return True
        except OSError:
            return False

Функция close_connection закрывает соединение с указанным адресом.


close_connection
    def close_connection(self, _address: str):
        ind = self.__get_ind_by_address(_address)
        self.__del_key(_address)
        self.__reload_socket(ind)
        self.__del_user(_address)

Функция kill_server полностью выключает сервер.


kill_server
    def kill_server(self):
        self.running = False
        sleep(1)
        self.server_socket.close()
        self.log.kill_log()
        for i in self.client_sockets:
            i.close()
        for i in self.clients_logs:
            try:
                i.kill_log()
            except TypeError:
                pass

И последняя функция reload_socket, предназначенная для использования внутри самого сервера, перезагружает сокет с указанным индексом.


reload_socket
    def __reload_socket(self, _ind: int):
        self.client_sockets[_ind].close()
        self.client_sockets[_ind] = socket.socket()
        self.socket_busy[_ind] = False

Другие функции


Функция bool возвращает True, если есть хоть какое-нибудь подключение, или False, если таких нет.


bool
    def __bool__(self):
        for i in self.clients_ip:
            if i != "":
                return True
        return False

Функция len возвращает количество подключённых к серверу клиентов.


len
    def __len__(self):
        num = 0
        for i in self.clients_ip:
            if i != "":
                num += 1
        return num

Лог


Также стоить написать небольшой лог для сервера, который будет документировать процесс работы сервера и процесс обмена сообщения между пользователями. Стоит сказать, что открытие и закрытие файла при каждой записи необходимо в данном случае. Так как при вылете сервера или программы, где он задействован может потребоваться проверить лог работы сервера и данные сохранятся только в таком случае.


А теперь разберём функции на пальцах, тем более здесь ничего сложного нет.


Для инициализации сервера потребуется только имя файла. Сначала мы попробуем открыть файл на до запись, но если такого файла нет, то создадим его. Сразу же в файл запишем время старта лога.


init
    def __init__(self, _name: str):
        self.name = _name
        try:
            self.file = open(_name, "a")
        except FileNotFoundError:
            self.file = open(_name, "w")
        self.save_data("Log started at " + str(datetime.datetime.now()))
        self.file.close()

Функция save_data сохраняет в файл указанное сообщение.


save_data
    def save_data(self, _data: str):
        self.file = open(self.name, "a")
        self.file.write("{}\n".format(_data))
        self.file.close()

Статическая функция read_and_return_list не требует объекта класса для использования, но требует для своей работы имя файла из которого будет взята вся информация и возвращена в виде листа.


read_and_return_list
    @staticmethod
    def read_and_return_list(_name: str):
        try:
            file = open(_name, "r")
        except FileNotFoundError:
            return []
        data = file.read()
        return data.split("\n")

И последняя функция kill_log записывает в файл время остановки лога и закрывает файл.


kill_log
    def kill_log(self):
        self.file = open(self.name, "a")
        self.save_data("Log stopped at {}\n".format(datetime.datetime.now()))
        self.file.close()

Заключение


Написать сервер для одноранговой сети не сложно, но, с другой стороны, есть куда двигаться. Можно реализовать отправку файлов и изображений, скачивание их по частям сразу от нескольких пользователей. Можно усовершенствовать лог или добавить шифрование на отправку ключей для шифрования и дешифрования.


Если есть какие-либо предложения по этому поводу или варианты улучшения кода буду рад почитать в комментариях.