Независимо от механизма репликации, необходимо использовать fsync() для предотвращения глобальной потери данных в non-Byzantine протоколах.

Статья переведена и адаптирована командой Southbridge.

С самого начала разработки Redpanda целью было создание более быстрой и безопасной платформы потоковых данных, способной безотказно поддерживать приложения, критически важные для бизнеса. Эта цель повлияла на многие из выборов в проектировании:

  • Выбор протокола репликации Raft вместо ISR протокола репликации Apache Kafka®, поскольку Raft широко изучен в академических кругах, имеет несколько референсных реализаций масштаба промышленности и сильную модель согласованности с четко определенными режимами отказа.

  • Отказ от протокола транзакций с более высокой пропускной способностью, но подверженного проблеме KAFKA-14402, и выбор явного записи маркеров начала транзакции (выпущено в версии 22.3.1).

  • Сотрудничество с компанией Jepsen, предоставляющей услуги аудита распределенных систем, для независимого тестирования Redpanda на предмет потери данных и других нарушений согласованности.

  • Ежедневное проведение 11-часовых хаос-тестов на согласованность.

  • Всегда использование fsync() перед подтверждением запроса клиента при установленной безопасности с параметром acks=all.

В этой статье фокус на последнем пункте и демонстрируется — как в теории, так и на практике — что потеря несинхронизированных данных даже на одном узле (!) достаточна для вызывания глобальной потери данных в реплицируемой системе.

Что такое fsync?

По умолчанию, API записи на диск является асинхронным. Когда приложение использует API операционной системы для записи данных на диск, ОС копирует данные и может подтвердить запрос на запись, не ожидая достижения данных диска. Это поведение повышает скорость передачи данных и пропускную способность, но уменьшает безопасность.

Хотя такое поведение более безопасно, чем буферизация данных на уровне приложения, оно не является полностью последовательным. Операционная система копирует данные, поэтому, даже если приложение аварийно завершается, данные остаются неизменными, и ОС завершает запись. Однако, если происходит потеря питания или сбой операционной системы, недавно записанные данные могут быть потеряны.

С использованием fsync приложение запрашивает операционную систему вернуть управление только после того, как все недавно записанные данные будут записаны на диск. Это гарантирует, что приложение работает синхронно с диском, гарантируя, что все данные записываются на диск перед продолжением выполнения программы.

Для чего нужна репликация?

Репликация — это техника, используемая для повышения доступности и сохранности данных приложения. Когда приложение или его данные хранятся на одном узле, если этот узел выходит из строя, приложение становится недоступным, а данные становятся недоступными. Репликация решает эту проблему, храня данные на разных узлах, поддерживая их согласованность и обеспечивая возможность чтения и записи данных даже при наличии разделения сети и сбоев (например, сбои с остановкой работы).

Репликация гарантирует согласованность данных между всеми узлами и обеспечивает, что данные находятся в такой же согласованности, как если бы они находились на одном узле (линейная согласованность). Это означает, что даже если один узел выходит из строя, приложение и его данные остаются доступными, и пользователи могут продолжать получать доступ к приложению и использовать его без прерываний.

Может ли репликация снизить риски безопасности при работе системы без fsync?

Сбои узлов являются основной уязвимостью при работе системы без fsync. Однако, согласованная репликация может выдержать сбои узлов, не нарушая согласованность данных. Таким образом, репликация решает эту уязвимость и делает использование fsync ненужным для реплицированных систем.

Конечно, потеря данных все еще может произойти, если все узлы потеряют питание или одновременно переживут сбои операционной системы. Однако, возможно сократить вероятность такого события, например, путем использования зон доступности, что создает ощущение безопасности.

Приведенный выше аргумент является распространенным недоразумением. Даже потеря питания на одном узле, приводящая к потере локальных несинхронизированных данных, может привести к незаметной глобальной потере данных в реплицированной системе, которая не использует fsync, независимо от используемого протокола репликации.

Следует отметить, что большинство протоколов репликации терпят только сбои с остановкой работы, что означает, что хотя узлы могут выходить из строя, при их перезапуске они должны иметь такое же состояние (данные), как у них было в момент сбоя.

Запуск системы без fsync исключает аварийные остановки узлов из категории сбоев с остановкой работы, и мы не можем использовать репликацию для обоснования отсутствия fsync.

Может ли протокол репликации поддерживать неисправности, выходящие за пределы сбои с остановкой работы, и терпеть отсутствие fsync?

Да, Byzantine неисправности являются более широким классом неисправностей, который предполагает, что узлы могут проявлять любой вид неблагоприятного поведения, включая потерю несинхронизированных записей. В статье «Reaching Agreement in the Presence of Faults» показано, что для того чтобы терпеть n Byzantine неисправностей, система должна быть реплицирована на 3n+1 узлах.

Однако установка коэффициента репликации равного четырем в Redpanda или Kafka недостаточно для защиты от одной Byzantine неисправности. Система должна использовать передовые протоколы Byzantine устойчивости (Byzantine fault‑tolerant, BFT), которые на данный момент не используются ни в одной из этих систем.

Мы не выбрали протокол BFT из‑за его сложности, зрелости и характеристик производительности. Требуется время, чтобы сформировать протокол исследования для промышленного использования. Например, протокол Paxos, первый протокол согласованной репликации, потребовал десятилетия исследования распределенных систем для поддержки важных промышленных возможностей, таких как возможность замены вышедших из строя узлов.

Потеря несинхронизированных данных — это лишь небольшая часть Byzantine ошибок. Существует ли протокол, который охватывает это подмножество и обеспечивает тот же уровень гарантий, что и Raft, с 2n+1 узлами вместо 3n+1?

Это хороший вопрос компьютерной науки, и я не знаю общего ответа, но при n=1 это невозможно.

Доказательство невозможности

Протоколы репликации Raft/Paxos разработаны с учетом устойчивости к неисправностям. Пока большинство узлов доступно для выбора лидера (2 RTT), система может гарантировать доступность и согласованность. Однако, если узлу разрешено потерять несинхронизированные данные, это условие не может быть соблюдено ни в одном протоколе репликации. Давайте предположим обратное и докажем это методом от противного. 

Рассмотрим реплицированную систему, состоящую из трех узлов ({A,B,C}) с RF=3. Предположим, что узел {A} становится изолированным от клиента и других узлов, и клиент записывает последовательные записи {1,2,3,4,5} в систему репликации. Как только клиент получает подтверждение (ack), происходит следующее:

  • Узел {B} становится изолированным от других узлов;      

  • Узел {C} выходит из строя, теряет несинхронизированные данные и перезапускается только с данными {1,2,3};

  • Узел {A} восстанавливает связь с узлом {C} и клиентом.

Мы предположили, что через некоторое время (2 RTT) кластер должен стать доступным для обслуживания чтения и записи. Однако у узлов {A,C} нет суффиксов {4,5}, поэтому, если они станут доступными, произойдет потеря данных. Это противоречит гарантии системы о согласованности.

В заключение, невозможно, чтобы реплицированная система терпела потерю несинхронизированных данных и сохраняла тот же уровень согласованности и доступности, что и протоколы Raft/Paxos с RF=3.

Контрпример

Иногда контрпример может быть более убедительным, чем доказательство, поэтому мы решили воспроизвести потерю данных. Apache Kafka по умолчанию не использует fsync, поэтому мы решили использовать его для демонстрации возможности потери глобальных данных из-за потери локальных данных на одном узле.

Скопируйте репозиторий с примером:

git clone https://github.com/redpanda-data/kafka-fsync
cd kafka-fsync

Соберите контейнер с локально развернутым кластером Kafka, состоящим из трех процессов Kafka и одного процесса Apache ZooKeeper:

docker build -t kafka-fsync .

Запустите контейнер и войдите в него:

docker run -d --name kafka_fsync -v $(pwd):/fsync kafka-fsync
docker exec -it kafka_fsync /bin/bash

Создайте каталогов данных:

cd /fsync
./create.dirs.sh

Запустите процесс ZooKeeper

/root/apache-zookeeper-3.8.1-bin/bin/zkServer.sh --config . start

Запустите три процесса Kafka:

nohup /root/kafka_2.12-3.4.0/bin/kafka-server-start.sh kafka1/server.properties >> /fsync/kafka1/kafka.log 2>&1 & echo $! > /fsync/kafka1/pid &
nohup /root/kafka_2.12-3.4.0/bin/kafka-server-start.sh kafka2/server.properties >> /fsync/kafka2/kafka.log 2>&1 & echo $! > /fsync/kafka2/pid &
nohup /root/kafka_2.12-3.4.0/bin/kafka-server-start.sh kafka3/server.properties >> /fsync/kafka3/kafka.log 2>&1 & echo $! > /fsync/kafka3/pid &

Создайте topic1 с RF=3

/root/kafka_2.12-3.4.0/bin/kafka-topics.sh --create --topic topic1 --partitions 1 --replication-factor 3 --bootstrap-server 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094

В первую очередь, давайте сконцентрируемся на изоляции процесса kafka1. Хотя существуют различные методы, такие как использование docker-compose и манипуляции с iptables, для упрощения мы решаем просто завершить процесс kafka1 (наблюдателю невозможно отличить сетевые разделы от сбоев процесса). Кроме того, поскольку операционная система остается незатронутой, у kafka1 не возникло потери локальных данных.

cat kafka1/pid | xargs kill -9

Напишите десять записей с acks=all

python3 write10.py

Вывод:

wrote key0=value0 at offset=0
wrote key1=value1 at offset=1
...
wrote key8=value8 at offset=8
wrote key9=value9 at offset=9

Разберемся, какой узел является лидером

/root/kafka_2.12-3.4.0/bin/kafka-topics.sh --describe --topic topic1 --bootstrap-server 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094

На моей машине это был kafka3. Теперь перейдем к изоляции ZooKeeper, чтобы временно «заморозить» лидерство на третьем узле. Аналогично нашему подходу с kafka1, мы симулируем это, завершая процесс ZooKeeper.

cat zookeeper/zookeeper_server.pid | xargs kill -9

Теперь давайте симулируем эффекты следующих событий:

  • kafka2 был изолирован

  • kafka3 сбоит и теряет лог-суффикс

  • ZooKeeper, kafka3 и kafka1 восстанавливают соединение и формируют кластер

Мы завершаем работу kafka2, чтобы имитировать изоляцию узла, и завершаем работу kafka3, чтобы имитировать сбой:

cat kafka2/pid kafka3/pid | xargs kill -9

Простого завершения процесса недостаточно, чтобы вызвать локальную потерю несинхронизированных данных. Поэтому, чтобы имитировать это, мы вручную удалим последние десять байтов с помощью команды truncate.

truncate -s -10 kafka3/data/topic1-0/00000000000000000000.log

Восстановление подключения к Zookeeper

/root/apache-zookeeper-3.8.1-bin/bin/zkServer.sh --config . start

Давайте дадим ему минуту, чтобы удалить эфемерную информацию. Затем запустите бывшего лидера (в моем случае kafka3)

nohup /root/kafka_2.12-3.4.0/bin/kafka-server-start.sh kafka3/server.properties >> /fsync/kafka3/kafka.log 2>&1 & echo $! > /fsync/kafka3/pid &

Подождите, пока он станет лидером

/root/kafka_2.12-3.4.0/bin/kafka-topics.sh --describe --topic topic1 --bootstrap-server 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094

Затем восстановите подключение процесса kafka1

nohup /root/kafka_2.12-3.4.0/bin/kafka-server-start.sh kafka1/server.properties >> /fsync/kafka1/kafka.log 2>&1 & echo $! > /fsync/kafka1/pid &

Опять же, давайте подождем, пока формируется ISR из двух узлов.

/root/kafka_2.12-3.4.0/bin/kafka-topics.sh --describe --topic topic1 --bootstrap-server 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094

Получили рабочий кластер Kafka, напишем еще десять записей

python3 write10.py

В идеале мы должны увидеть

wrote key0=value0 at offset=10
wrote key1=value1 at offset=11
...
wrote key8=value8 at offset=18
wrote key9=value9 at offset=19

Но то, что мы видим на самом деле

wrote key0=value0 at offset=9
wrote key1=value1 at offset=10
...
wrote key8=value8 at offset=17
wrote key9=value9 at offset=18

Таким образом, вызвав потерю локальных данных на одном узле (это может произойти без использования fsync), мы вызвали глобальную потерю данных, и Kafka потеряла запись key9=value9 с offset=9.

Заключение

Использование fsync является важным для обеспечения согласованности данных и их сохранности в реплицируемой системе. В посте подчеркивается распространенное заблуждение о том, что только репликация может устранить необходимость в использовании fsync, и демонстрируется, что потеря несинхронизированных данных на одном узле все равно может привести к глобальной потере данных в non-Byzantine реплицируемой системе.

Комментарии (2)


  1. anton19286
    06.07.2023 15:18

    Невычитанный машинный перевод (


  1. grossws
    06.07.2023 15:18

    Репликация гарантирует согласованность данных между всеми узлами и обеспечивает, что данные находятся в такой же согласованности, как если бы они находились на одном узле (линейная согласованность)

    С каких это пор? Репликация сама по себе ничего такого не гарантирует в принципе. Можно вспомнить DynamoDB или Apache Cassandra где это совсем не так.