На днях мне пришла в голову мысль, что было бы здорово написать простой Redis-подобный сервер баз данных. Хотя у меня значительный опыт работы с приложениями WSGI, сервер базы данных представил новый вызов и оказался хорошей практикой в процессе обучения работе с сокетами в Python. В этой статье расскажу, что я узнал в процессе исследования.


Цель моего проекта заключалась в том, чтобы написать простой сервер, который я мог бы использовать с очередью задач под названием huey. Huey использует Redis в качестве механизма хранения по умолчанию для отслеживания заданий в очереди, результатов выполнения и других вещей. В статье я сократил исходный код проекта, чтобы обойтись без воды; недостающий код вы можете легко дописать сами, но если интересно, можете взглянуть на конечный результат.


Сервер, который мы будем создавать, сможет отвечать на следующие команды:


  • GET <key>
  • SET <key> <value>
  • DELETE <key>
  • FLUSH
  • MGET <key1> ... <keyn>
  • MSET <key1> <value1> ... <keyn> <valuen>

Также будем поддерживать следующие типы данных:


  • Строки и двоичные данные.
  • Числа.
  • NULL.
  • Массивы (могут быть вложенными).
  • Словари (могут быть вложенными).
  • Сообщения об ошибках.

Чтобы обрабатывать несколько клиентов асинхронно, мы будем использовать gevent, но вы также можете использовать модуль SocketServer стандартной библиотеки с помощью ForkingMixin или ThreadingMixin.


Скелет проекта


Давайте создадим скелет для нашего сервера. Нам понадобится сам сервер и функция обратного вызова, которая будет выполняться при подключении нового клиента. Кроме того, понадобится какая-то логика для обработки запроса от клиента и отправки ему ответа.


Начнем:


from gevent import socket
from gevent.pool import Pool
from gevent.server import StreamServer

from collections import namedtuple
from io import BytesIO
from socket import error as socket_error

# We'll use exceptions to notify the connection-handling loop of problems.
class CommandError(Exception): pass
class Disconnect(Exception): pass

Error = namedtuple('Error', ('message',))

class ProtocolHandler(object):
    def handle_request(self, socket_file):
        # Parse a request from the client into it's component parts.
        pass

    def write_response(self, socket_file, data):
        # Serialize the response data and send it to the client.
        pass

class Server(object):
    def __init__(self, host='127.0.0.1', port=31337, max_clients=64):
        self._pool = Pool(max_clients)
        self._server = StreamServer(
            (host, port),
            self.connection_handler,
            spawn=self._pool)

        self._protocol = ProtocolHandler()
        self._kv = {}

    def connection_handler(self, conn, address):
        # Convert "conn" (a socket object) into a file-like object.
        socket_file = conn.makefile('rwb')

        # Process client requests until client disconnects.
        while True:
            try:
                data = self._protocol.handle_request(socket_file)
            except Disconnect:
                break

            try:
                resp = self.get_response(data)
            except CommandError as exc:
                resp = Error(exc.args[0])

            self._protocol.write_response(socket_file, resp)

    def get_response(self, data):
        # Here we'll actually unpack the data sent by the client, execute the
        # command they specified, and pass back the return value.
        pass

    def run(self):
        self._server.serve_forever()

Код выше, надеюсь, достаточно ясен. Мы разделили задачи так, что обработка протокола находится в собственном классе с двумя общедоступными методами: handle_request и write_response. Сам сервер использует обработчик протокола для распаковки клиентских запросов и повторения ответов сервера на клиентский сервер. Метод get_response() будет использоваться для выполнения команды, инициированной клиентом.


Более подробно рассматривая код метода connection_handler(), вы увидите, что мы получаем оболочку вокруг объекта сокета, подобную файлу. Эта оболочка позволяет абстрагироваться от некоторых особенностей, с которыми обычно сталкиваются при работе с чистыми сокетами. Функция входит в бесконечный цикл, считывает запросы от клиента, отправляет ответы и, наконец, выходит из цикла, когда клиент отключается (отмечается методом read(), который возвращает пустую строку).


Мы используем типизированные исключения для обработки случаев, когда клиенты отключаются, и для уведомления пользователей о командах обработки ошибок. Например, если пользователь отправляет неверно составленный запрос на сервер, мы выбрасываем CommandError, который сериализуется в ответ на ошибку и отправляется клиенту.


Прежде чем идти дальше, давайте обсудим, как будет взаимодействовать клиент и сервер.


Проводной протокол связи


Первой задачей, с которой я столкнулся, было то, как обрабатывать отправку двоичных данных по проводному протоколу связи. Большинство примеров, которые я нашел в Интернете, были бессмысленными эхо-серверами, которые преобразовали сокет в файл-подобный объект и просто вызвали readline(). Если бы я хотел хранить некоторые pickle-данные или строки с новыми строками, мне нужно было бы иметь какой-то формат сериализации.


Потратив время в попытках придумать что-то подходящее, я решил прочитать документацию по протоколу Redis, которая оказалась очень простой в реализации и имеет дополнительное преимущество в поддержке нескольких разных типов данных.


В протоколе Redis используется клиентский шаблон связи «запрос/ответ». Ответы с сервера будут использовать первый байт, чтобы указать тип данных, а затем данные, завершенные возвратом каретки/строки.


Тип данных Префикс Структура Пример
Простая строка + +{string data}\r\n +this is a simple string\r\n
Ошибка -{error message}\r\n -ERR unknown command "FLUHS"\r\n
Целое : :{the number}\r\n :1337\r\n
Двоичные данные $ ${number of bytes}\r\n{data}\r\n $6\r\n
foobar\r\n
Массив * *{number of elements}\r\n{0 or more of above}\r\n *3\r\n
+a simple string element\r\n
:12345\r\n
$7\r\n
testing\r\n
Словарь % %{number of keys}\r\n{0 or more of above}\r\n %3\r\n
+key1\r\n
+value1\r\n
+key2\r\n
*2\r\n
+value2-0\r\n
+value2-1\r\n
:3\r\n
$7\r\n
testing\r\n
NULL $ $-1\r\n (string of length -1) $-1\r\n

Давайте заполним класс обработчика протокола, чтобы он реализовывал протокол Redis.


class ProtocolHandler(object):
    def __init__(self):
        self.handlers = {
            '+': self.handle_simple_string,
            '-': self.handle_error,
            ':': self.handle_integer,
            '$': self.handle_string,
            '*': self.handle_array,
            '%': self.handle_dict}

    def handle_request(self, socket_file):
        first_byte = socket_file.read(1)
        if not first_byte:
            raise Disconnect()

        try:
            # Delegate to the appropriate handler based on the first byte.
            return self.handlers[first_byte](socket_file)
        except KeyError:
            raise CommandError('bad request')

    def handle_simple_string(self, socket_file):
        return socket_file.readline().rstrip('\r\n')

    def handle_error(self, socket_file):
        return Error(socket_file.readline().rstrip('\r\n'))

    def handle_integer(self, socket_file):
        return int(socket_file.readline().rstrip('\r\n'))

    def handle_string(self, socket_file):
        # First read the length ($<length>\r\n).
        length = int(socket_file.readline().rstrip('\r\n'))
        if length == -1:
            return None  # Special-case for NULLs.
        length += 2  # Include the trailing \r\n in count.
        return socket_file.read(length)[:-2]

    def handle_array(self, socket_file):
        num_elements = int(socket_file.readline().rstrip('\r\n'))
        return [self.handle_request(socket_file) for _ in range(num_elements)]

    def handle_dict(self, socket_file):
        num_items = int(socket_file.readline().rstrip('\r\n'))
        elements = [self.handle_request(socket_file)
                    for _ in range(num_items * 2)]
        return dict(zip(elements[::2], elements[1::2]))

В части сериализации протокола мы сделаем противоположное: переместим объекты Python в их сериализованные копии!


class ProtocolHandler(object):
    # ... above methods omitted ...
    def write_response(self, socket_file, data):
        buf = BytesIO()
        self._write(buf, data)
        buf.seek(0)
        socket_file.write(buf.getvalue())
        socket_file.flush()

    def _write(self, buf, data):
        if isinstance(data, str):
            data = data.encode('utf-8')

        if isinstance(data, bytes):
            buf.write('$%s\r\n%s\r\n' % (len(data), data))
        elif isinstance(data, int):
            buf.write(':%s\r\n' % data)
        elif isinstance(data, Error):
            buf.write('-%s\r\n' % error.message)
        elif isinstance(data, (list, tuple)):
            buf.write('*%s\r\n' % len(data))
            for item in data:
                self._write(buf, item)
        elif isinstance(data, dict):
            buf.write('%%%s\r\n' % len(data))
            for key in data:
                self._write(buf, key)
                self._write(buf, data[key])
        elif data is None:
            buf.write('$-1\r\n')
        else:
            raise CommandError('unrecognized type: %s' % type(data))

Дополнительным преимуществом обработки протокола в собственном классе является то, что мы можем повторно использовать методы handle_request и write_response для создания клиентской библиотеки.


Реализация комманд


Теперь класс Server, который мы проектируем, должен иметь метод get_response(). Команды будут считаться отправленными клиентом как простые строки или массив аргументов команды, поэтому параметр данных, переданный в get_response(), будет либо строкой байтов, либо списком. Чтобы упростить обработку, если данные окажутся простой строкой, мы преобразуем ее в список путем разделения на пробелы.


Первым аргументом будет имя команды с любыми дополнительными аргументами, принадлежащими указанной команде. Как и при сопоставлении первого байта с обработчиками в ProtocolHandler, давайте создадим сопоставление команд с функциями обратного вызова в классе Server:


class Server(object):
    def __init__(self, host='127.0.0.1', port=31337, max_clients=64):
        self._pool = Pool(max_clients)
        self._server = StreamServer(
            (host, port),
            self.connection_handler,
            spawn=self._pool)

        self._protocol = ProtocolHandler()
        self._kv = {}

        self._commands = self.get_commands()

    def get_commands(self):
        return {
            'GET': self.get,
            'SET': self.set,
            'DELETE': self.delete,
            'FLUSH': self.flush,
            'MGET': self.mget,
            'MSET': self.mset}

    def get_response(self, data):
        if not isinstance(data, list):
            try:
                data = data.split()
            except:
                raise CommandError('Request must be list or simple string.')

        if not data:
            raise CommandError('Missing command')

        command = data[0].upper()
        if command not in self._commands:
            raise CommandError('Unrecognized command: %s' % command)

        return self._commands[command](*data[1:])

Наш сервер почти готов! Нам просто нужно реализовать шесть методов для команд, определенных в методе get_commands():


class Server(object):
    def get(self, key):
        return self._kv.get(key)

    def set(self, key, value):
        self._kv[key] = value
        return 1

    def delete(self, key):
        if key in self._kv:
            del self._kv[key]
            return 1
        return 0

    def flush(self):
        kvlen = len(self._kv)
        self._kv.clear()
        return kvlen

    def mget(self, *keys):
        return [self._kv.get(key) for key in keys]

    def mset(self, *items):
        data = zip(items[::2], items[1::2])
        for key, value in data:
            self._kv[key] = value
        return len(data)

Вот он! Теперь наш сервер готов начать обработку запросов. В следующем разделе мы будем использовать клиент для взаимодействия с сервером.


Клиент


Чтобы взаимодействовать с сервером, давайте повторно использовать класс ProtocolHandler для реализации простого клиента. Клиент будет подключаться к серверу и отправлять команды, закодированные в виде списков. Мы будем повторно использовать как write_response(), так и логику handle_request() для запросов на кодирование и обработку ответов сервера соответственно.


class Client(object):
    def __init__(self, host='127.0.0.1', port=31337):
        self._protocol = ProtocolHandler()
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._socket.connect((host, port))
        self._fh = self._socket.makefile('rwb')

    def execute(self, *args):
        self._protocol.write_response(self._fh, args)
        resp = self._protocol.handle_request(self._fh)
        if isinstance(resp, Error):
            raise CommandError(resp.message)
        return resp

С помощью метода execute() мы можем передать произвольный список параметров, которые будут закодированы как массив и отправлены на сервер. Ответ сервера анализируется и возвращается как Python-объект. Для удобства мы можем написать клиентские методы для отдельных команд:


class Client(object):
    # ...
    def get(self, key):
        return self.execute('GET', key)

    def set(self, key, value):
        return self.execute('SET', key, value)

    def delete(self, key):
        return self.execute('DELETE', key)

    def flush(self):
        return self.execute('FLUSH')

    def mget(self, *keys):
        return self.execute('MGET', *keys)

    def mset(self, *items):
        return self.execute('MSET', *items)

Чтобы проверить наш клиент, давайте сконфигурируем Python-скрипт для запуска сервера непосредственно из командной строки:


# Add this to bottom of module:
if __name__ == '__main__':
    from gevent import monkey; monkey.patch_all()
    Server().run()

Проверка сервера


Чтобы проверить сервер, просто запустите серверный Python-модуль из командной строки. В другом терминале откройте интерпретатор Python и импортируйте класс Client из модуля сервера. При создании экземпляра клиента будет октрыто соединение, и вы сможете запускать команды!


>>> from server_ex import Client
>>> client = Client()
>>> client.mset('k1', 'v1', 'k2', ['v2-0', 1, 'v2-2'], 'k3', 'v3')
3
>>> client.get('k2')
['v2-0', 1, 'v2-2']
>>> client.mget('k3', 'k1')
['v3', 'v1']
>>> client.delete('k1')
1
>>> client.get('k1')
>>> client.delete('k1')
0
>>> client.set('kx', {'vx': {'vy': 0, 'vz': [1, 2, 3]}})
1
>>> client.get('kx')
{'vx': {'vy': 0, 'vz': [1, 2, 3]}}
>>> client.flush()
2

Код, представленный в этой статье, исключительно для демонстрационных целей. Надеюсь, вам понравилось читать об этом проекте так, как мне понравилось писать о нем. Здесь вы можете найти полную копию исходного кода.


Подумайте над следующим, если вы захотите расширить проект:


  • Добавьте больше команд!
  • Используйте обработчик протокола для реализации журнала команд в режиме «только добавление».
  • Более надежная обработка ошибок.
  • Разрешить клиенту закрывать соединение и повторно подключаться.
  • Логирование.
  • Перепешите код так, чтобы использовать SocketServer из стандартной библиотеки и ThreadingMixin.

Автор публикации — Чарльз Лейфер. Перевод — Евгений Зятев.
С оригиналом можно ознакомиться в блоге Чарльза.

Комментарии (0)