Привет, друзья! Сегодня рассмотрим, как реализовать алгоритм Raft на Python.
Raft — это алгоритм распределённого консенсуса, который делает три вещи:
Выбирает лидера (тот, кто рулит кластером).
Реплицирует данные по всем узлам (чтобы не потерять, если что‑то пойдет не так).
Гарантирует согласованность данных (никакой битой записи в журнале).
Представьте себе группу людей, пытающихся решить, какую пиццу заказать. Если нет лидера — хаос. Если кто‑то заказал ананасовую, а другие решили, что это была шутка, — еще хуже. Вот Raft и помогает избежать этой катастрофы.
Структура проекта
Определимся, что мы собираемся написать:
Node
— сердце системы, представляет один узел кластера.-
Механизмы:
Выборы лидера.
Репликация журнала.
Управление состоянием.
-
Дополнительно:
Обработка отказов.
Оптимизация производительности.
Клиентская логика.
Тестирование и отказоустойчивость.
Узел кластера
Каждый узел в Raft знает:
Своё состояние (лидер, кандидат, или — чаще всего — просто унылый
follower
).Текущий термин (порядковый номер цикла выборов).
Свой журнал (лог операций).
Начнём с базовой структуры:
import random
import threading
import time
from enum import Enum
class State(Enum):
FOLLOWER = 1
CANDIDATE = 2
LEADER = 3
class Node:
def __init__(self, node_id, peers):
self.id = node_id
self.peers = peers # Список других узлов
self.state = State.FOLLOWER
self.current_term = 0
self.voted_for = None
self.log = []
self.commit_index = -1
self.last_applied = -1
self.next_index = {}
self.match_index = {}
self.lock = threading.Lock()
self.election_timeout = self.reset_election_timeout()
self.disabled = False # Инициализация флага отключения
self.timer = threading.Thread(target=self.run_election_timer, daemon=True)
self.timer.start()
def reset_election_timeout(self):
return time.time() + random.uniform(5, 10)
Здесь определяем класс Node
с его основными атрибутами и инициализируем таймер выборов для каждого узла.
Таймер выборов
Узлы в Raft знают, что если лидер долго молчит, значит, пора искать нового. Здесь и поможет таймер выборов.
def run_election_timer(self):
while True:
time.sleep(0.1)
with self.lock:
if self.disabled:
continue
if time.time() >= self.election_timeout:
print(f"Узел {self.id}: лидер потерян, начинаю выборы.")
self.state = State.CANDIDATE
self.current_term += 1
self.voted_for = self.id
self.election_timeout = self.reset_election_timeout()
threading.Thread(target=self.start_election, daemon=True).start()
Этот метод постоянно проверяет, не истёк ли таймаут, и при необходимости инициирует процесс выборов нового лидера.
Выборы
Когда узел становится кандидатом, он рассылает всем остальным запросы на голосование:
def start_election(self):
votes = 1 # Голосуем за себя
for peer in self.peers:
if self.disabled:
continue
if peer.request_vote(self.current_term, self.id):
votes += 1
if votes > len(self.peers) // 2:
print(f"Узел {self.id}: выбран лидером!")
self.become_leader()
else:
print(f"Узел {self.id}: выборы провалились.")
Здесь узел собирает голоса от своих собратьев. Если набирается большинство — он становится лидером.
Как голосуют узлы?
Каждый узел отвечает на запросы голосования:
def request_vote(self, term, candidate_id):
with self.lock:
if self.disabled:
return False
if term > self.current_term:
self.current_term = term
self.voted_for = None
self.state = State.FOLLOWER
if self.voted_for is None or self.voted_for == candidate_id:
print(f"Узел {self.id}: голосую за {candidate_id}.")
self.voted_for = candidate_id
self.election_timeout = self.reset_election_timeout()
return True
else:
print(f"Узел {self.id}: отказал в голосе {candidate_id}.")
return False
Этот метод решает, дать ли свой голос кандидату, основываясь на текущем терминe и предыдущих голосах.
Лидерство
Если кандидат получает большинство голосов, он становится лидером:
def become_leader(self):
self.state = State.LEADER
print(f"Узел {self.id}: я лидер!")
for peer in self.peers:
self.next_index[peer.id] = len(self.log)
self.match_index[peer.id] = -1
threading.Thread(target=self.send_heartbeats, daemon=True).start()
При становлении лидером узел инициализирует индексы для репликации журнала и начинает рассылку сердцебиений.
Сердцебиения
Лидер периодически шлёт всем узлам «сигналы жизни»:
def send_heartbeats(self):
while True:
with self.lock:
if self.state != State.LEADER or self.disabled:
break
for peer in self.peers:
if self.disabled:
continue
print(f"Лидер {self.id}: отправляю heartbeat узлу {peer.id}.")
threading.Thread(target=self.append_entries, args=(peer,), daemon=True).start()
time.sleep(1)
Этот цикл обеспечивает поддержание лидерства и синхронизацию журнала с другими узлами.
Репликация журнала
Когда клиент отправляет команду, лидер добавляет её в журнал и синхронизирует с другими узлами:
def client_command(self, command):
with self.lock:
if self.state != State.LEADER or self.disabled:
print(f"Узел {self.id}: я не лидер, перенаправляю запрос.")
return False
entry = {'term': self.current_term, 'command': command}
self.log.append(entry)
print(f"Лидер {self.id}: добавляю команду {command} в журнал.")
threading.Thread(target=self.replicate_log, daemon=True).start()
return True
Здесь лидер обрабатывает команду клиента, добавляя её в свой журнал и инициируя процесс репликации на других узлах.
Репликация на другие узлы
def replicate_log(self):
while True:
with self.lock:
if self.disabled:
return
replicated = 1 # Лидер уже имеет запись
for peer in self.peers:
if self.match_index.get(peer.id, -1) >= len(self.log) - 1:
replicated += 1
if replicated > len(self.peers) // 2:
self.commit_index = len(self.log) - 1
self.apply_log()
break
time.sleep(0.1)
Этот метод гарантирует, что запись будет реплицирована на большинстве узлов перед её применением.
Применение журнала к состоянию
Метод apply_log
применяет подтверждённые записи к состоянию узла:
def apply_log(self):
with self.lock:
while self.last_applied < self.commit_index:
self.last_applied += 1
entry = self.log[self.last_applied]
# Здесь мы применяем команду к состоянию
print(f"Узел {self.id} применил команду: {entry['command']}")
Последовательно применяем команды из журнала к состоянию узла.
Обработка AppendEntries
Узлы должны уметь принимать записи от лидера:
def append_entries(self, peer):
with self.lock:
if self.disabled:
return
prev_log_index = self.next_index.get(peer.id, 0) - 1
prev_log_term = self.log[prev_log_index]['term'] if prev_log_index >= 0 and prev_log_index < len(self.log) else -1
entries = self.log[self.next_index.get(peer.id, 0):]
term = self.current_term
success = peer.receive_append_entries(
term=term,
leader_id=self.id,
prev_log_index=prev_log_index,
prev_log_term=prev_log_term,
entries=entries,
leader_commit=self.commit_index
)
with self.lock:
if self.disabled:
return
if success:
self.match_index[peer.id] = self.next_index.get(peer.id, 0) + len(entries) - 1
self.next_index[peer.id] = self.match_index[peer.id] + 1
else:
self.next_index[peer.id] = max(0, self.next_index.get(peer.id, 0) - 1)
Этот метод отправляет записи журнала другим узлам и обновляет индексы в зависимости от ответа.
А теперь реализуем обработку входящих AppendEntries
def receive_append_entries(self, term, leader_id, prev_log_index, prev_log_term, entries, leader_commit):
with self.lock:
if self.disabled:
return False
if term < self.current_term:
return False
self.state = State.FOLLOWER
self.current_term = term
self.election_timeout = self.reset_election_timeout()
if prev_log_index >= 0:
if len(self.log) <= prev_log_index or self.log[prev_log_index]['term'] != prev_log_term:
return False
# Добавляем новые записи, если их ещё нет
for entry in entries:
if len(self.log) > prev_log_index + 1:
if self.log[prev_log_index + 1]['term'] != entry['term']:
self.log = self.log[:prev_log_index + 1]
self.log.append(entry)
else:
self.log.append(entry)
if leader_commit > self.commit_index:
self.commit_index = min(leader_commit, len(self.log) - 1)
threading.Thread(target=self.apply_log, daemon=True).start()
return True
Этот метод будет проверять согласованность журнала и обновлять его, если все в порядке.
Симуляция отказов
Чтобы протестировать отказоустойчивость, добавим возможность отключать узлы:
def disable(self):
with self.lock:
self.disabled = True
print(f"Узел {self.id} отключен.")
def enable(self):
with self.lock:
self.disabled = False
self.election_timeout = self.reset_election_timeout()
print(f"Узел {self.id} включен.")
И изменим методы отправки сообщений, чтобы учитывать состояние узла:
def request_vote(self, term, candidate_id):
if getattr(self, 'disabled', False):
return False
# Остальной код...
def receive_append_entries(self, *args, **kwargs):
if getattr(self, 'disabled', False):
return False
# Остальной код...
Теперь можно симулировать отключение узлов и проверять, как кластер реагирует на это.
Протестируем кластер
Создадим несколько узлов и запустим их:
# main.py
from node import Node, State
import time
if __name__ == "__main__":
nodes = []
# Создаем все узлы сначала с пустыми peers
for i in range(5):
node = Node(node_id=i, peers=[])
nodes.append(node)
# Теперь устанавливаем peers для каждого узла
for node in nodes:
node.peers = [peer for peer in nodes if peer.id != node.id]
leader = None
# Ждем, пока лидер не будет выбран
while not leader:
for node in nodes:
if node.state == State.LEADER:
leader = node
break
time.sleep(0.5)
print(f"Лидер выбран: Узел {leader.id}")
leader.client_command("Сохранить данные")
# Отключаем лидера
leader.disable()
print(f"Узел {leader.id} отключен")
time.sleep(15)
# Проверяем, выбран ли новый лидер
new_leader = None
for node in nodes:
if node.state == State.LEADER and not getattr(node, 'disabled', False):
new_leader = node
break
if new_leader:
print(f"Новый лидер: Узел {new_leader.id}")
new_leader.client_command("Новая команда")
else:
print("Не удалось выбрать нового лидера.")
Скрипт создаст пять узлов, инициирует выборы лидера, отправит команды и симулирует отказ лидера. Вот что можно ожидать в консоли:
# main.py
from node import Node, State
import time
if __name__ == "__main__":
nodes = []
# Создаем все узлы сначала с пустыми peers
for i in range(5):
node = Node(node_id=i, peers=[])
nodes.append(node)
# Теперь устанавливаем peers для каждого узла
for node in nodes:
node.peers = [peer for peer in nodes if peer.id != node.id]
leader = None
# Ждем, пока лидер не будет выбран
while not leader:
for node in nodes:
if node.state == State.LEADER:
leader = node
break
time.sleep(0.5)
print(f"Лидер выбран: Узел {leader.id}")
leader.client_command("Сохранить данные")
# Отключаем лидера
leader.disable()
print(f"Узел {leader.id} отключен")
time.sleep(15)
# Проверяем, выбран ли новый лидер
new_leader = None
for node in nodes:
if node.state == State.LEADER and not getattr(node, 'disabled', False):
new_leader = node
break
if new_leader:
print(f"Новый лидер: Узел {new_leader.id}")
new_leader.client_command("Новая команда")
else:
print("Не удалось выбрать нового лидера.")
Вывод получится такой:
Узел 0: лидер потерян, начинаю выборы.
Узел 0: голосую за 0.
Узел 1: голосую за 0.
Узел 2: голосую за 0.
Узел 3: голосую за 0.
Узел 4: голосую за 0.
Узел 0: выбран лидером!
Узел 0: я лидер!
Лидер 0: отправляю heartbeat узлу 1.
Лидер 0: отправляю heartbeat узлу 2.
Лидер 0: отправляю heartbeat узлу 3.
Лидер 0: отправляю heartbeat узлу 4.
Узел 0: добавляю команду Сохранить данные в журнал.
Лидер 0: отправляю heartbeat узлу 1.
Лидер 0: отправляю heartbeat узлу 2.
Лидер 0: отправляю heartbeat узлу 3.
Лидер 0: отправляю heartbeat узлу 4.
Узел 0 отключен
Узел 1: лидер потерян, начинаю выборы.
Узел 1: голосую за 1.
Узел 2: голосую за 1.
Узел 3: голосую за 1.
Узел 4: голосую за 1.
Узел 1: выбран лидером!
Узел 1: я лидер!
Лидер 1: отправляю heartbeat узлу 0.
Лидер 1: отправляю heartbeat узлу 2.
Лидер 1: отправляю heartbeat узлу 3.
Лидер 1: отправляю heartbeat узлу 4.
Новый лидер: Узел 1
Узел 1: добавляю команду Новая команда в журнал.
Лидер 1: отправляю heartbeat узлу 0.
Лидер 1: отправляю heartbeat узлу 2.
Лидер 1: отправляю heartbeat узлу 3.
Лидер 1: отправляю heartbeat узлу 4.
Узел 1: применил команду: Новая команда
В начале узлы начинают выборы лидера и узел 0 становится первым лидером после получения большинства голосов. Затем лидер 0 начинает отправлять сердцебиения и реплицировать команду «Сохранить данные». Когда мы отключаем лидера 0, узел 1 обнаруживает его отсутствие, инициирует новые выборы и становится новым лидером, после чего успешно обрабатывает и реплицирует команду «Новая команда».
Но тут сделаю замечание, что из‑за асинхронности и случайных таймаутов порядок событий может меняться при каждом запуске.
Если захотите прикрутить Raft в реальный проект, то нужно будет добавить обработку сетевых сбоев, механизм «догонки» отставших узлов, и проверки корректности журналов. Улучшите производительность, перейдя на асинхронное программирование, минимизируйте блокировки и оптимизируйте передачу данных. Реализуйте обратную связь для клиентов, чтобы они получали подтверждения, и протестируйте систему в сложных сценариях отказов.
Всех желающих приглашаю на открытые уроки по архитектуре высоких нагрузок:
9 декабря: «Обеспечение отказоустойчивости хранилищ» — Вы узнаете, как правильно проектировать и настраивать хранилища, чтобы минимизировать простои и предотвратить потерю данных при сбоях. Записаться
16 декабря: «Распределённые транзакции — как добиться согласования данных в распределённой сети». Записаться