Доброго времени суток! Эта статья написана для тех, кто в общих чертах знаком с тем, что такое и для чего используется Apache Kafka, кто такие Producer и Consumer и как они друг с другом работают. Целью этой статьи является показать способ использования библиотеки modern-cpp-kafka
для работы с Apache Kafka на современном C++. В общих чертах с темой можно ознакомится, например, здесь (отсюда же взяты скриншоты), а в этой статье будет рассмотрено решение проблем с владением и (де)сериализацией наиболее простым способом.
Идея написания этого небольшого руководства появилась у меня, когда я начал изучать одну из самых популярных библиотек для работы с Apache Kafka - а именно modern-cpp-kafka
. Она основана в виде оболочки над старой доброй реализацией для C - lirdbkaka
. Версия для C++ подкупила меня тем, что она предоставляет довольно удобный интерфейс для работы, использует современные и актуальные семантики языка вроде RAII и шаблонов, а также, как утверждает создатель, отсутствие оверхеда и самую быструю скорость при работе с данными размерами в пределах 256 B ~ 2 KB.
Однако, как водится, ничто не идеально, а именно - владение данными, сериализация и десериализация, которые реализованы что в librdkafka
, что в modern-cpp-kafka
примерно никак. Для того, чтобы разобраться, что именно не так, необходимо: рассмотреть механизм работы самой Kafka; ознакомиться с реализацией функций отправки и приема в librdkafka
и modern-cpp-kafka
; понять, при чем здесь владение. И уже затем – разработать способ сериализации и десериализации.
В конце статьи я приведу код собственного решения проблемы с владением данными и сериализацией. Решение отчасти банальное и не годится для промышленного использования, но оно хорошо подойдет начинающим пользователям ввиду своей минималистичности и простоты.
Механизм работы с данными в Apache Kafka
Максимально кратко рассмотрим механизм добавления и извлечения данных в Apache Kafka. Данные доставляются, хранятся и извлекаются исключительно в двоичном представлении, что автоматически поднимает вопрос о том, как получить данные в нужном пользователю виде - то есть о сериализации и десериализации.
![Пример добавления сообщения с ключом int, равным 123, и значением string, равным "hello world" Пример добавления сообщения с ключом int, равным 123, и значением string, равным "hello world"](https://habrastorage.org/getpro/habr/upload_files/51b/305/01d/51b30501db8a8480ee8299d5b7d0ceb7.png)
Как видно на скриншоте, при добавлении ключа и соответствующего ему значения требуется преобразовать их в двоичный вид (сериализовать), для чего используются сериализаторы как для ключа, так и значения. Как правило, эти задачи берет на себя Producer.
Ситуация с извлечением данных выглядит очень похоже.
![Пример извлечения сообщения с ключом int, равным 123, и значением string, равным "hello world" Пример извлечения сообщения с ключом int, равным 123, и значением string, равным "hello world"](https://habrastorage.org/getpro/habr/upload_files/bda/15d/594/bda15d59448d3309edb474248c40228d.png)
На этом скриншоте показана схема работы Consumer-а, который занимается извлечением двоичных данных и преобразованием их в нужный тип (десериализацией).
Механизм работы с данными в librdkafka и modern-cpp-kafka
Библиотека librdkafka
, будучи написанной на C, использует именно C-шный механизм так называемого стирания типов (type erasure) путем использования связки void
-указателя и длины данных, что нам демонстрирует функция rd_kafka_produce
.
RD_EXPORT
int rd_kafka_produce(rd_kafka_topic_t *rkt,
int32_t partition,
int msgflags,
void *payload,
size_t len,
const void *key,
size_t keylen,
void *msg_opaque);
Критика такого подхода стара как сам язык C, однако это единственный доступный в языке способ передавать любые данные – нужно лишь их сериализовать. Тем не менее, ни стандартная библиотека, ни librdkafka
не предоставляет никаких инструментов для сериализации.
Не решает ее, увы, и библиотека modern-cpp-kafka
. Рассмотрим функцию KafkaProducer::send
для отправки сообщения:
inline void
KafkaProducer::send(const producer::ProducerRecord& record, // Структура с данными (ключ, значение, хедеры, топик, партишен и id)
const producer::Callback& deliveryCb, // Callback-функция
SendOption option, // Опции для отправки
ActionWhileQueueIsFull action); // Опции действия на заполненность очереди (блокировать или нет)
Нам нужен тип первого параметра - ProducerRecord
. Его структура (не включая методы) выглядит так:
/**
* A key/value pair to be sent to Kafka.
* This consists of a topic name to which the record is being sent, an optional partition number, and an optional key and value.
* Note: `ProducerRecord` would not take the ownership from the memory block of `Value`.
*/
class ProducerRecord
{
public:
using Id = std::uint64_t;
// конструкторы, геттеры, сеттеры
private:
Topic _topic;
Partition _partition;
Key _key;
Value _value;
Headers _headers;
Optional<Id> _id;
};
Типы Key
и Value
являются type alias-ами на тип ConstBuffer
, имеющего следующую сигнатуру:
// Which is similar with `boost::const_buffer` (thus avoid the dependency towards `boost`)
class ConstBuffer
{
public:
// побайтовое копирование указателя
explicit ConstBuffer(const void* data = nullptr, std::size_t size = 0): _data(data), _size(size) {}
const void* data() const { return _data; }
std::size_t size() const { return _size; }
std::string toString() const
{
// опустим реализацию
}
private:
const void* _data;
std::size_t _size;
};
Поля класса выглядят как-то знакомо, не правда ли? Да, это именно та связка void
-указателя и длины, которая используется и в функции rd_kafka_produce.
Копирование указателей выполняется побайтово, из-за этого мы сталкиваемся с проблемой "висящего" указателя и, как следствие, use-after-free. Именно реализация класса ConstBuffer
делает класс ProducerRecord
невладеющим (и крайне неудобным в нетривиальных случаях), что и сказано прямиков в коментарии в описании класса (для удобства переведенного на русский):
Примечание:
ProducerRecord
не будет принимать права владения блоком памятиValue
.
Как мы видим, что в librdkafka
, что в modern-cpp-kafka
используется невладеющий механизм для передачи или хранения параметров. Благодаря такому подходу обеспечивается простота использования интерфейса C-шной библиотеки из вызывающего C++ кода, но создает проблемы с передачей параметров для создания класса, потому что указатель на не динамически выделенные данные инвалидируется после разрушения объекта.
Примером может служить этот простой код. Здесь мы просто хотим передать сериализуемый объект типа T
и на выходе получить готовый к передаче при помощи KafkaProducer::send
объект Value
. И если внутри функции все хорошо, то после выхода из функции объект std::vector
-а будет уничтожен и указатель инвалидируется. Решением, конечно, может выступать подход, в котором мы помещаем объекты в одной области видимости, но на мой взгляд это плохой способ ввиду нарушения принципа единой ответственности. Описанную проблему можно увидеть в коде ниже.
Примечание. Функции Serialize
и Deserialize
будут описаны ниже, в соответствующем пункте, пока же нам важно знать, что Serialize
возврашает std::vector<std::byte>
.
// Хотим получить готовый объект из сериализованных данных
// но получаем "висящий" указатель - увы и ах!
template <typename T>
ConstBuffer ValueFromNonOwning(const T& value) {
const std::vector<byte> serialized = Serialize(value);
// глубокого копирования не происходит - "висящий" указатель
return ConstBuffer(serialized.data(), serialized.size());
}
// Не спасет тут и умный указатель: ни на локальную переменную serialized
// т.к. данные из умного указателя будут просто скопированы по значению
// вместо создания копии; ни на сам возвращаемый объект ConstBuffer
// (по изначальной причине) - и снова увы и ах!
template <typename T>
ConstBuffer ValueFromNonOwningUniquePtr(const T& value) {
// выделяем память в куче (с shared_ptr та же история, ведь счетчик ссылок равен 0)
const auto serialized = std::make_unique<std::vector<std::byte>>(Serialize(value));
// проблема остается, потому что теперь разрушается сам умный указатель
return ConstBuffer(serialized->data(), serialized->size());
}
Владеющий ConstBuffer
Чтобы починить проблему с "висящим" указателем, мне пришла идея написать владеющий буфер, который будет гарантировать сохранение данных на протяжении всей жизни объекта. Так появился OwningBuffer
:
// Владеющий аналог класса ConstBuffer
class OwningBuffer {
public:
explicit OwningBuffer(const void* data = nullptr, const std::size_t size = 0) {
if (data && size > 0) {
m_rawData.assign(static_cast<const std::byte*>(data),
static_cast<const std::byte*>(data) + size);
}
}
explicit OwningBuffer(const ConstBuffer& buffer)
: OwningBuffer(buffer.data(), buffer.size()) {}
// конструктор копирования для создания из сырых данных
explicit OwningBuffer(const std::vector<std::byte>& bytes) : m_rawData(bytes) {}
// конструктор перемещения для создания из сырых данных
explicit OwningBuffer(std::vector<std::byte>&& bytes) noexcept
: m_rawData(std::move(bytes)) {}
[[nodiscard]] const void* data() const { return m_rawData.data(); }
[[nodiscard]] std::size_t size() const { return m_rawData.size(); }
[[nodiscard]] std::string toString() const {
// так как создание ConstBuffer не содержит сложных операций,
// нет и оверхеда на создание временного объекта
const ConstBuffer buffer(m_rawData.data(), m_rawData.size());
return buffer.toString();
}
// получение всегда валидного объекта ConstBuffer
[[nodiscard]] ConstBuffer asConstBuffer() const {
return ConstBuffer(m_rawData.data(), m_rawData.size());
}
private:
// сырые данные объекта
std::vector<std::byte> m_rawData;
};
// Такой вариант функции работает корректно, никаких "висящих" указателей
template <typename T>
OwningBuffer ValueFrom(const T& value) {
const auto serialized = Serialize(value);
return OwningBuffer(serialized);
}
Интерфейс класса аналогичен интерфейсу ConstBuffer
, но содержит некоторые дополнения в виде новых конструкторов для создания объекта из сырых данных, а также вспомогательного метода для получения валидного объекта ConstBuffer
. Владеющий подход полностью решает проблему с "висящими" указателями и use-after-free, и теперь можно перейти к сериализации и десериализации.
Обратите внимание на метод toString
: благодаря отсутствию сложной логики (к примеру, копирования) и аллокаций мы можем создать временный объект ConstBuffer
и вызвать его метод toString
, дабы не заниматься дублированием кода. Такое вот решение без оверхеда.
Сериализация и десериализация
Теперь, решив проблему с владением, можно перейти к самому главному - сериализации и десериализации данных. Несмотря на обилие библиотек, которые предоставляют такую функциональность, я не буду их использовать, чтобы не перегружать статью лишними зависимостями и усложнять материал разбором доступных средств для сериализации. Поэтому мы будем использовать старый добрый std::byte
из C++17 в еще более старом std::vector
и reinterpret_cast
.
#include <cstring>
#include <stdexcept>
#include <type_traits>
#include <vector>
template <typename T>
std::vector<std::byte> Serialize(const T& value) {
static_assert(std::is_trivially_copyable_v<T>, "Type must be trivially copyable");
const auto begin = reinterpret_cast<const std::byte*>(&value);
return {begin, begin + sizeof(T)};
}
template <typename T>
T Deserialize(const std::vector<std::byte>& serializedData) {
static_assert(std::is_trivially_copyable_v<T>, "Type must be trivially copyable");
static_assert(std::is_default_constructible_v<T>, "Type must be default constructible");
// примечание: проверяется только размер объекта.
// никто не мешает записать в int64_t содержимое std::string размером 5 байт
if (serializedData.size() != sizeof(T)) {
throw std::runtime_error("Serialized data size does not match target type size");
}
T value;
std::memcpy(&value, serializedData.data(), sizeof(T));
return value;
}
// специализация для std::string
template <>
inline std::vector<std::byte> Serialize<std::string>(const std::string& value) {
auto begin = reinterpret_cast<const std::byte*>(value.data());
return {begin, begin + value.size()};
}
// специализация для std::string
template <>
inline std::string Deserialize<std::string>(
const std::vector<std::byte>& serializedData) {
return {reinterpret_cast<const char*>(serializedData.data()), serializedData.size()};
}
В static_assert
-ах, по большому счету, нет такой необходимости, ведь код и так и так не скомпилируется, если что-то пойдет не так, но всегда приятнее видеть красивые сообщения об ошибках, верно? Механизм сериализации банален: берем адрес объекта и прибавляем его размер, таким образом "охватывая" все содержимое объекта, на выходе получаем вектор байтов, то есть сырые данные объекта.
С десериализацией все немного сложнее (и не так гладко). Здесь мы используем функцию memcpy
, чтобы заполнить созданный при помощи конструктора по умолчанию объект переданными сырыми байтами. Также нужно иметь в виду, что проверяются только размеры десериализуемого объекта, то есть можно записать объект, например, std::string
в объект типа float
и получить бессмыслицу.
Остается лишь создать обертки для сериализации и десериализации. В этом нам помогут функции ValueTo
и ValueFrom
:
// сериализация
template <typename T>
OwningBuffer ValueFrom(const T& value) {
const auto serialized = Serialize(value);
return OwningBuffer(serialized);
}
// десериализация
template <typename T>
T ValueTo(const Value& value) {
if (!value.data()) {
throw std::runtime_error("Received empty value");
}
// преобразуем void* к const byte* и "захватываем" все значения
const std::vector<std::byte> serializedData(
static_cast<const std::byte*>(value.data()),
static_cast<const std::byte*>(value.data()) + value.size());
// возвращаем объект, полученный при помощи десериализации
return Deserialize<T>(serializedData);
}
Демонстрация работы
В качестве примера рассмотрим классическую модель Single Producer - SIngle Consumer. Код Producer-а:
#include <kafka/KafkaProducer.h>
#include <iostream>
#include <string>
#include "KafkaUtils.h"
using namespace kafka::clients::producer;
void SendValues(const kafka::Topic& topic, KafkaProducer& producer) {
// Prepare delivery callback
auto deliveryCb = [](const RecordMetadata& metadata, const kafka::Error& error) {
if (!error) {
std::cout << "Message delivered: " << metadata.toString() << std::endl;
} else {
std::cerr << "Message failed to be delivered: " << error.message() << std::endl;
}
};
{
std::cout << "Sending messages with FLOAT type" << std::endl;
constexpr float f = 2.34;
kafka::extensions::SendValue(producer, topic, kafka::NullKey,
kafka::extensions::ValueFrom(f).AsConstBuffer(),
deliveryCb);
}
{
std::cout << "Sending messages with STRINGS type" << std::endl;
const auto values = std::vector<std::string>{"amogus", "breakpoint", "cappa",
"delta", "extension", "final"};
for (const auto& value : values) {
kafka::extensions::SendValue(producer, topic, kafka::NullKey,
kafka::extensions::ValueFrom(value).AsConstBuffer(),
deliveryCb);
}
}
}
void DoProducerWork() {
// взято из конфигурации Docker Compose
const std::string brokers = "localhost:29092";
const kafka::Topic topic = "test-topic";
// Prepare the configuration
kafka::Properties props;
props.put("bootstrap.servers", brokers);
// Create a producer
KafkaProducer producer(props);
std::println(std::cout, "SendValues function");
SendValues(topic, producer);
}
int main() {
try {
DoProducerWork();
} catch (const std::exception& e) {
std::cerr << "Error: " << e.what() << std::endl;
return 1;
}
return 0;
}
Код Consumer-а:
#include <KafkaUtils.h>
#include <kafka/KafkaConsumer.h>
#include <csignal>
#include <iostream>
#include <string>
std::atomic_bool running = {true};
void StopRunning(int sig) {
if (sig != SIGINT) return;
if (running) {
running = false;
} else {
// Restore the signal handler, -- to avoid stuck with this handler
signal(SIGINT, SIG_IGN); // NOLINT
}
}
void DoConsumerWork() {
using namespace kafka;
using namespace kafka::clients::consumer;
const std::string brokers = "localhost:29092";
const Topic topic = "test-topic";
// Prepare the configuration
Properties props;
props.put("bootstrap.servers", brokers);
// Create a consumer instance
KafkaConsumer consumer(props);
// Subscribe to topics
consumer.subscribe({topic});
while (running) {
// Poll messages from Kafka brokers
for (const auto records = consumer.poll(std::chrono::milliseconds(100));
const auto& record : records) {
if (record.error()) {
std::cerr << record.toString() << std::endl;
continue;
}
std::cout << "Got a new message..." << std::endl;
std::cout << " Topic : " << record.topic() << std::endl;
std::cout << " Partition: " << record.partition() << std::endl;
std::cout << " Offset : " << record.offset() << std::endl;
std::cout << " Timestamp: " << record.timestamp().toString() << std::endl;
std::cout << " Headers : " << toString(record.headers()) << std::endl;
try {
const auto stringValue = kafka::extensions::ValueTo<std::string>(record.value());
std::cout << " STRING [" << stringValue << "]" << std::endl;
} catch (const std::exception& e) {
std::cerr << " Failed to deserialize as string: " << e.what() << std::endl;
}
try {
const auto floatValue = kafka::extensions::ValueTo<float>(record.value());
std::cout << " FLOAT [" << floatValue << "]" << std::endl;
} catch (const std::exception& e) {
std::cerr << " Failed to deserialize as float: " << e.what() << std::endl;
}
}
}
}
int main() {
// Use Ctrl-C to terminate the program
signal(SIGINT, StopRunning); // NOLINT
DoConsumerWork();
return 0;
}
В этом примере происходит демонстрация того, что если результирующий тип подходит по критерию размера, то он может быть использован и это не будет ошибкой.
Заключение
В этой статье я постарался совместить сразу несколько вещей: рассказать про Apache Kafka, про хранение данных внутри, про наличие определенных проблем и подходы к их решению. Написание текстов (помимо курсовых и дипломной работ) для меня в новинку, поэтому призываю каждого, кто прочитал, поделиться мыслями – с удовольствием почитаю и отвечу.
Демонстрацию сериализации и десериализации прямо в браузере можно посмотреть на сайте godbolt. Исходный код с настройкой Apache Kafka в Docker Compose находится в моем github-репозитории.