Пройдя много собеседований, выяснилось, что довольно приличная часть собеседующих, спрашивавших или как-то затрагивавших тему транзакций и их работы, не знают как работают транзакции и что означает для разработчика термин изоляция. Вплоть до архитектора в одной очень большой российской компании, для которого выводы, использованные мною для формулирования решения при прохождении архитектурной секции оказались чем-то вроде бреда. Пока готовится вторая статья (Миллиард абитуриентов МИРЭА 2), можно отвлечься и разобрать тему, продемонстрировать разработчикам что означает для них I в ACID.

Короткий пример

Изоляция позволяет нам гарантировать, что в случае конкурентного доступа к ресурсу, мы не видим, что делают другие транзакции. Но что это означает на практике?

Возьмем простой пример. Есть два потока и таблица1 с записями, так же есть таблица2, в которой могут быть записи с ссылками на первую. Поток1 добавляет в таблицу2 запись с ссылкой на таблицу1. Поток2 удаляет ту же самую запись из таблицы1. Оба закрывают свои транзакции единомоментно, вплоть до такта процессора. Вопрос, какой будет результат?

Подсказка

Целостность базы данных при любых раскладах должна быть сохранена, это основное требование к любой реляционной СУБД.

Еще пример

На самом деле изоляция позволяет нам реализовать механизм межсерверных блокировок. Это может быть очень полезно, когда нужно гарантировать состояние, обеспечить, что какая-то операция будет точно выполнена хотя бы один раз. Предлагаю рассмотреть на примере отправки уведомлений пользователю.

Необходимо отправлять уведомления массово нескольким пользователям. Можно конечно использовать кафку, но кафка не гарантирует, что сообщение будет точно обработано. Тем более что во время отправки сообщений в кафку может часть уйти, а часть не отправится - в итоге вы считаете, что сообщение ушло, а на деле - нет. Получите-распишитесь в испорченном состоянии, которое невозможно исправить. Кафка по сути своей вообще ничего не гарантирует и является ненадежным источником/приемником данных. Не тешьте себя иллюзиями. У разработчика микросервисов все кроме его стейта всегда является ненадежным, а стейт в БД. Почему бы не сохранить записи об уведомлении в СУБД, а дальше уже по одному отправлять и контроллировать процесс отправки? Так не только будет видно, сколько нужно еще отправить уведомлений, но и в случае необходимости перезапустить этот процесс.

Так как это сделать?

Изоляция дает нам возможность осуществлять конкурентный доступ к ресурсу, гарантируя что состояние БД будет валидным с точки зрения описанных вами правил.

Одним из простейших способов реализации блокировки является дополнительная колонка статуса записи. Например целое. Что бы заблокировать запись, нужно сделать следующий запрос:

UPDATE update_lock.lock_table t SET state = 1 WHERE t.id = ? AND t.state = 0

Обратите внимание, что нужно обязательно проверять, что t.state = 0, иначе работать не будет. Так же нужно обязательно закрыть транзакцию, что бы изменения были применены СУБД и остальные потоки увидели эти изменения.

Хорошо, вы можете не поверить, и сказать, что автор ничего не понимает в СУБД, но вот вам пример, который 100% надежен. Он не может никаким образом сломаться и работает быстрее, чем приведенный выше вариант.

Можно блокировать записи при помощи инструкции INSERT. Добавляется таблица, ссылки на оригинальную таблицу не обязательны, но вот существование главного ключа для уникальности - обязательно, все что остается - это сделать вставку:

INSERT INTO insert_lock.lock_table(id) VALUES(?)

Обязательно закрываем транзакцию. Если другой поток сделает вставку, то мы не сможем закрыть транзакцию и получим исключение - тогда откатываем изменения и пытаемся блокировать другую задачу.

А протестировать?

Для того, что бы убедится, что это действительно работает, напишем простейшую программу работающую напрямую с jdbc.

Для упрощения написания различных тестов сразу сделаем два абстрактаных класса, они помогут нам при написании тестов.

AbstractDBTest.java
package com.lastrix.dblock;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.CountDownLatch;

public abstract class AbstractDBTest {
    public static final int THREAD_COUNT = 8;

    private final int threadCount;

    protected AbstractDBTest() {
        this(THREAD_COUNT);
    }

    protected AbstractDBTest(int threadCount) {
        this.threadCount = threadCount;
    }

    /**
     * Execute test plan according to your configuration
     */
    public void run(){
        try (var connection = createConnection()) {
            setup(connection);
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }

        var start = Instant.now();
        doTest();
        System.out.println("Time taken = " + Duration.between(start, Instant.now()).toMillis() + " ms");
    }

    /**
     * This test creates N threads each of them will wait until each ready to execute plan.
     * The method will wait until all threads finished
     */
    private void doTest() {
        CountDownLatch latch = new CountDownLatch(threadCount);
        var complete = new CountDownLatch(threadCount);
        var runnable = new Runnable() {
            @Override
            public void run() {
                try (var connection = createConnection()) {
                    latch.countDown();
                    latch.await();
                    runJob(connection, getThreadId());
                } catch (Throwable e) {
                    throw new RuntimeException(e);
                } finally {
                    complete.countDown();
                }
            }
        };

        for (int i = 0; i < threadCount; i++) {
            new Thread(runnable, "test-thread-" + i).start();
        }

        try {
            complete.await();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    private static int getThreadId() {
        var n = Thread.currentThread().getName();
        var idx = n.lastIndexOf('-');
        return Integer.parseInt(n.substring(idx + 1));
    }

    private static Connection createConnection() throws SQLException {
        Connection connection = DriverManager.getConnection("jdbc:postgresql://localhost:5432/test", "test", "test2002");
        connection.setAutoCommit(false);
        return connection;
    }

    /**
     * Create schema, tables and rows needed for tests
     * @param connection
     * @throws SQLException
     */
    protected abstract void setup(Connection connection) throws SQLException;

    /**
     * This method should perform actual testing
     * @param connection
     * @param tid
     * @throws Exception
     */
    protected abstract void runJob(Connection connection, int tid) throws Exception;
}

AbstractJobRunner.java
package com.lastrix.dblock;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Abstract class for handling locking/unlocking testing
 */
public abstract class AbstractJobRunner implements Runnable {
    protected final Connection connection;
    protected final int tid;
    private final int totalLockCount;

    protected AbstractJobRunner(Connection connection, int tid, int totalLockCount) {
        this.connection = connection;
        this.tid = tid;
        this.totalLockCount = totalLockCount;
    }

    @Override
    public final void run() {
        int locked = 0;
        while (locked < totalLockCount) {
            if (tryLock()) {
                locked++;
                System.out.println("Lock acquired by thread " + Thread.currentThread().getName());
                int lockedCount = getLockCounter().incrementAndGet();
                if (lockedCount > 1) {
                    System.out.println("Locked by multiple threads! " + lockedCount);
                }
                if (unlock()) {
                    System.out.println("Released lock by " + Thread.currentThread().getName());
                } else {
                    System.out.println("Failed to unlock by " + Thread.currentThread().getName());
                }
            }
        }
    }

    protected final void rollbackSafely() {
        try {
            connection.rollback();
        } catch (SQLException ignored) {
        }
    }

    /**
     * Try to lock row in database, return true if success
     *
     * @return boolean
     */
    protected abstract boolean tryLock();

    /**
     * Try to unlock row in database, return true if success, this method should be successful all the time
     *
     * @return boolean
     */
    protected abstract boolean unlock();

    /**
     * Get current lock counter for checking that only single thread acquired our lock
     *
     * @return AtomicInteger
     */
    protected abstract AtomicInteger getLockCounter();
}

Первым протестируем блокировки на основе запроса update. Что бы заблокировать запись, потребуется 2 запроса (да знаю, что есть select for update, не будьте душнилами).

            try (var selStmt = connection.prepareStatement("SELECT id FROM db_locks.update_lock t WHERE t.state = 0 ORDER BY id LIMIT 1;");
                 var updStmt = connection.prepareStatement("UPDATE db_locks.update_lock t SET state = 1 WHERE t.id = ? AND t.state = 0")
            ) {
                try (var rs = selStmt.executeQuery()) {
                    if (rs.next()) {
                        id = rs.getInt(1);
                    } else {
                        rollbackSafely();
                        return false;
                    }
                }
                updStmt.setInt(1, id);
                boolean success = updStmt.executeUpdate() > 0;
                connection.commit();
                return success;
            } catch (SQLException e) {
                throw new RuntimeException(e);
            }

Если убрать в запросе update проверку, что state = 0, то в консоли можно будет увидеть предупреждение, что больше, чем один поток захватил лок. Такое происходит, потому что между двумя запросами была завершена транзакция другого потока, захватившего лок и мы видим эти изменения, при выполнении update.

Что бы разблокировать, нужно выполнить запрос, аналогичный блокировке:

            try (var stmt = connection.prepareStatement("UPDATE db_locks.update_lock t SET state = 0 WHERE t.id = ? AND t.state = 1")) {
                stmt.setInt(1, id);
                if (stmt.executeUpdate() == 0) {
                    rollbackSafely();
                    return false;
                }
                // we need to do this before commit, otherwise race-condition may occur
                getLockCounter().decrementAndGet();
                connection.commit();
                return true;
            } catch (SQLException e) {
                throw new RuntimeException(e);
            }

Несмотря на то, что в нашем примере возможны только два состояния лока, использовать такую блокировку можно для много-этапных обработок, поэтому желательно обновлять запись, задавая исходное значение, которое мы ожидаем увидеть в момент изменения. Так можно избежать всяких странностей, да и отлаживать будет проще, так как сразу увидите проблему в логах.

Вариант с вставкой записи в спец-таблицу является более быстрым, чем обновление (можете объяснить в комментариях почему так), однако такой подход более ограничен. Что бы получить блокировку - осуществляем вставку записи в таблицу:

            try (var stmt = connection.prepareStatement("INSERT INTO db_locks.insert_lock(id) VALUES(?)")) {
                stmt.setInt(1, id);
                boolean success = stmt.executeUpdate() > 0;
                connection.commit();
                return success;
            } catch (SQLException e) {
                rollbackSafely();
                if (isValidFailure(e)) {
                    return false;
                }
                throw new RuntimeException(e);
            }

Что бы этот способ заработал как надо, нужно контроллировать исключения. В данном случае может произойти 2 исключения, а именно: ошибка нарушения целостности БД из-за дубликата главного ключа, а так же ошибка завершения транзакции из-за нарушения целостости БД из-за дубликата главного ключа. Ошибки эти разные, и проверять их надо отдельно:

            if (e.getMessage().contains("duplicate key value violates unique constraint")) {
                return true;
            }
            return e.getMessage().contains("current transaction is aborted, commands ignored until end of transaction block");

При появлении таких ошибок нужно обязательно сделать rollback, иначе в дальнейшем подключение будет некорректным - транзакция не закрыта, а значит все наши попытки добавить запись сразу будут завершаться первым исключением.

Что бы освободить такой лок нужно удалить запись из таблицы.

Хотя в статье не приводятся результаты производительности для таких блокировок, все же второй вариант работает намного быстрее (примерно в два раза), попробуйте самостоятельно выяснить причину.

Недостатки такой блокировки

Она работает довольно медленно, как и любой запрос к СУБД. 10-200 мкс на этом спокойно теряются, если осуществляется конкурентный доступ множеством потоков. В случае, если ваши фоновые задачи очень долгие (от 0,5 мс) - например запись в кафку или обращение к другому сервису, то данный подход уже является более предпочтительным, чем например костыль в виде межсервисных блокировок redis, особенно если вы его только для этого и используете.

Вторым недостатком является необходимость сбрасывать состояние блокировки по прохождении определенного таймаута (хотя в redis же то же самое должно быть). Все что требуется - это раз в определенный период выполнять запрос, который все "зависшие" записи сбросит в начальное состояние, что заставит задачу выполняться заново.

Заключение

В данной работе рассмотрен способ применения изолированности транзакций для реализации межсервисных блокировок с гарантированной целостностью состояния. Такой подход позволяет гарантировать, что данные не будут испорчены, из-за использования каких-то сторонних систем. Так же это позволяет сэкономить на оборудовании, так как не требуется ставить дополнительный софт, ради ненужного костыля.

Основным преимуществом является возможность написания таких алгоритмов, которые при обработке пользователя все эффекты сохраняют в БД, а уже затем они в фоне обрабатываются и применяются к системе в целом. Это гарантирует, что запрос пользователя будет обработан полностью, а если что-то серьезное произойдет и нужно будет освободить ресурсы, то у вас уже есть план для выполнения, так как все хранится в БД, что может оказаться полезным при обработке много-этапных и/или многокомпонентных эффектов запросов пользователя.

Исходный код можно найти в репозитории.

Для работодателей

https://hh.ru/resume/b33504daff020c31070039ed1f77794a774336

Спасите котика от изоляции!

Комментарии (10)


  1. gmixo
    25.08.2022 15:24
    +2

    по-моему такие проблемы решаются с помощью настройки уровня изолированности транзакций, а не лишними столбцами в таблицах


    1. lastrix Автор
      25.08.2022 17:00
      -3

      Не совсем, уровни изоляции про другое. Смотри. У тебя есть задача по резервированию товаров заказа, причем работать приходится с разными магазинами, т.к. ты интегратор. Ты можешь сделать так, что бы каждый товар резервировался по отдельности.
      Что получаешь в этом случае? Допустим, что резервирование чего-то не удалось. Тогда ты сможешь спокойно отследить что именно нужно отменить, на каком этапе процесс резервирования заказа остановился. Или наоборот, предложить пользователю изменить заказ, например отправив письмо или как-то еще уведомив, что данный товар не удалось для него найти.
      Даже если в процессе резервирования что-то упадет, то ты о том факте, что уже пытался - будешь знать. Ведь можно в процессе сброса задачи установить флаг "пытался, но что-то не получилось".
      Тут не просто блокировки, а полный контроль процесса.


  1. boopiz
    25.08.2022 16:50
    +1

    О чем вообще речь? Что это за субд? "Есть два потока и таблица1 с сущностями, так же есть таблица2, в которой могут быть сущности с ссылками на первую."

    Батенька. Вы с термтнологией для начала сами разберитесь. Прежде чем выдавать оценочное суждение о других. Что такое в вашем пониманит "таблица" и что такое "сущность".. если в термтнах стандартной ER модели (судя по использованию acid принципа для реляционных субд) то сущность это и есть то, что вы называете "таблицей"... хотя правильнее называть: "отношение". под вашей же "сущностью" (что бы это ни значило) подразумевается, скорее всего кортеж.

    редис и прочие ему подобные программы не являются субд в классическом понимании этого термина. и по этому к ним пишутся подобные костыли.

    Что же будет дальше?


    1. lastrix Автор
      25.08.2022 16:56

      поправил, так лучше?


  1. buratino
    25.08.2022 16:58
    +2

    чисто формально, в заголовке есть утверждение "Что означает I в ACID и как это можно использовать", в тексте статьи это не раскрывается, как и фактически не употребляется заявленный в заголовке термин (ACID). Даже если это вам кажется очевидным, что заявляется в заголовке обязано раскрываться в тексте статьи, хотя бы ссылкой на статью в Википедии.


  1. onets
    25.08.2022 18:39

    Мне кажется ACID - это более фундаментальные вещи. В статье описана логическая блокировка записей. Например порция данных, которую мы взяли в обработку.

    Но даже если продолжать в том же духе - как всегда "все зависит от".

    В Oracle и MS SQL я мог, используя Serializable или Repeatable read, сделать обновление логического флага/статуса. А вот в MySQL такой финт не прошел, база начинала выдавать разные записи, и вроде даже дубли. Я дальше не стал раскапывать, так как в том проекте уже была еще одна внешняя блокировка, кстати в виде таблицы в том же MySQL. А вот когда я эту таблицу заменил на RedLock на основе Redis - проекту в целом похорошело.


    1. lastrix Автор
      26.08.2022 09:24

      Насколько мне известно, в MySQL есть 2 движка - myisam, innodb, какой именно вы использовали? Первый даже не поддерживает внешние ключи и по сути своей не подходит для серьезной работы. Иннодб получше и скорее всего там такой проблемы быть не должно.


  1. DonTsipa
    27.08.2022 18:40

    Использовать запись в реляционную базу только для установки лока ?

    Я не спец по оптимизации, но тот же Redis будет работать куда быстрее и лучше (установка лока и его максимального времени существования)

    И если вам нужно отслеживать процесс, можете выкинуть тот же exception и уже после этого отправлять запрос в основную БД о.о


    1. lastrix Автор
      27.08.2022 18:47

      Речь в статье не просто про лок, а про контроль целостности.

      В статье уже написано, что для задач, которые выполняются быстрее, чем 0.5 мс такой лок не имеет особо смысла, если только не является важным контроль факта выполнения задачи.

      Использовать редис или любой другой инструмент только потому, что хочется - не является инженерныv подходом, а любительством. Вам действительно нужен инструмент, который при падении теряет все ваши данные? Редис же в памяти работает. А ваша БД - она даже если упадет, целостность гарантирует.


  1. lastrix Автор
    27.08.2022 18:45

    deleted