Распределенная блокировка — очень удобный инструмент в кластере, который помогает обеспечивать эксклюзивный доступ к некоторому общему ресурсу. Цель такой блокировки — обеспечить доступ к ресурсу лишь одному сервису или запросу в данный момент времени. Так предотвращается гонка за данными и их неконсистентность. Распределенная (или кластерная) блокировка называется так потому, что она обеспечивается несколькими узлами, и выход из строя одного из них не повлияет на приложение. В этой статье я расскажу, как реализовать этот инструмент с помощью Tarantool 3.

Tarantool 3

Tarantool 3 — это новая версия in-memory базы данных, которая лежит в основе промежуточного ПО для хранения и обработки данных Tarantool. Она полностью совместима с файлами и репликацией второй версии.

Перечислю основные характеристики Tarantool 3 в сравнении с Tarantool 2:

  • Cервер с конфигом.

  • Ориентация на кластерность.

  • Удобная система триггеров.

  • Переопределение сетевого API (IPROTO).

  • Упрощение настройки репликации.

  • Имена узлов вместо UUID.

  • Расширенная статистика по потреблению памяти.

  • Значения полей по умолчанию.

Подробнее можно почитать в другой статье на Хабре

Принцип работы

Блокировки будут храниться в таблице (спейсе) с такой структурой:

name

token

expire

Взятой блокировкой будет считаться наличие строки в таблице c ненулевым expire. Отпущенной блокировкой будет считаться или отсутствие строки в таблице, или нулевой expire.

Для надежного хранения состояния блокировки настроим синхронную репликацию. Это делается при создании таблицы (спейса). Для операций с блокировками будем использовать транзакции в режиме linearizable. Для автоматического переключения лидера настроим raft-фейловер.

Так будет выглядеть топология приложения.

Три узла с одним лидером и golang приложение
Три узла с одним лидером и golang приложение

Напишем хранимую процедуру на Lua для взятия блокировки. Процедура работает следующим образом:

  • Принимает параметры: имя и таймаут.

  • Проверяет, существует ли блокировка. Если нет — создает новую. 

  • Если блокировка существовала, то проверяет, отпущена ли она (expire == 0). 

  • Если отпущена, то наращивает token и устанавливает время expire. 

  • Если уже была взята, процедура возвращает nil.

function _G.acquireLock(name, timeout)
    box.begin({ txn_isolation = "linearizable", timeout = timeout })


    local lock = box.space.locks:get(name)
    if lock == nil then
        lock = { name, 0, clock.time64() + timeout * 1e9 }
        box.space.locks:insert(lock)
        box.commit()
        return lock
    end

    if lock['expire'] == 0 then
        box.space.locks:update({ name }, { "=", "token", lock['token'] + 1 },
            { "=", "expire", clock.time64() + timeout * 1e9 })
        box.commit()
        return lock
    end

    box.commit()
    return nil
end

Важно учесть, что наш алгоритм запущен в кластере и использует время. Для корректности работы важно держать время синхронизированным. Как это сделать.

Теперь напишем процедуру на  Lua для отпускания блокировки. Процедура принимает параметры: имя и токен — и проверяет, что токен блокировки совпадает и expire не равен нулю. Тогда блокировка отпускается и процедура возвращает true, иначе — false.

function _G.releaseLock(name, token)

    box.begin({ txn_isolation = "linearizable" })

    local lock = box.space.locks:get(name)
    if lock == nil then
        box.commit()
        return false
    end

    if lock['token'] == token and lock['expire'] ~= 0 then
        box.space.locks:update({ name }, { { "=", "expire", 0 } })
        box.commit()
        return true
    end

    box.commit()
    return false
end

Для создания таблицы (спейса) с блокировками используется фоновая процедура. Она:

  1. Ждет, что узел станет лидером. 

  2. Создает таблицу с необходимыми полями.

  3. Запускает цикл для обработки тайм-аутов блокировок.

fiber.create(function()

    fiber.name("expire-lock-fiber")

    box.ctl.wait_rw()

    box.schema.space.create("locks", {
        is_sync=true,
        if_not_exists=true})

    box.space.locks:format({{name="name", type="string"},
        {name='token', type='unsigned'},
        {name='expire', type='unsigned'}})

    box.space.locks:create_index('name', {
        parts={{field="name", type="string"}},
        if_not_exists=true})

    box.space.locks:create_index('expire', {
        parts={{field="expire", type="unsigned"}},
        unique=false,
        if_not_exists=true})

    while true do

        box.ctl.wait_rw()

        local now = clock.time64()
        for _, t in box.space.locks.index.expire:pairs({0}, {iterator="GT"}) do
            if t[3] < now then
                local rc, err = pcall(box.space.locks.update, box.space.locks, {t["name"]}, {{"=", "expire", 0}})
                if not rc then
                    log.info(err)
                    break
                end
            end
        end

        fiber.sleep(1)
    end
end)

Инициализация приложения

Сначала инициализируем рабочее окружение для разработки:

tt init

Создадим директорию будущего приложения. Приложения располагаются в instances.enabled:

mkdir instances.enabled/app

Создадим файл конфигурации будущего локального кластера. Этот файл будет содержать топологию и настройки узлов кластера:

touch instances.enabled/app/config.yml

В конфигурации укажем:

  • Что в кластере должно быть три узла.

  • Параметры фейловера для работы кластера.

  • Включение mvcc-режима работы.

  • Файл с хранимыми процедурами.

# Настраиваем пользователей и их секреты
credentials:
  users:
    client:
      password: "secret"
    replicator:
      password: "topsecret"
      roles: [replication]

# Указываем под каким пользователем узлы Tarantool
# будут подключаться друг к другу
iproto:
  advertise:
    peer:
      login: replicator

# Настраиваем raft failover для автоматического и
# и ручного консистентного переключения лидера
replication:
  failover: election

# Использует мультиверсионный движок базы данных
# для решения проблем грязных чтений
database:
  use_mvcc_engine: true

# Указываем исходный файл для создания API блокировок
app:
  file: init.lua

# Топология кластера
# 3 узла объединённых репликацией
groups:
  group-001:
    replicasets:
      replicaset-001:
        replication:
          bootstrap_strategy: config
        bootstrap_leader: instance-001
        instances:
          instance-001:
            iproto:
              listen:
                - uri: 127.0.0.1:3301
          instance-002:
            iproto:
              listen:
                - uri: 127.0.0.1:3302
          instance-003:
            iproto:
              listen:
                - uri: 127.0.0.1:3303

Создадим файл, управляющий запуском локальных узлов кластера:

touch instances.enabled/app/instances.yaml

Укажем три узла для запуска:

instance-001:
instance-002:
instance-003:

Приложение на Lua

Создадим файл init.lua:

touch instances.enabled/app/init.lua

Полный листинг приложения:

local log = require('log')
local fiber = require('fiber')
local clock = require('clock')

log.info("starting application")

function _G.acquireLock(name, timeout)
    box.begin({ txn_isolation = "linearizable", timeout = timeout })
    local lock = box.space.locks:get(name)
    if lock == nil then
        lock = { name, 0, clock.time64() + timeout * 1e9 }
        box.space.locks:insert(lock)
        box.commit()
        return lock
    end

    if lock['expire'] == 0 then
        box.space.locks:update({ name }, { "=", "token", lock['token'] + 1 },
            { "=", "expire", clock.time64() + timeout * 1e9 })
        box.commit()
        return lock
    end

    box.commit()
    return nil
end

function _G.releaseLock(name, token)
    box.begin({ txn_isolation = "linearizable" })
    local lock = box.space.locks:get(name)
    if lock == nil then
        box.commit()
        return false
    end

    if lock['token'] == token and lock['expire'] ~= 0 then
        box.space.locks:update({ name }, { { "=", "expire", 0 } })
        box.commit()
        return true
    end

    box.commit()
    return false
end

fiber.create(function()
    fiber.name("expire-lock-fiber")
    box.ctl.wait_rw()

    box.schema.space.create("locks", {
        is_sync = true,
        if_not_exists = true
    })
    box.space.locks:format({ { name = "name", type = "string" },
        { name = 'token',  type = 'unsigned' },
        { name = 'expire', type = 'unsigned' } })

    box.space.locks:create_index('name', {
        parts = { { field = "name", type = "string" } },
        if_not_exists = true
    })
    box.space.locks:create_index('expire', {
        parts = { { field = "expire", type = "unsigned" } },
        unique = false,
        if_not_exists = true
    })

    while true do
        box.ctl.wait_rw()
        local now = clock.time64()
        for _, t in box.space.locks.index.expire:pairs({ 0 }, { iterator = "GT" }) do
            if t[3] < now then
                local rc, err = pcall(box.space.locks.update, box.space.locks, { t["name"] }, { { "=", "expire", 0 } })
                if not rc then
                    log.info(err)
                    break
                end
            else
                break
            end
        end
        fiber.sleep(1)
    end
end)

Запуск локального кластера

Для запуска кластера из узлов Tarantool 3 воспользуемся командой tt:

tt start

Для проверки статуса узлов выполним команду:

tt status

Для проверки того, что кластер собрался, необходимо подключиться по очереди на узлы и проверить состояние репликации:

tt connect app:instance-001

> box.info.replication
Скрытый текст
---
- 1:
    id: 1
    uuid: 09652d7e-6b1d-4304-aad0-6ae058c847c8
    lsn: 16
    upstream:
      status: follow
      idle: 0.92459100019187
      peer: 127.0.0.1:3301
      lag: 7.9154968261719e-05
    name: instance-001
    downstream:
      status: follow
      idle: 0.91285500023514
      vclock: {1: 16, 2: 5186, 3: 5013}
      lag: 0
  2:
    id: 2
    uuid: 8e62d6ea-badf-490d-9153-a342d0e48fd8
    lsn: 5186
    name: instance-002
  3:
    id: 3
    uuid: 7f40d300-e9e6-4916-adaa-b669b672256b
    lsn: 5013
    upstream:
      status: follow
      idle: 0.91279700025916
      peer: 127.0.0.1:3303
      lag: 5.7220458984375e-05
    name: instance-003
    downstream:
      status: follow
      idle: 0.91363300010562
      vclock: {1: 16, 2: 5186, 3: 5013}
      lag: 0
...

Пример использования кластерных блокировок на Golang

Tarantool общается с приложениями с помощью msgpack-формата, который иногда допускает изменения размера целочисленного типа. Чтобы привести всё к uint64, сделаем утилитарную функцию.

func toUint64(v any) uint64 {
	switch t := v.(type) {
	case int, int8, int16, int32, int64:
		return uint64(reflect.ValueOf(t).Int())
	case uint, uint8, uint16, uint32, uint64:
		return reflect.ValueOf(t).Uint()
	default:
		panic("type error")
	}
}

Для общения с кластером Tarantool воспользуемся готовым пулом подключений, который умеет автоматически вычислять, какой узел является лидером:

instances := []pool.Instance{
	{
		Name: "instance-001",
		Dialer: tarantool.NetDialer{
			Address: "127.0.0.1:3301",
		},
	},
	{
		Name: "instance-002",
		Dialer: tarantool.NetDialer{
			Address: "127.0.0.1:3302",
		},
	},
	{
		Name: "instance-003",
		Dialer: tarantool.NetDialer{
			Address: "127.0.0.1:3303",
		},
	},
}

p, err := pool.Connect(context.Background(), instances)
if err != nil {
	panic(err)
}

Создадим структуру для хранения блокировки:

type Lock struct {
	name   string
	token  uint64
	expire uint64
}

Создадим функция для взятия блокировки. Эта функция:

  1. Через пул подключений обращается к кластеру и вызывает хранимую процедуру acquireLock.

  2. Принимает в качестве параметров контекст для выполнения запроса, пул соединений с кластером, имя блокировки и тайм-аут блокировки.

  3. В случае успеха вернет объект блокировки, иначе вернет ошибку.

func acquireLock(ctx context.Context, p pool.Pooler, name string, timeout uint64) (*Lock, error) {
	resp, err := p.Do(tarantool.NewCallRequest("acquireLock").Context(ctx).Args([]any{name, timeout}), pool.RW).Get()
	if err != nil {
		return nil, err
	}

	if len(resp) == 0 {
		return nil, fmt.Errorf("no response")
	}

	if resp[0] == nil {
		return nil, fmt.Errorf("failed")
	}

	data := resp[0].([]any)

	result := Lock{
		name:   name,
		token:  toUint64(data[1]),
		expire: toUint64(data[2]),
	}

	return &result, nil
}

Создадим функцию для отпускания блокировки. Эта функция:

  1. Принимает контекст, пул соединений к кластеру, объект с блокировкой. 

  2. Через пул подключений обращается к кластеру и вызывает хранимую процедуру acquireLock.

  3. Вернет true, если блокировка успешно отпущена.

  4. Вернет false в случае тех или иных ошибок.

func releaseLock(ctx context.Context, p pool.Pooler, l *Lock) (bool, error) {

	resp, err := p.Do(tarantool.NewCallRequest("releaseLock").Context(ctx).Args([]any{l.name, l.token}), pool.RW).Get()
	if err != nil {
		return false, err
	}

	if len(resp) == 0 {
		return false, fmt.Errorf("no response")
	}

	return resp[0].(bool), nil
}
Скрытый текст
instances := []pool.Instance{
		{
			Name: "instance-001",
			Dialer: tarantool.NetDialer{
				Address: "127.0.0.1:3301",
			},
		},
		{
			Name: "instance-002",
			Dialer: tarantool.NetDialer{
				Address: "127.0.0.1:3302",
			},
		},
		{
			Name: "instance-003",
			Dialer: tarantool.NetDialer{
				Address: "127.0.0.1:3303",
			},
		},
	}
p, err := pool.Connect(context.Background(), instances)
if err != nil {
	panic(err)
}

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

var l *Lock

for i := 0; i < 3; i++ {
	l, err = acquireLock(ctx, p, name, 10)
	if err != nil {
		time.Sleep(100 * time.Millisecond)
		continue
	}
	break
}

if l == nil {
	fmt.Println(name, "already locked")
	return
}

defer func() {
	ok, _ := releaseLock(ctx, p, l)
	if ok {
		fmt.Println(name, "success unlock")
	} else {
		fmt.Println(name, "lock expired")
	}
}()

fmt.Println(name, "success lock")

Весь код main.go

package main

import (
	"context"
	"fmt"
	"reflect"
	"sync"
	"time"
	"github.com/tarantool/go-tarantool/v2"
	_ "github.com/tarantool/go-tarantool/v2/datetime"
	_ "github.com/tarantool/go-tarantool/v2/decimal"
	"github.com/tarantool/go-tarantool/v2/pool"
	_ "github.com/tarantool/go-tarantool/v2/uuid"
	"github.com/tjarratt/babble"
)

func toUint64(v any) uint64 {
	switch t := v.(type) {
	case int, int8, int16, int32, int64:
		return uint64(reflect.ValueOf(t).Int()) // a has type int64
	case uint, uint8, uint16, uint32, uint64:
		return reflect.ValueOf(t).Uint() // a has type uint64
	default:
		panic("type error")
	}
}

type Lock struct {
	name   string
	token  uint64
	expire uint64
}

func acquireLock(ctx context.Context, p pool.Pooler, name string, timeout uint64) (*Lock, error) {
	resp, err := p.Do(tarantool.NewCallRequest("acquireLock").Context(ctx).Args([]any{name, timeout}), pool.RW).Get()
	if err != nil {
		return nil, err
	}
	if len(resp) == 0 {
		return nil, fmt.Errorf("no response")
	}
	if resp[0] == nil {
		return nil, fmt.Errorf("failed")
	}

	data := resp[0].([]any)

	result := Lock{
		name:   name,
		token:  toUint64(data[1]),
		expire: toUint64(data[2]),
	}

	return &result, nil
}

func releaseLock(ctx context.Context, p pool.Pooler, l *Lock) (bool, error) {
	resp, err := p.Do(tarantool.NewCallRequest("releaseLock").Context(ctx).Args([]any{l.name, l.token}), pool.RW).Get()
	if err != nil {
		return false, err
	}

	if len(resp) == 0 {
		return false, fmt.Errorf("no response")
	}

	return resp[0].(bool), nil
}

func main() {
	instances := []pool.Instance{
		{
			Name: "instance-001",
			Dialer: tarantool.NetDialer{
				Address: "127.0.0.1:3301",
			},
		},
		{
			Name: "instance-002",
			Dialer: tarantool.NetDialer{
				Address: "127.0.0.1:3302",
			},
		},
		{
			Name: "instance-003",
			Dialer: tarantool.NetDialer{
				Address: "127.0.0.1:3303",
			},
		},
	}

	p, err := pool.Connect(context.Background(), instances)
	if err != nil {
		panic(err)
	}

	babbler := babble.NewBabbler()
	babbler.Count = 1
  
	wg := sync.WaitGroup{}
  
	for i := 0; i < 1000; i++ {
		name := babbler.Babble()

		wg.Add(1)
		go func(name string) {
			defer wg.Done()

			ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
			defer cancel()

			var l *Lock
			for i := 0; i < 3; i++ {
				l, err = acquireLock(ctx, p, name, 10)
				if err != nil {
					time.Sleep(100 * time.Millisecond)
					continue
				}
				break
			}
			if l == nil {
				fmt.Println(name, "already locked")
				return
			}
			defer func() {
				ok, _ := releaseLock(ctx, p, l)
				if ok {
					fmt.Println(name, "success unlock")
				} else {
					fmt.Println(name, "lock expired")
				}
			}()

			fmt.Println(name, "success lock")
			time.Sleep(50 * time.Millisecond)

		}(name)
	}
	wg.Wait()
}

Итоги

Примерно за 100 строк мы сделали на Tarantool 3 приложения для управления кластерными блокировками. Такое приложение может состоять из одного или нескольких узлов. Для регулирования количества узлов достаточно только редактирования yaml-файла. Если один из узлов Tarantool упадет, то сработает механизм автоматического выбора лидера, и приложение восстановит свою работоспособность.

Полезные ссылки

Комментарии (0)