На днях мне пришла в голову мысль, что было бы здорово написать простой Redis-подобный сервер баз данных. Хотя у меня значительный опыт работы с приложениями WSGI, сервер базы данных представил новый вызов и оказался хорошей практикой в процессе обучения работе с сокетами в Python. В этой статье расскажу, что я узнал в процессе исследования.
Цель моего проекта заключалась в том, чтобы написать простой сервер, который я мог бы использовать с очередью задач под названием huey. Huey использует Redis в качестве механизма хранения по умолчанию для отслеживания заданий в очереди, результатов выполнения и других вещей. В статье я сократил исходный код проекта, чтобы обойтись без воды; недостающий код вы можете легко дописать сами, но если интересно, можете взглянуть на конечный результат.
Сервер, который мы будем создавать, сможет отвечать на следующие команды:
GET <key>
SET <key> <value>
DELETE <key>
FLUSH
MGET <key1> ... <keyn>
MSET <key1> <value1> ... <keyn> <valuen>
Также будем поддерживать следующие типы данных:
- Строки и двоичные данные.
- Числа.
- NULL.
- Массивы (могут быть вложенными).
- Словари (могут быть вложенными).
- Сообщения об ошибках.
Чтобы обрабатывать несколько клиентов асинхронно, мы будем использовать gevent, но вы также можете использовать модуль SocketServer стандартной библиотеки с помощью ForkingMixin
или ThreadingMixin
.
Скелет проекта
Давайте создадим скелет для нашего сервера. Нам понадобится сам сервер и функция обратного вызова, которая будет выполняться при подключении нового клиента. Кроме того, понадобится какая-то логика для обработки запроса от клиента и отправки ему ответа.
Начнем:
from gevent import socket
from gevent.pool import Pool
from gevent.server import StreamServer
from collections import namedtuple
from io import BytesIO
from socket import error as socket_error
# We'll use exceptions to notify the connection-handling loop of problems.
class CommandError(Exception): pass
class Disconnect(Exception): pass
Error = namedtuple('Error', ('message',))
class ProtocolHandler(object):
def handle_request(self, socket_file):
# Parse a request from the client into it's component parts.
pass
def write_response(self, socket_file, data):
# Serialize the response data and send it to the client.
pass
class Server(object):
def __init__(self, host='127.0.0.1', port=31337, max_clients=64):
self._pool = Pool(max_clients)
self._server = StreamServer(
(host, port),
self.connection_handler,
spawn=self._pool)
self._protocol = ProtocolHandler()
self._kv = {}
def connection_handler(self, conn, address):
# Convert "conn" (a socket object) into a file-like object.
socket_file = conn.makefile('rwb')
# Process client requests until client disconnects.
while True:
try:
data = self._protocol.handle_request(socket_file)
except Disconnect:
break
try:
resp = self.get_response(data)
except CommandError as exc:
resp = Error(exc.args[0])
self._protocol.write_response(socket_file, resp)
def get_response(self, data):
# Here we'll actually unpack the data sent by the client, execute the
# command they specified, and pass back the return value.
pass
def run(self):
self._server.serve_forever()
Код выше, надеюсь, достаточно ясен. Мы разделили задачи так, что обработка протокола находится в собственном классе с двумя общедоступными методами: handle_request
и write_response
. Сам сервер использует обработчик протокола для распаковки клиентских запросов и повторения ответов сервера на клиентский сервер. Метод get_response()
будет использоваться для выполнения команды, инициированной клиентом.
Более подробно рассматривая код метода connection_handler()
, вы увидите, что мы получаем оболочку вокруг объекта сокета, подобную файлу. Эта оболочка позволяет абстрагироваться от некоторых особенностей, с которыми обычно сталкиваются при работе с чистыми сокетами. Функция входит в бесконечный цикл, считывает запросы от клиента, отправляет ответы и, наконец, выходит из цикла, когда клиент отключается (отмечается методом read()
, который возвращает пустую строку).
Мы используем типизированные исключения для обработки случаев, когда клиенты отключаются, и для уведомления пользователей о командах обработки ошибок. Например, если пользователь отправляет неверно составленный запрос на сервер, мы выбрасываем CommandError
, который сериализуется в ответ на ошибку и отправляется клиенту.
Прежде чем идти дальше, давайте обсудим, как будет взаимодействовать клиент и сервер.
Проводной протокол связи
Первой задачей, с которой я столкнулся, было то, как обрабатывать отправку двоичных данных по проводному протоколу связи. Большинство примеров, которые я нашел в Интернете, были бессмысленными эхо-серверами, которые преобразовали сокет в файл-подобный объект и просто вызвали readline()
. Если бы я хотел хранить некоторые pickle-данные или строки с новыми строками, мне нужно было бы иметь какой-то формат сериализации.
Потратив время в попытках придумать что-то подходящее, я решил прочитать документацию по протоколу Redis, которая оказалась очень простой в реализации и имеет дополнительное преимущество в поддержке нескольких разных типов данных.
В протоколе Redis используется клиентский шаблон связи «запрос/ответ». Ответы с сервера будут использовать первый байт, чтобы указать тип данных, а затем данные, завершенные возвратом каретки/строки.
Тип данных | Префикс | Структура | Пример |
---|---|---|---|
Простая строка | + | +{string data}\r\n | +this is a simple string\r\n |
Ошибка | – | -{error message}\r\n | -ERR unknown command "FLUHS"\r\n |
Целое | : | :{the number}\r\n | :1337\r\n |
Двоичные данные | $ | ${number of bytes}\r\n{data}\r\n | $6\r\n foobar\r\n |
Массив | * | *{number of elements}\r\n{0 or more of above}\r\n | *3\r\n +a simple string element\r\n :12345\r\n $7\r\n testing\r\n |
Словарь | % | %{number of keys}\r\n{0 or more of above}\r\n | %3\r\n +key1\r\n +value1\r\n +key2\r\n *2\r\n +value2-0\r\n +value2-1\r\n :3\r\n $7\r\n testing\r\n |
NULL | $ | $-1\r\n (string of length -1) | $-1\r\n |
Давайте заполним класс обработчика протокола, чтобы он реализовывал протокол Redis.
class ProtocolHandler(object):
def __init__(self):
self.handlers = {
'+': self.handle_simple_string,
'-': self.handle_error,
':': self.handle_integer,
'$': self.handle_string,
'*': self.handle_array,
'%': self.handle_dict}
def handle_request(self, socket_file):
first_byte = socket_file.read(1)
if not first_byte:
raise Disconnect()
try:
# Delegate to the appropriate handler based on the first byte.
return self.handlers[first_byte](socket_file)
except KeyError:
raise CommandError('bad request')
def handle_simple_string(self, socket_file):
return socket_file.readline().rstrip('\r\n')
def handle_error(self, socket_file):
return Error(socket_file.readline().rstrip('\r\n'))
def handle_integer(self, socket_file):
return int(socket_file.readline().rstrip('\r\n'))
def handle_string(self, socket_file):
# First read the length ($<length>\r\n).
length = int(socket_file.readline().rstrip('\r\n'))
if length == -1:
return None # Special-case for NULLs.
length += 2 # Include the trailing \r\n in count.
return socket_file.read(length)[:-2]
def handle_array(self, socket_file):
num_elements = int(socket_file.readline().rstrip('\r\n'))
return [self.handle_request(socket_file) for _ in range(num_elements)]
def handle_dict(self, socket_file):
num_items = int(socket_file.readline().rstrip('\r\n'))
elements = [self.handle_request(socket_file)
for _ in range(num_items * 2)]
return dict(zip(elements[::2], elements[1::2]))
В части сериализации протокола мы сделаем противоположное: переместим объекты Python в их сериализованные копии!
class ProtocolHandler(object):
# ... above methods omitted ...
def write_response(self, socket_file, data):
buf = BytesIO()
self._write(buf, data)
buf.seek(0)
socket_file.write(buf.getvalue())
socket_file.flush()
def _write(self, buf, data):
if isinstance(data, str):
data = data.encode('utf-8')
if isinstance(data, bytes):
buf.write('$%s\r\n%s\r\n' % (len(data), data))
elif isinstance(data, int):
buf.write(':%s\r\n' % data)
elif isinstance(data, Error):
buf.write('-%s\r\n' % error.message)
elif isinstance(data, (list, tuple)):
buf.write('*%s\r\n' % len(data))
for item in data:
self._write(buf, item)
elif isinstance(data, dict):
buf.write('%%%s\r\n' % len(data))
for key in data:
self._write(buf, key)
self._write(buf, data[key])
elif data is None:
buf.write('$-1\r\n')
else:
raise CommandError('unrecognized type: %s' % type(data))
Дополнительным преимуществом обработки протокола в собственном классе является то, что мы можем повторно использовать методы handle_request
и write_response
для создания клиентской библиотеки.
Реализация комманд
Теперь класс Server
, который мы проектируем, должен иметь метод get_response()
. Команды будут считаться отправленными клиентом как простые строки или массив аргументов команды, поэтому параметр данных, переданный в get_response()
, будет либо строкой байтов, либо списком. Чтобы упростить обработку, если данные окажутся простой строкой, мы преобразуем ее в список путем разделения на пробелы.
Первым аргументом будет имя команды с любыми дополнительными аргументами, принадлежащими указанной команде. Как и при сопоставлении первого байта с обработчиками в ProtocolHandler
, давайте создадим сопоставление команд с функциями обратного вызова в классе Server
:
class Server(object):
def __init__(self, host='127.0.0.1', port=31337, max_clients=64):
self._pool = Pool(max_clients)
self._server = StreamServer(
(host, port),
self.connection_handler,
spawn=self._pool)
self._protocol = ProtocolHandler()
self._kv = {}
self._commands = self.get_commands()
def get_commands(self):
return {
'GET': self.get,
'SET': self.set,
'DELETE': self.delete,
'FLUSH': self.flush,
'MGET': self.mget,
'MSET': self.mset}
def get_response(self, data):
if not isinstance(data, list):
try:
data = data.split()
except:
raise CommandError('Request must be list or simple string.')
if not data:
raise CommandError('Missing command')
command = data[0].upper()
if command not in self._commands:
raise CommandError('Unrecognized command: %s' % command)
return self._commands[command](*data[1:])
Наш сервер почти готов! Нам просто нужно реализовать шесть методов для команд, определенных в методе get_commands()
:
class Server(object):
def get(self, key):
return self._kv.get(key)
def set(self, key, value):
self._kv[key] = value
return 1
def delete(self, key):
if key in self._kv:
del self._kv[key]
return 1
return 0
def flush(self):
kvlen = len(self._kv)
self._kv.clear()
return kvlen
def mget(self, *keys):
return [self._kv.get(key) for key in keys]
def mset(self, *items):
data = zip(items[::2], items[1::2])
for key, value in data:
self._kv[key] = value
return len(data)
Вот он! Теперь наш сервер готов начать обработку запросов. В следующем разделе мы будем использовать клиент для взаимодействия с сервером.
Клиент
Чтобы взаимодействовать с сервером, давайте повторно использовать класс ProtocolHandler
для реализации простого клиента. Клиент будет подключаться к серверу и отправлять команды, закодированные в виде списков. Мы будем повторно использовать как write_response()
, так и логику handle_request()
для запросов на кодирование и обработку ответов сервера соответственно.
class Client(object):
def __init__(self, host='127.0.0.1', port=31337):
self._protocol = ProtocolHandler()
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._socket.connect((host, port))
self._fh = self._socket.makefile('rwb')
def execute(self, *args):
self._protocol.write_response(self._fh, args)
resp = self._protocol.handle_request(self._fh)
if isinstance(resp, Error):
raise CommandError(resp.message)
return resp
С помощью метода execute()
мы можем передать произвольный список параметров, которые будут закодированы как массив и отправлены на сервер. Ответ сервера анализируется и возвращается как Python-объект. Для удобства мы можем написать клиентские методы для отдельных команд:
class Client(object):
# ...
def get(self, key):
return self.execute('GET', key)
def set(self, key, value):
return self.execute('SET', key, value)
def delete(self, key):
return self.execute('DELETE', key)
def flush(self):
return self.execute('FLUSH')
def mget(self, *keys):
return self.execute('MGET', *keys)
def mset(self, *items):
return self.execute('MSET', *items)
Чтобы проверить наш клиент, давайте сконфигурируем Python-скрипт для запуска сервера непосредственно из командной строки:
# Add this to bottom of module:
if __name__ == '__main__':
from gevent import monkey; monkey.patch_all()
Server().run()
Проверка сервера
Чтобы проверить сервер, просто запустите серверный Python-модуль из командной строки. В другом терминале откройте интерпретатор Python и импортируйте класс Client
из модуля сервера. При создании экземпляра клиента будет октрыто соединение, и вы сможете запускать команды!
>>> from server_ex import Client
>>> client = Client()
>>> client.mset('k1', 'v1', 'k2', ['v2-0', 1, 'v2-2'], 'k3', 'v3')
3
>>> client.get('k2')
['v2-0', 1, 'v2-2']
>>> client.mget('k3', 'k1')
['v3', 'v1']
>>> client.delete('k1')
1
>>> client.get('k1')
>>> client.delete('k1')
0
>>> client.set('kx', {'vx': {'vy': 0, 'vz': [1, 2, 3]}})
1
>>> client.get('kx')
{'vx': {'vy': 0, 'vz': [1, 2, 3]}}
>>> client.flush()
2
Код, представленный в этой статье, исключительно для демонстрационных целей. Надеюсь, вам понравилось читать об этом проекте так, как мне понравилось писать о нем. Здесь вы можете найти полную копию исходного кода.
Подумайте над следующим, если вы захотите расширить проект:
- Добавьте больше команд!
- Используйте обработчик протокола для реализации журнала команд в режиме «только добавление».
- Более надежная обработка ошибок.
- Разрешить клиенту закрывать соединение и повторно подключаться.
- Логирование.
- Перепешите код так, чтобы использовать
SocketServer
из стандартной библиотеки иThreadingMixin
.
Автор публикации — Чарльз Лейфер. Перевод — Евгений Зятев.
С оригиналом можно ознакомиться в блоге Чарльза.