Распределенная блокировка — очень удобный инструмент в кластере, который помогает обеспечивать эксклюзивный доступ к некоторому общему ресурсу. Цель такой блокировки — обеспечить доступ к ресурсу лишь одному сервису или запросу в данный момент времени. Так предотвращается гонка за данными и их неконсистентность. Распределенная (или кластерная) блокировка называется так потому, что она обеспечивается несколькими узлами, и выход из строя одного из них не повлияет на приложение. В этой статье я расскажу, как реализовать этот инструмент с помощью Tarantool 3.
Tarantool 3
Tarantool 3 — это новая версия in-memory базы данных, которая лежит в основе промежуточного ПО для хранения и обработки данных Tarantool. Она полностью совместима с файлами и репликацией второй версии.
Перечислю основные характеристики Tarantool 3 в сравнении с Tarantool 2:
Cервер с конфигом.
Ориентация на кластерность.
Удобная система триггеров.
Переопределение сетевого API (IPROTO).
Упрощение настройки репликации.
Имена узлов вместо UUID.
Расширенная статистика по потреблению памяти.
Значения полей по умолчанию.
Подробнее можно почитать в другой статье на Хабре
Принцип работы
Блокировки будут храниться в таблице (спейсе) с такой структурой:
name |
token |
expire |
Взятой блокировкой будет считаться наличие строки в таблице c ненулевым expire. Отпущенной блокировкой будет считаться или отсутствие строки в таблице, или нулевой expire.
Для надежного хранения состояния блокировки настроим синхронную репликацию. Это делается при создании таблицы (спейса). Для операций с блокировками будем использовать транзакции в режиме linearizable. Для автоматического переключения лидера настроим raft-фейловер.
Так будет выглядеть топология приложения.
Напишем хранимую процедуру на Lua для взятия блокировки. Процедура работает следующим образом:
Принимает параметры: имя и таймаут.
Проверяет, существует ли блокировка. Если нет — создает новую.
Если блокировка существовала, то проверяет, отпущена ли она (expire == 0).
Если отпущена, то наращивает token и устанавливает время expire.
Если уже была взята, процедура возвращает nil.
function _G.acquireLock(name, timeout)
box.begin({ txn_isolation = "linearizable", timeout = timeout })
local lock = box.space.locks:get(name)
if lock == nil then
lock = { name, 0, clock.time64() + timeout * 1e9 }
box.space.locks:insert(lock)
box.commit()
return lock
end
if lock['expire'] == 0 then
box.space.locks:update({ name }, { "=", "token", lock['token'] + 1 },
{ "=", "expire", clock.time64() + timeout * 1e9 })
box.commit()
return lock
end
box.commit()
return nil
end
Важно учесть, что наш алгоритм запущен в кластере и использует время. Для корректности работы важно держать время синхронизированным. Как это сделать.
Теперь напишем процедуру на Lua для отпускания блокировки. Процедура принимает параметры: имя и токен — и проверяет, что токен блокировки совпадает и expire не равен нулю. Тогда блокировка отпускается и процедура возвращает true, иначе — false.
function _G.releaseLock(name, token)
box.begin({ txn_isolation = "linearizable" })
local lock = box.space.locks:get(name)
if lock == nil then
box.commit()
return false
end
if lock['token'] == token and lock['expire'] ~= 0 then
box.space.locks:update({ name }, { { "=", "expire", 0 } })
box.commit()
return true
end
box.commit()
return false
end
Для создания таблицы (спейса) с блокировками используется фоновая процедура. Она:
Ждет, что узел станет лидером.
Создает таблицу с необходимыми полями.
Запускает цикл для обработки тайм-аутов блокировок.
fiber.create(function()
fiber.name("expire-lock-fiber")
box.ctl.wait_rw()
box.schema.space.create("locks", {
is_sync=true,
if_not_exists=true})
box.space.locks:format({{name="name", type="string"},
{name='token', type='unsigned'},
{name='expire', type='unsigned'}})
box.space.locks:create_index('name', {
parts={{field="name", type="string"}},
if_not_exists=true})
box.space.locks:create_index('expire', {
parts={{field="expire", type="unsigned"}},
unique=false,
if_not_exists=true})
while true do
box.ctl.wait_rw()
local now = clock.time64()
for _, t in box.space.locks.index.expire:pairs({0}, {iterator="GT"}) do
if t[3] < now then
local rc, err = pcall(box.space.locks.update, box.space.locks, {t["name"]}, {{"=", "expire", 0}})
if not rc then
log.info(err)
break
end
end
end
fiber.sleep(1)
end
end)
Инициализация приложения
Сначала инициализируем рабочее окружение для разработки:
tt init
Создадим директорию будущего приложения. Приложения располагаются в instances.enabled:
mkdir instances.enabled/app
Создадим файл конфигурации будущего локального кластера. Этот файл будет содержать топологию и настройки узлов кластера:
touch instances.enabled/app/config.yml
В конфигурации укажем:
Что в кластере должно быть три узла.
Параметры фейловера для работы кластера.
Включение mvcc-режима работы.
Файл с хранимыми процедурами.
# Настраиваем пользователей и их секреты
credentials:
users:
client:
password: "secret"
replicator:
password: "topsecret"
roles: [replication]
# Указываем под каким пользователем узлы Tarantool
# будут подключаться друг к другу
iproto:
advertise:
peer:
login: replicator
# Настраиваем raft failover для автоматического и
# и ручного консистентного переключения лидера
replication:
failover: election
# Использует мультиверсионный движок базы данных
# для решения проблем грязных чтений
database:
use_mvcc_engine: true
# Указываем исходный файл для создания API блокировок
app:
file: init.lua
# Топология кластера
# 3 узла объединённых репликацией
groups:
group-001:
replicasets:
replicaset-001:
replication:
bootstrap_strategy: config
bootstrap_leader: instance-001
instances:
instance-001:
iproto:
listen:
- uri: 127.0.0.1:3301
instance-002:
iproto:
listen:
- uri: 127.0.0.1:3302
instance-003:
iproto:
listen:
- uri: 127.0.0.1:3303
Создадим файл, управляющий запуском локальных узлов кластера:
touch instances.enabled/app/instances.yaml
Укажем три узла для запуска:
instance-001:
instance-002:
instance-003:
Приложение на Lua
Создадим файл init.lua:
touch instances.enabled/app/init.lua
Полный листинг приложения:
local log = require('log')
local fiber = require('fiber')
local clock = require('clock')
log.info("starting application")
function _G.acquireLock(name, timeout)
box.begin({ txn_isolation = "linearizable", timeout = timeout })
local lock = box.space.locks:get(name)
if lock == nil then
lock = { name, 0, clock.time64() + timeout * 1e9 }
box.space.locks:insert(lock)
box.commit()
return lock
end
if lock['expire'] == 0 then
box.space.locks:update({ name }, { "=", "token", lock['token'] + 1 },
{ "=", "expire", clock.time64() + timeout * 1e9 })
box.commit()
return lock
end
box.commit()
return nil
end
function _G.releaseLock(name, token)
box.begin({ txn_isolation = "linearizable" })
local lock = box.space.locks:get(name)
if lock == nil then
box.commit()
return false
end
if lock['token'] == token and lock['expire'] ~= 0 then
box.space.locks:update({ name }, { { "=", "expire", 0 } })
box.commit()
return true
end
box.commit()
return false
end
fiber.create(function()
fiber.name("expire-lock-fiber")
box.ctl.wait_rw()
box.schema.space.create("locks", {
is_sync = true,
if_not_exists = true
})
box.space.locks:format({ { name = "name", type = "string" },
{ name = 'token', type = 'unsigned' },
{ name = 'expire', type = 'unsigned' } })
box.space.locks:create_index('name', {
parts = { { field = "name", type = "string" } },
if_not_exists = true
})
box.space.locks:create_index('expire', {
parts = { { field = "expire", type = "unsigned" } },
unique = false,
if_not_exists = true
})
while true do
box.ctl.wait_rw()
local now = clock.time64()
for _, t in box.space.locks.index.expire:pairs({ 0 }, { iterator = "GT" }) do
if t[3] < now then
local rc, err = pcall(box.space.locks.update, box.space.locks, { t["name"] }, { { "=", "expire", 0 } })
if not rc then
log.info(err)
break
end
else
break
end
end
fiber.sleep(1)
end
end)
Запуск локального кластера
Для запуска кластера из узлов Tarantool 3 воспользуемся командой tt:
tt start
Для проверки статуса узлов выполним команду:
tt status
Для проверки того, что кластер собрался, необходимо подключиться по очереди на узлы и проверить состояние репликации:
tt connect app:instance-001
> box.info.replication
Скрытый текст
---
- 1:
id: 1
uuid: 09652d7e-6b1d-4304-aad0-6ae058c847c8
lsn: 16
upstream:
status: follow
idle: 0.92459100019187
peer: 127.0.0.1:3301
lag: 7.9154968261719e-05
name: instance-001
downstream:
status: follow
idle: 0.91285500023514
vclock: {1: 16, 2: 5186, 3: 5013}
lag: 0
2:
id: 2
uuid: 8e62d6ea-badf-490d-9153-a342d0e48fd8
lsn: 5186
name: instance-002
3:
id: 3
uuid: 7f40d300-e9e6-4916-adaa-b669b672256b
lsn: 5013
upstream:
status: follow
idle: 0.91279700025916
peer: 127.0.0.1:3303
lag: 5.7220458984375e-05
name: instance-003
downstream:
status: follow
idle: 0.91363300010562
vclock: {1: 16, 2: 5186, 3: 5013}
lag: 0
...
Пример использования кластерных блокировок на Golang
Tarantool общается с приложениями с помощью msgpack-формата, который иногда допускает изменения размера целочисленного типа. Чтобы привести всё к uint64, сделаем утилитарную функцию.
func toUint64(v any) uint64 {
switch t := v.(type) {
case int, int8, int16, int32, int64:
return uint64(reflect.ValueOf(t).Int())
case uint, uint8, uint16, uint32, uint64:
return reflect.ValueOf(t).Uint()
default:
panic("type error")
}
}
Для общения с кластером Tarantool воспользуемся готовым пулом подключений, который умеет автоматически вычислять, какой узел является лидером:
instances := []pool.Instance{
{
Name: "instance-001",
Dialer: tarantool.NetDialer{
Address: "127.0.0.1:3301",
},
},
{
Name: "instance-002",
Dialer: tarantool.NetDialer{
Address: "127.0.0.1:3302",
},
},
{
Name: "instance-003",
Dialer: tarantool.NetDialer{
Address: "127.0.0.1:3303",
},
},
}
p, err := pool.Connect(context.Background(), instances)
if err != nil {
panic(err)
}
Создадим структуру для хранения блокировки:
type Lock struct {
name string
token uint64
expire uint64
}
Создадим функция для взятия блокировки. Эта функция:
Через пул подключений обращается к кластеру и вызывает хранимую процедуру acquireLock.
Принимает в качестве параметров контекст для выполнения запроса, пул соединений с кластером, имя блокировки и тайм-аут блокировки.
В случае успеха вернет объект блокировки, иначе вернет ошибку.
func acquireLock(ctx context.Context, p pool.Pooler, name string, timeout uint64) (*Lock, error) {
resp, err := p.Do(tarantool.NewCallRequest("acquireLock").Context(ctx).Args([]any{name, timeout}), pool.RW).Get()
if err != nil {
return nil, err
}
if len(resp) == 0 {
return nil, fmt.Errorf("no response")
}
if resp[0] == nil {
return nil, fmt.Errorf("failed")
}
data := resp[0].([]any)
result := Lock{
name: name,
token: toUint64(data[1]),
expire: toUint64(data[2]),
}
return &result, nil
}
Создадим функцию для отпускания блокировки. Эта функция:
Принимает контекст, пул соединений к кластеру, объект с блокировкой.
Через пул подключений обращается к кластеру и вызывает хранимую процедуру acquireLock.
Вернет true, если блокировка успешно отпущена.
Вернет false в случае тех или иных ошибок.
func releaseLock(ctx context.Context, p pool.Pooler, l *Lock) (bool, error) {
resp, err := p.Do(tarantool.NewCallRequest("releaseLock").Context(ctx).Args([]any{l.name, l.token}), pool.RW).Get()
if err != nil {
return false, err
}
if len(resp) == 0 {
return false, fmt.Errorf("no response")
}
return resp[0].(bool), nil
}
Скрытый текст
instances := []pool.Instance{
{
Name: "instance-001",
Dialer: tarantool.NetDialer{
Address: "127.0.0.1:3301",
},
},
{
Name: "instance-002",
Dialer: tarantool.NetDialer{
Address: "127.0.0.1:3302",
},
},
{
Name: "instance-003",
Dialer: tarantool.NetDialer{
Address: "127.0.0.1:3303",
},
},
}
p, err := pool.Connect(context.Background(), instances)
if err != nil {
panic(err)
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
var l *Lock
for i := 0; i < 3; i++ {
l, err = acquireLock(ctx, p, name, 10)
if err != nil {
time.Sleep(100 * time.Millisecond)
continue
}
break
}
if l == nil {
fmt.Println(name, "already locked")
return
}
defer func() {
ok, _ := releaseLock(ctx, p, l)
if ok {
fmt.Println(name, "success unlock")
} else {
fmt.Println(name, "lock expired")
}
}()
fmt.Println(name, "success lock")
Весь код main.go
package main
import (
"context"
"fmt"
"reflect"
"sync"
"time"
"github.com/tarantool/go-tarantool/v2"
_ "github.com/tarantool/go-tarantool/v2/datetime"
_ "github.com/tarantool/go-tarantool/v2/decimal"
"github.com/tarantool/go-tarantool/v2/pool"
_ "github.com/tarantool/go-tarantool/v2/uuid"
"github.com/tjarratt/babble"
)
func toUint64(v any) uint64 {
switch t := v.(type) {
case int, int8, int16, int32, int64:
return uint64(reflect.ValueOf(t).Int()) // a has type int64
case uint, uint8, uint16, uint32, uint64:
return reflect.ValueOf(t).Uint() // a has type uint64
default:
panic("type error")
}
}
type Lock struct {
name string
token uint64
expire uint64
}
func acquireLock(ctx context.Context, p pool.Pooler, name string, timeout uint64) (*Lock, error) {
resp, err := p.Do(tarantool.NewCallRequest("acquireLock").Context(ctx).Args([]any{name, timeout}), pool.RW).Get()
if err != nil {
return nil, err
}
if len(resp) == 0 {
return nil, fmt.Errorf("no response")
}
if resp[0] == nil {
return nil, fmt.Errorf("failed")
}
data := resp[0].([]any)
result := Lock{
name: name,
token: toUint64(data[1]),
expire: toUint64(data[2]),
}
return &result, nil
}
func releaseLock(ctx context.Context, p pool.Pooler, l *Lock) (bool, error) {
resp, err := p.Do(tarantool.NewCallRequest("releaseLock").Context(ctx).Args([]any{l.name, l.token}), pool.RW).Get()
if err != nil {
return false, err
}
if len(resp) == 0 {
return false, fmt.Errorf("no response")
}
return resp[0].(bool), nil
}
func main() {
instances := []pool.Instance{
{
Name: "instance-001",
Dialer: tarantool.NetDialer{
Address: "127.0.0.1:3301",
},
},
{
Name: "instance-002",
Dialer: tarantool.NetDialer{
Address: "127.0.0.1:3302",
},
},
{
Name: "instance-003",
Dialer: tarantool.NetDialer{
Address: "127.0.0.1:3303",
},
},
}
p, err := pool.Connect(context.Background(), instances)
if err != nil {
panic(err)
}
babbler := babble.NewBabbler()
babbler.Count = 1
wg := sync.WaitGroup{}
for i := 0; i < 1000; i++ {
name := babbler.Babble()
wg.Add(1)
go func(name string) {
defer wg.Done()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
var l *Lock
for i := 0; i < 3; i++ {
l, err = acquireLock(ctx, p, name, 10)
if err != nil {
time.Sleep(100 * time.Millisecond)
continue
}
break
}
if l == nil {
fmt.Println(name, "already locked")
return
}
defer func() {
ok, _ := releaseLock(ctx, p, l)
if ok {
fmt.Println(name, "success unlock")
} else {
fmt.Println(name, "lock expired")
}
}()
fmt.Println(name, "success lock")
time.Sleep(50 * time.Millisecond)
}(name)
}
wg.Wait()
}
Итоги
Примерно за 100 строк мы сделали на Tarantool 3 приложения для управления кластерными блокировками. Такое приложение может состоять из одного или нескольких узлов. Для регулирования количества узлов достаточно только редактирования yaml-файла. Если один из узлов Tarantool упадет, то сработает механизм автоматического выбора лидера, и приложение восстановит свою работоспособность.