В прошлой работе автор представил описание архитектуры платформы приемной коммисии для обработки миллиарда абитуриентов за один день. В этой работе автор предлагает разработать сервисы, используя Spring + JDBCTemplate и Kafka микросервисы для предложенной архитектуры.

Проверка домашнего задания

В предыдущей часте читателю было предложено решить проблему шины данных, каким образом передавать данные между модулями записи и чтения. Существует простое решение, если учесть, что максимальный объем данных, передаваемых модулем записи, является константой, а так же тот факт, что при росте числа модулей в бесконечность количество обращений в один топик не будет больше чем константа определяемая числом записей принимаемых модулем записи помноженной на соотношение модулей записи к модулям чтения, то при правильной организации партиций и распределения топиков между ними можно добиться линейного масштабирования при сохранении доступности 24/7.

Рисунок 1. Пример партиций и распределения топиков между ними
Рисунок 1. Пример партиций и распределения топиков между ними

На рисунке 1 изображен вариант организации партиций и топиков для получения желаемой производительности. Очевидно, что при наличии N модулей, которые могут осуществлять отправку параллельно в L топиков, то количество подключений, которое необходимо обрабатывать шине данных будет равно произведению N на L. Чтобы упростить шину данных, запись осуществлять будем в одну партицию, так же это позволит максимизировать скорость отправки сообщений. Контроль целостности и гарантия доставки будут описаны позже. Итого, имея N*122 мб/с - объем общего потока данных, а так же необходимость обрабатывать N*L подключений в секунду, можно высчитывать нагрузку на шину данных формульно, так как подобные характеристики обычно являются известными величинами.

Прочитав данную информацию у читателя возможно возникнет вопрос, тогда какое количество пользователей в день сможет обрабатывать такая архитектура, если масштабируется она линейно от количества пользователей, а на каждый узел такой системы максимальная нагрузка равна константе (чем больше число пользователей, тем меньше будет влияние случайной величины, и больше проявление усреднения). После длительных вычислений и бессонных ночей в поисках этого числа, автор пришел к выводу, что единственный правильный ответ будет перевод одного известного мема на русский язык:

Рисунок 2. Ответ на самый главный вопрос
Рисунок 2. Ответ на самый главный вопрос

Если же ответить более развернуто, то количество пользователей и их запросов будет ограничено только техническими возможностями человечества осуществлять обработку соотвествующего числа запросов, а так же поддержания жизнедеятельности такого числа пользователей. Если допустить, что человечество способно производить бесконечное число серверов, сетей и передавать произвольное число данных, а так же содержать бесконечное число пользователей, то рано или поздно будет достигнут физический предел (согласно законам физики, а точнее радиусу Шварцшильда), после которого пространство-время схлопывается в сингулярность и информация теряется бесследно.

Однако одну оценку автор дать может. Данной архитектуры должно хватить человечеству на ближайшие пару млн лет +- сотню тысячелетий. Разумеется, если для вас такой горизонт планирования слишком короткий и неприемлимый - читатель всегда может обратиться к настоящим архитекторам. Автор уверен, что они смогут предложить более долгосрочное решение.

Чтобы не захламлять статью большими таблицами, список всех архитекторов, выполнивших домашнее задание, можете прочитать в спойлере ниже.

Список архитекторов, справившихся с домашним заданием

Все остальные получают банан!

Модель данных модуля записи

В статье была описана модель данных для одиночной БД, с которой будут работать все микросервисы. Однако такое решение невозможно масштабировать и не представляется возможным хранить нужные объемы данных. Сама БД потребует существенных мощностей для корректного функционирования.

Для распределенного решения, описываемого в данном цикле статей, требуется изменить модель данных. Начнем с модуля записи.

Для начала потребуется две таблицы, для хранения текущих приемных кампаний, а так же специальностей, на которые абитуриенты могут поступать:

CREATE TABLE scp_write_service.session
(
    session_id INT     NOT NULL,
    active     BOOLEAN NOT NULL DEFAULT TRUE,
    CONSTRAINT session_pk PRIMARY KEY (session_id)
);
CREATE TABLE scp_write_service.spec
(
    spec_id UUID    NOT NULL,
    active  BOOLEAN NOT NULL DEFAULT TRUE,
    CONSTRAINT spec_pk PRIMARY KEY (spec_id)
);

В первой таблице хранятся идентификаторы приемных кампаний, во второй идентификаторы специальностей. Обновляться эти таблицы должны по событию. Хотя для приемной кампании флаг активности не очень нужен, то для специальности - обязателен, так как иначе не будет возможности проверить на уровне БД, что пользователь может поступать на выбранную специальность, например посредством триггера. Этот флаг позволит хранить историю приемных компаний, не нарушая целостность БД.

CREATE TABLE scp_write_service.enrollee
(
    user_id          UUID      NOT NULL,
    session_id       INT       NOT NULL,
    -- if this user should no longer be able to perform any operation
    disabled         BOOLEAN   NOT NULL DEFAULT FALSE,
    -- selected speciality id
    selected_spec_id UUID               DEFAULT NULL,
    -- how much application may be sent by this enrollee
    selection_count  SMALLINT  NOT NULL DEFAULT 20,
    created_stamp    TIMESTAMP NOT NULL DEFAULT current_timestamp,
    modified_stamp   TIMESTAMP NOT NULL DEFAULT current_timestamp,
    CONSTRAINT enrollee_pk PRIMARY KEY (user_id, session_id),
    CONSTRAINT enrollee_session_id_fk FOREIGN KEY (session_id) REFERENCES scp_write_service.session (session_id),
    CONSTRAINT enrollee_selected_spec_id_fk FOREIGN KEY (selected_spec_id) REFERENCES scp_write_service.spec (spec_id),
    CONSTRAINT enrollee_selection_count_positive_chk CHECK ( selection_count >= 0 )
);

Таблица абитуриента хранит базовую информацию о нем, а так же является авторизующим доступ токеном (своего рода). Такая запись повляется после получения модулем записи события о том, что абитуриент принимает участие в приемной кампании и может подавать документы. Так как модули записи делят между собой пользователей, то данная таблица не будет расти бесконечно.

При наличии записи в таблице enrollee, абитуриент может поступать на специальности, для этого заведем таблицу с дополнительными полями, которые и позволят нам гарантировать целостность. Можно было бы ударится в схемотоз и вычислять контрольные суммы или еще какой-то безумный способ жечь электроэнергию в стиле блокчейнов, однако автор выбрал самый простой вариант.

CREATE TABLE scp_write_service.enrollee_select
(
    user_id         UUID      NOT NULL,
    session_id      INT       NOT NULL,
    spec_id         UUID      NOT NULL,
    -- current selection status, 0 for applied, 1 for confirmed and 2 for cancelled
    status          SMALLINT  NOT NULL DEFAULT 0,
    -- the assigned user score
    score           INT       NOT NULL DEFAULT 0,
    -- stamp for creation(application) date
    created_stamp   TIMESTAMP NOT NULL DEFAULT current_timestamp,
    -- stamp for confirmation date
    confirmed_stamp TIMESTAMP          DEFAULT NULL,
    -- stamp for cancelled date
    canceled_stamp  TIMESTAMP          DEFAULT NULL,

    ---------------------------------------
    --     integrity check fields        --
    ---------------------------------------
    -- each time modification occurs, this stamp should be updated
    modified_stamp  TIMESTAMP NOT NULL DEFAULT current_timestamp,
    -- should be treated like 'version', this way we will make
    -- possible to reduce amount of response messages sent by target module
    -- otherwise we'll deal with lots of duplicates
    ordinal         SMALLINT           DEFAULT 0,
    -- current integrity check state, 0 - not sent, 1 - sent but not confirmed, 2 - sent and confirmed
    state           SMALLINT  NOT NULL DEFAULT 0,

    CONSTRAINT enrollee_select_pk PRIMARY KEY (user_id, session_id, spec_id),
    CONSTRAINT enrollee_select_user_fk FOREIGN KEY (user_id, session_id) REFERENCES scp_write_service.enrollee (user_id, session_id),
    CONSTRAINT enrollee_select_spec_fk FOREIGN KEY (spec_id) REFERENCES scp_write_service.spec (spec_id)
);

После отправки сообщения об изменении (а пишется в шину данных и передается в модуль чтения именно записи из таблицы enrollee_select), поле state изменяется в значение 1, что означает, что запись отправлена, но подтверждение еще не получено. После прихода подтверждения, у которого поле ordinal равно такому же полю записи в таблице, то значение state изменяется на 2, что означает, что запись была отправлена и получено подтверждение о приеме модулем чтения последних изменений. При таком подходе можно всегда узнать текущее состояние целостности системы и статус.

Если в течении определенного промежутка времени не пришло подтверждение, тогда поле state сбрасывается в значение 0, чтобы отправить его еще раз. Таким образом не требуется писать сообщение сразу во все партиции, достаточно одной. Чтобы сбросить статус отправки, достаточно выполнить следующий запрос:

UPDATE scp_write_service.enrollee_select
SET 
  state = 0, 
  ordinal = ordinal + 1, -- we must change ordinal, otherwise read
                         -- module won't send confirmation again
  modified_stamp = CURRENT_TIMESTAMP
WHERE state = 1 
  AND modified_stamp < timezone('utc', now())::date - interval '1 hours'

Для его выполнения можно нанять специально обученного человека, который раз в 15 минут будет выполнять этот запрос в БД. Главное, чтобы его звали Кронос или у него была крон-содержащая фамилия, иначе не заработает.

Для корректной работы БД заведем несколько триггеров, которые будут проверять данные, прежде чем они попадут в таблицу.

CREATE FUNCTION scp_write_service.select_insert()
    RETURNS trigger AS
$select_insert$
BEGIN
    -- do not allow invalid parameters to be passed
    -- confirmation or cancelling must be performed by UPDATE statement
    -- rather than insert
    IF NEW.status <> 0 THEN
        RAISE EXCEPTION 'Invalid status value : %', NEW.status
            USING HINT = 'status = 0';
    END IF;
    UPDATE scp_write_service.enrollee e
    SET selection_count = selection_count - 1,
        modified_stamp  = current_timestamp
    WHERE e.user_id = NEW.user_id
      AND e.session_id = NEW.session_id;
    -- we do not modify data here
    RETURN NEW;
END;
$select_insert$ LANGUAGE plpgsql;

CREATE TRIGGER select_insert_trg
    BEFORE INSERT
    ON scp_write_service.enrollee_select
    FOR EACH ROW
EXECUTE FUNCTION scp_write_service.select_insert();

Задача триггера select_insert заключается в том, чтобы проверить, что абитуриент может выбрать специальность (в коде этого нет, потому что это домашняя работа читателю, присылайте пулл реквест на гитхаб). Второй задачей является изменение количества доступных подач документов на специальности. Разумеется это можно сделать через ограничение с подсчетом уже имеющегося числа поданных заявлений, но тогда никто не напишет в комментарии свое мнение по этому поводу, да и способ этот потребует большей нагрузки на СУБД.

Теперь имея такой триггер, можно регистрировать пользователя в системе используя только один запрос, по результатам которого вся необходимая работа будет выполнена:

INSERT INTO scp_write_service.enrollee_select(user_id, session_id, spec_id) 
VALUES (?, ?, ?) 
ON CONFLICT DO NOTHING

Выполнение этого запроса вернет 1 или 0, что и требуется, так как пользователю желательно отправлять ответ в формате: зарегистрирован, уже зарегистирован или отклонено (по причине отсутствия авторизации например).

Так же для корректной обработки подтверждения подачи документов и отказа от поступления нужен второй триггер, который обрабатывает обновление записей:

CREATE FUNCTION scp_write_service.select_update()
    RETURNS trigger AS
$select_update$
DECLARE
    previous UUID;
BEGIN
    IF OLD.status > NEW.status THEN
        RAISE EXCEPTION 'Invalid status: %', NEW.status
            USING HINT = 'status should be greater than previous value: ' || OLD.status;
    ELSEIF OLD.status <> NEW.status THEN
        IF NEW.status = 1 THEN
            NEW.confirmed_stamp = current_timestamp;

            SELECT selected_spec_id
            INTO previous
            FROM scp_write_service.enrollee e
            WHERE e.user_id = NEW.user_id
              AND e.session_id = NEW.session_id;

            IF previous IS NOT NULL THEN
                UPDATE scp_write_service.enrollee_select e
                SET canceled_stamp = current_timestamp,
                    modified_stamp = current_timestamp,
                    status         = 2,
                    ordinal        = ordinal + 1,
                    state          = 0
                WHERE e.user_id = NEW.user_id
                  AND e.session_id = NEW.session_id
                  AND e.spec_id = previous;
            END IF;
            UPDATE scp_write_service.enrollee e
            SET selected_spec_id = NEW.spec_id,
                modified_stamp   = current_timestamp
            WHERE e.user_id = NEW.user_id
              AND e.session_id = NEW.session_id;
        ELSEIF NEW.status = 2 THEN
            NEW.canceled_stamp = current_timestamp;
            UPDATE scp_write_service.enrollee e
            SET selected_spec_id = NULL,
                modified_stamp   = current_timestamp
            WHERE e.user_id = NEW.user_id
              AND e.session_id = NEW.session_id
              AND e.selected_spec_id = NEW.spec_id;
        ELSE
            -- prevent any status value different from 1 or 2
            RAISE EXCEPTION 'Invalid status: %', NEW.status
                USING HINT = 'status must be 1 or 2';
        END IF;
    END IF;
    IF OLD.status <> NEW.status OR OLD.score <> NEW.score THEN
        NEW.ordinal = NEW.ordinal + 1;
        NEW.state = 0;
        NEW.modified_stamp = current_timestamp;
    END IF;
    -- otherwise changes performed by DB directly
    -- and we should not affect those changes
    -- (timeout for confirmation reached and state dropped to 0)
    RETURN NEW;
END;
$select_update$ LANGUAGE plpgsql;

CREATE TRIGGER select_update_trg
    BEFORE UPDATE
    ON scp_write_service.enrollee_select
    FOR EACH ROW
EXECUTE FUNCTION scp_write_service.select_update();

Задачами данного триггера являются проверки, что статус записи только увеличивается, а так же если абитуриент отправит согласие на специальность, уже имея подтверждение на поступление на другую специальность, то последняя будет отменена автоматически. Разумеется можно сделать блокирование подтверждения, если уже имеется одно, но для отладки проще именно такой вариант, так как можно в таблице менять одно поле, а дальше триггер сам внесет все необходимые правки. Да и запрос на подтверждение или отмену выглядит совсем простым:

UPDATE scp_write_service.enrollee_select 
SET status = 1 -- 1 is confirmed, 2 is cancelled
WHERE user_id = ? AND spec_id = ? AND session_id = ?

Теперь, чтобы получить список записей, которые нужно отправить в шину данных, можно получить следующим запросом:

SELECT  user_id,
        session_id,
        spec_id,
        status,
        score,
        created_stamp,
        confirmed_stamp,
        canceled_stamp,
        ordinal
FROM scp_write_service.enrollee_select es
WHERE es.state = 0
ORDER BY es.modified_stamp
LIMIT 128
OFFSET ?

Так как при изменении дата модификации только увеличивается, то первыми всегда будут отправляться самые старые записи. После отправки в шину данных, нужно установить состояние (state) в значение 1, а после получения подтверждения уже в 2.

Модель данных модуля чтения

Для модуля чтения модель данных является куда как более простой, заточенной на множественное чтение, а не запись с необходимостью проверять все и вся. Так как данные приходят из доверенных источников, а правок со стороны любых пользователей не предполагается, то для модуля чтения нужна всего одна таблица, она очень похожа на ту же, что и в модуле записи, с небольшим различием в части обеспечения целостности:

CREATE TABLE scp_read_service.enrollee_select
(
    spec_id         UUID      NOT NULL,
    session_id      INT       NOT NULL,
    user_id         UUID      NOT NULL,
    -- current selection status, 0 for applied, 1 for confirmed and 2 for cancelled
    status          SMALLINT  NOT NULL DEFAULT 0,
    -- the assigned user score
    score           INT       NOT NULL DEFAULT 0,
    -- stamp for creation(application) date
    created_stamp   TIMESTAMP NOT NULL DEFAULT current_timestamp,
    -- stamp for confirmation date
    confirmed_stamp TIMESTAMP          DEFAULT NULL,
    -- stamp for cancelled date
    canceled_stamp  TIMESTAMP          DEFAULT NULL,

    ---------------------------------------
    --     integrity check fields        --
    ---------------------------------------
    -- last modification stamp received from write module
    modified_stamp  TIMESTAMP NOT NULL DEFAULT current_timestamp,
    -- should be treated like 'version', this way we will make
    -- possible to reduce amount of response messages sent by target module
    -- otherwise we'll deal with lots of duplicates
    -- we should not write changes to database unless this column
    -- has lower value than update
    ordinal         SMALLINT           DEFAULT 0,
    -- confirmation sent state, false - not sent, true - sent
    state           BOOLEAN   NOT NULL DEFAULT FALSE,
    -- we shift primary key column order to ensure that we may use primary key index for
    -- queries
    CONSTRAINT enrollee_select_pk PRIMARY KEY (spec_id, session_id, user_id)
);

Обратите внимание, что порядок следования колонок в главном ключе изменился, так как теперь важна специальность, а не пользователь, а главный ключ является индексом - следовательно можно использовать его префикс для наших запросов.

Поле состояния отправки теперь у нас имеет тип булев, потому что подтверждение отправляется только один раз для одного набора (spec_id,session_id,user_id,ordinal) и в дальнейшем уже не должно отправляться. При таком подходе запрос на добавление/обновление при получении из шины данных сообщения будет выглядеть следующим образом:

INSERT INTO scp_read_service.enrollee_select
    AS es(
        status,
        score,
        created_stamp,
        confirmed_stamp,
        canceled_stamp,
        user_id,
        session_id,
        spec_id,
        ordinal
        )
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) -- 9 params
ON CONFLICT (spec_id, session_id, user_id)
DO UPDATE SET state = false,
              modified_stamp = CURRENT_TIMESTAMP,
              status = ?,
              score = ?,
              created_stamp = ?,
              confirmed_stamp = ?,
              canceled_stamp = ?,
              ordinal = ?  -- 6 params
WHERE   es.user_id = ?
    AND es.session_id = ?
    AND es.spec_id = ?
    AND es.ordinal < ? -- 4 params

Так как делать проверку, что запись в БД уже есть - затратно и крайне непродуктивно, то выгоднее использовать конструкцию UPSERT, которая сама добавит или обновит поле за один запрос - и нет никаких исключений из СУБД. Это работает намного быстрее, чем проверка, что запись существует в таблице, с последущим разделением на вставку или обновление. Следует обратить внимание, что алиас на таблицу необходим, так как иначе в WHERE использование полей дает ошибку на проблему с определением поля, автор другим способом это победить не смог.

Наконец-то Java-код!

Как уже выше было сказано, разработка будет вестись с использованием Spring Boot. Никаких реактивных стразов или асинхронных труб в коде не будет, максимально простое и легкое для понимание решение. Использовать Hibernate для работы с БД нет смысла, так как модель данных очень простая, однако ударяться в jdbc и ручками контроллировать все вполоть до подключения к СУБД - тоже идея плохая. Именно поэтому JDBCTemplate+spring для данного проекта является наиболее оптимальным вариантом.

Организация проекта будет разделена на несколько проектов:

  1. scp-lib - общий код, используемый всеми сервисами для упрощения взаимодействия;

  2. scp-write-service - сервис приема команд в систему пользователями (абитуриенты и представители вуз вносят правки в этот сервис);

  3. scp-write-sender-service - сервис отправки изменений в шину данных;

  4. scp-read-service - сервис чтения списка поступающих на специальность с необходимыми запросами, фильтрами и т.д.;

  5. scp-read-receiver-service - сервис приема из шины данных сообщений об изменении записи абитуриента для последующего учета в таблице поступающих на специальность, а так же отправку подтверждения об успешном сохранении данных в БД.

Чтобы собирать проекты нужно выполнить mvn install в проекте scp-lib,а затем в проекте scp-write-sender-service.

Проект scp-lib

Проект разделен на 2 основные части - работа с rest и с бд. Последний нужен только ради конфигурации, которая будет создавать для сервиса схему в БД, чтобы все данные (включая liquibase), лежали в одном месте и их можно было снести одним скриптом, не трогая остальные сервисы. Основа взята со stackoverflow.

Работа с рестом делится на несколько основных частей:

  1. Обертки для передаваемых данных и обобщения потока;

  2. Обработка ошибок;

  3. Простейшая авторизация на основе проверки роли в jwt;

  4. Стартовые конфигурации, такие как сериализация времени в json и т.д.

Следует рассмотреть отдельно контроль ошибок. В Spring уже существует механизм конвертации ошибок в рест-объекты, однако указать как ControllerAdvice можно только один бин, наследовать или как-то расширять обработку довольно проблемно. Так же большой проблемой является тот факт, что почти невозможно сформировать библиотеку ошибок, чтобы стандартизовать их и все микросервисы (хотя бы платформы) работали в одном пространстве ошибок - так фронту не нужно будет гадать или что-то спрашивать, потому что общая таблица ошибок позволит ему лучше объяснять пользователю, что происходит. Конечно можно на любую ошибку выкидывать Unknown Error, а в браузере показывать Oops!, вот только вы правда желаете быть тем самым разработчиком на М или Ж о котором часто все говорят, слава о вас идет впереди вас и каждый жаждет поговорить с вами?

Общая библиотека ошибок хороша и для тех поддержки, потому что большую часть проблем они смогут решить самостоятельно без привлечения кого-либо.

Чтобы сделать расширяемую конвертацию ошибок в объекты с подробным описанием, предлагается следующий алгоритм действий:

  1. Берем класс исключения и получаем список обработчиков, зарегистрированных на данный класс;

  2. У каждого обработчика есть приоритет, по очереди вызывая каждый получаем результат или null, если есть результат, то ошибка сконвертирована и продолжать не требуется;

  3. Считаем классом исключения его суперкласс и повторяем пункт 1.

Сделаем более простую библиотеку, в которой предполагается только 1 обработчик на каждый класс исключения. Тогда базовый интерфейс будет:

public interface ErrorConverter<E extends Throwable> {
    ErrorObject toErrorObject(E e);
}

А чтобы получить карту, где ключем является класс исключения, а значением - обработчик, окажется достаточно следующего кода (если считать, что обработчики напрямую имплементируют интерфейс ErrorConverter):

    private static Map<Class<?>, ErrorConverter<Throwable>> createConverters() {
        HashMap<Class<?>, ErrorConverter<Throwable>> map = new HashMap<>();
        ServiceLoader.load(ErrorConverter.class)
                .forEach(c -> map.put(resolveExceptionType(c), c));
        return map;
    }

    private static Class<?> resolveExceptionType(ErrorConverter<?> c) {
        for (Type type : c.getClass().getGenericInterfaces()) {
            if (type instanceof ParameterizedType && ((ParameterizedType) type).getRawType().equals(ErrorConverter.class)) {
                return (Class<?>) ((ParameterizedType) type).getActualTypeArguments()[0];
            }
        }
        throw new IllegalStateException("Unable to determine handled error type for class: " + c.getClass().getTypeName());
    }

Сохраняем карту в статическое поле (изменять все равно ничего не нужно), и простейшими функциями осуществляем конвертацию:

    public static ErrorObject asErrorObject(Throwable e) {
        return tryConvertRecursive(e, e.getClass());
    }

    private static ErrorObject tryConvertRecursive(Throwable e, Class<?> aClass) {
        var c = CONVERTERS.get(aClass);
        if (c != null) {
            var o = c.toErrorObject(e);
            if (o != null) {
                return o;
            }
        }
        return tryConvertRecursive(e, aClass.getSuperclass());
    }

Теперь даже сложные исключения можно легко преобразовывать в человекочитаемый вид:

public class DataIntegrityViolationExceptionConverter implements ErrorConverter<DataIntegrityViolationException> {
    @Override
    public ErrorObject toErrorObject(DataIntegrityViolationException e) {
        if (e.getCause() instanceof ConstraintViolationException) {
            return parseConstraintViolationException((ConstraintViolationException) e.getCause());
        }
        return null;
    }

    private ErrorObject parseConstraintViolationException(ConstraintViolationException e) {
        if (e.getCause() instanceof BatchUpdateException) {
            return parseBatchUpdateException((BatchUpdateException) e.getCause());
        } else if (e.getCause() instanceof PSQLException) {
            return parsePSQLException((PSQLException) e.getCause());
        }
        return null;
    }

    private ErrorObject parsePSQLException(PSQLException e) {
        if (e.getMessage() != null && e.getMessage().startsWith("ERROR: duplicate key value violates unique constraint")) {
            ServerErrorMessage se = e.getServerErrorMessage();
            return ErrorObject.builder()
                    .id(UUID.randomUUID().toString())
                    .code(SystemError.CONSTRAINT_VIOLATION.getId())
                    .description(SystemError.CONSTRAINT_VIOLATION.getDescription())
                    .meta(se.getDetail())
                    .stackTrace(ErrorObjectUtil.stackTrace2List(e))
                    .build();
        }
        return ErrorObject.builder()
                .id(UUID.randomUUID().toString())
                .code(SystemError.COULD_NOT_EXECUTE_STATEMENT.getId())
                .description(SystemError.COULD_NOT_EXECUTE_STATEMENT.getDescription())
                .meta(e.getMessage())
                .stackTrace(ErrorObjectUtil.stackTrace2List(e))
                .build();
    }

    private ErrorObject parseBatchUpdateException(BatchUpdateException e) {
        if (e.getCause() instanceof PSQLException) {
            return parsePSQLException((PSQLException) e.getCause());
        }
        return null;
    }
}

Проекты scp-write-service и scp-read-service

Оба проекта обеспечивают взаимодействие пользователя с базой данных. Каждый из них по сути своей является сервисом для одного контроллера, в первом для работы с записями абитуриента, второй - для работы со списком поступающих на специальность в режиме для чтения.

В проекте scp-write-service достаточно всего одного контроллера, чтобы реализовать все необходимые ручки для фронта. Так как на собеседовании часто спрашивают про http методы, то вам все в одном месте - наслаждайтесь:

    @RequireRoles("USER")
    @Timed(value = "enrollee_info")
    @GetMapping
    @Schema(description = "Get full information about your selections")
    public Rest<EnrolleeInfo> info() {
        return Rest.of(srv.getInfo(jwt.getUserId(), null));
    }

    @RequireRoles("USER")
    @Timed(value = "enrollee_enroll")
    @PostMapping("/{spec}")
    @Schema(description = "Enroll to speciality, enrolling to same speciality multiple times gives false, status won't be changed")
    public Rest<Boolean> enroll(@PathVariable UUID spec) {
        return Rest.of(srv.enroll(jwt.getUserId(), spec, null));
    }

    @RequireRoles("USER")
    @Timed(value = "enrollee_confirm")
    @PutMapping("/{spec}")
    @Schema(description = "Confirm your application to speciality, you can not confirm cancelled application or if you're not applied to spec")
    public Rest<Boolean> confirm(@PathVariable UUID spec) {
        return Rest.of(srv.confirm(jwt.getUserId(), spec, null));
    }

    @RequireRoles("USER")
    @Timed(value = "enrollee_cancel")
    @DeleteMapping("/{spec}")
    @Schema(description = "Cancel your application to speciality, you won't be able to interact with this spec afterwards rendering this spec non-existent to you")
    public Rest<Boolean> cancel(@PathVariable UUID spec) {
        return Rest.of(srv.cancel(jwt.getUserId(), spec, null));
    }

Аннотация RequiresRoles - это кастомная аннотация для работы аспекта, который проверит в jwt токене наличие заданных ролей. Timed - позволяет регистрировать метрики выполнения ручки, чтобы потом можно было формировать статистику по работе сервиса. И не забываем про swagger - аннотация Schema позволяет добавить описание к генерируемому конфигу, который можно будет позднее получить по ендпоинту /documentation/v3/api-docs.

Сервис EnrolleeService нужен только для того, чтобы начать транзакцию, получить текущую сессию, а после этого выполнить соответствующий запрос.

    @Transactional
    @Override
    public boolean enroll(UUID userId, UUID specId, Integer sessionId) {
        if (sessionId == null) {
            sessionId = sessionDao.getLatestSessionId();
        }
        return enrolleeDao.enroll(userId, specId, sessionId);
    }

Очевидно, что делать два запроса в БД не очень эффективно каждый раз. В качестве домашней работы попробуйте исправить эту часть так, чтобы можно выполнять только один запрос к БД в большинстве случаев.

Самая сложная часть происходит в объекте доступа к данным (dao, repository). Чтобы не затачивать проект сразу же на PostgreSQL, репозиторий сделаем интерфейсом, а реализацию для Postgres имплементацией.

На примере подачи документов на специальность рассмотрим работу репозитория:

    public boolean enroll(UUID userId, UUID specId, int sessionId) {
        // we should prevent conflicts here and return false if nothing inserted
        return 1 == jdbcTemplate.update(
                "INSERT INTO scp_write_service.enrollee_select(user_id, session_id, spec_id) VALUES (?, ?, ?) ON CONFLICT DO NOTHING",
                userId, sessionId, specId);
    }

Данный метод должен возвращать ложь или истину, если абитуриент уже поступает или он зарегистрирован соответственно. Добиться этого можно добавив к запросу ON CONFLICT DO NOTHING, из-за чего выполнение запроса вернет 0, а проверка равенства единице выдаст ложь. Если не существует специальность или сессия, то произойдет ошибка.

Отдельно следует остановиться на конфигурации сервиса:

########## Framework related configuration goes here ################
db:
  schema: 'scp_write_service'

management:
  endpoints:
    web:
      exposure:
        include: 'prometheus,health,info,metric'

spring:
  profiles:
    active: 'prod'
  application:
    name: 'scp-write-service'
  liquibase:
    enabled: 'true'
    change-log: 'classpath:liquibase/changelog/changelog.xml'
    liquibase-schema: '${db.schema}'
    default-schema: '${db.schema}'
  datasource:
    driver-class-name: 'org.postgresql.Driver'
    url: "${SCP_WS_DB_URL}"
    username: "${SCP_WS_DB_USERNAME}"
    password: "${SCP_WS_DB_PASSWORD}"
    hikari:
      connection-timeout: 10000
  sql:
    init:
      mode: 'always'

server:
  port: "${SCP_WS_HTTP_PORT:8080}"
  servlet:
    encoding:
      charset: 'UTF-8'
      force-response: 'true'

########## Application related configuration goes here ##############
scp:
  debug: "${SCP_WS_DEBUG:false}"

Для сбора метрик в дальнейшем, нам потребуется включить актуатор для сбора данных prometheus, это делается через секцию management. Все основные параметры желательно позволять изменять посредством переменных окружения. Названия для текущего сервиса должны начинаться на префикс SCP_WS_ (если только переменная не является глобальной для всех сервисов, тогда префикс SCP_), это позволит проще запускать на одной машине все сервисы для отладки, да и в дальнейшем упростит взаимодействие для людей, так как все повинуется одному шаблону. Такой подход намного лучше, чем использование напрямую переменных окружения с именем свойства спринга (напомню, что server.port можно задать переменной окружения SERVER_PORT). У префикса есть и другие плюсы, особенно если вы часто работаете в консоли.

Так же имеет смысл в этом сервисе слушать события о создании новой специальности или сессии приемной коммисии, а так же регистрации абитуриента на участие в сессии. В рамках данной работы такой функционал реализовываться не будет, т.к. все исходные данные будут загружаться непосредственно в БД.

Для сервиса чтения так же достаточно только одного контроллера:

@RestController
@RequestMapping("/api/v1/{sessionId}/spec")
public class SpecController {
    // ...

    @RequireRoles("USER")
    @Timed("spec_slice")
    @PostMapping("/{specId}")
    @Schema(description = "Retrieve slice of application for selected speciality and selection campaign")
    public Rest<EnrolleeSelect> slice(
            @PathVariable UUID specId,
            @PathVariable int sessionId,
            @RequestBody Pagination pagination) {
        return Rest.of(srv.slice(specId, sessionId, pagination), pagination);
    }

    @RequireRoles("USER")
    @Timed("spec_total")
    @GetMapping("/{specId}/total")
    @Schema(description = "Get total number of applications for selected speciality and selection campaign")
    public Rest<Long> total(@PathVariable UUID specId, @PathVariable int sessionId) {
        return Rest.of(srv.getTotal(specId, sessionId));
    }

Так как в таблице может быть довольно большое число записей (до 625 млн), это может потребовать существенных затрат, чтобы вычислить точное количество абитуриентов для заданной специальности. Поэтому получение количества сделано в виде отдельно ручки, результат которой желательно кешировать, чтобы не обращаться в БД ради одних и тех же данных постоянно.

Фильтрация списка поступающих, например чтобы исключить всех, кто отказался от поступления, может быть сделана посредством объекта Pagination, в рамках цикла статей эта возможность не будет реализовываться.

Проекты scp-write-sender-service и scp-read-receiver-service

Оба проекта очень похожи и выполняют схожие функции, первый пишет в кафку измененные записи абитуриентов и читает сообщение о подтверждении. Второй читает сообщения - измененные записи абитуриентов и пишет, после сохранения в БД, подтверждения о сохраненнии.

Для того, чтобы уменьшить количество кода, для обоих проектов выделены общие части - отправка и прием. Из предыдущей статьи известно, что у каждого модуля чтения есть свой топик, определяется по маске, примененной к идентификатору специальности, аналогино для каждого модуля записи есть свой топик подтверждений, идентифицируемый по числу получаемому путем применения маски к идентификатору пользователя. Логика записи и чтения топиков одинакова, так же как и ожидаемая нагрузка.

Как ранее было сказано, попытка упаковывать записи в пакеты - приведет к замедлению и увеличению времени доставки сообщений по мере роста числа пользователей в системе (а если точнее, то модулей записи). Поэтому записи будут сами являтся сообщениями и писаться как есть. Это позволит убрать проблему с долгой доставкой сообщений.

Сам процесс записи происходить таким образом, что в несколько потоков сервис пытается записать столько записей, сколько сможет (но не более чем пороговое значение, чтобы другие топики могли получить свои записи, иначе может произойти такая ситуация, что одна или несколько задач будут выполняться почти бесконечно). Такой подход гарантирует, что максимальное время доставки будет расти до определенного уровня, но по мере роста числа пользователей остановится на определенной константе, при условии, что не будет расти время отклика сервера по сети. Очевидно, что максимальный период отправки сообщения будет равен M/L * T, где M - число модулей чтения, L - количество потоков, а T - период отправки в топик при максимальной нагрузке. Так как количество изменений в секунду равно константе, то очевидно, что T будет уменьшаться с увеличением M, потому что время записи должно быть пропорционально c/M, где c - константа задающая количество изменений поступающих в модуль записи в секунду.

Таким образом, можно разработать следующий алгоритм работы отправки сообщений в шину данных: сервис делит работу на поток чтения из БД и задачи записи в топики использующими общий пул потоков для обработки по очереди. Поток чтения забирает из БД все записи подлежащие отправке и распределяет их между соответствующими очередями. Потоки записи пытаются записать столько сообщений, сколько смогут с ограничением сверху, чтобы дать другим задачам выполнится. После записи в топик изменения добавляются в очередь потока чтения на сохранение состояния записи в БД.

Код проекта scp-write-sender-service

Можно выделить в абстрактный класс такой процесс обработки распределения сообщений между топиками.

    public void doBackground() {
        while (running) {
            try {
                commit();
                int i = 10;
                // try fetching 10 times if possible, we must commit afterwards
                while (running && i > 0 && shouldFetch() && fetch()) {
                    i--;
                }
                if (i == 10) LockSupport.parkNanos(sleepTime);
            } catch (Throwable e) {
                log.error("Failed to process", e);
                waitOnError();
            }
        }
    }

doBackground - это метод потока чтения из БД, который и пытается добавить в очередь на отправку столько сообщений, сколько сможет. Если по каким-то причинам сообщений добавить не удалось, то необходимо остановить работу потока, но не через Thread.sleep, а LockSupport.parkNanos - это более быстрый способ, который позволяет легко в другом потоке затем разбудить текущий. Такой подход намного лучше, чем использовать wait/notify.

Перед тем, как пытаться читать из БД данные, нужно закоммитить изменения, о которых сообщили задачи отправки изменений. Это достигается простейшим методом:

    private void commit() {
        // flush all changes to persistent storage
        while (!commitQueue.isEmpty()) {
            // 32 items updated in batch per transaction
            List<T> changes = fetchFromQueue(commitQueue, 32);
            source.commit(changes);
            fetchCount -= changes.size();
        }
    }

Очевидно, что этот метод может долго выполняться, поэтому возможно стоит ограничить количество циклов, например пока из очереди будет взято меньше, чем запрашиваемое число объектов. Однако он все равно должен будет рано или поздно остановиться, так как не происходит чтение из БД новых объектов.

    private boolean fetch() {
        boolean empty = false;
        int page = 0;
        int fetched = 0;
        // we collect messages from database till page contains no more
        // elements, or we reached our buffer size
        while (!empty && fetchCount < MAX_FETCH) {
            var c = source.fetch(page);
            registerChanges(c);
            page++;
            fetched += c.size();
            empty = c.isEmpty();
        }
        return fetched > 0;
    }

Метод чтения из БД представляет собой простейший алгоритм постраничного чтения из БД. Следует отметить, что в случае, если какая-то уже прочитанная запись будет изменена в БД, то она окажется на последней странице, т.е. порядок сползет и есть шанс пропустить в данном цикле регистрации какие-то изменения. Как это можно исправить и нужно ли вообще это исправлять - домашнее задание для читателя. Следует отметить, что в следующем цикле чтения запись все равно будет прочитана и отправлена, хоть и с нарушением порядка отправки.

Регистрация изменений только раскидывает в соответствующие очереди изменения согласно их целевому топику в соответствующую задачу записи.

При таком подходе задача записи в топик становится совсем простой:

private void doWork() {
            boolean _stopped = true;
            try {
                // we should process at max maxProcessingChunk messages
                // and then reschedule ourselves in order to let other
                // channels to be processed
                int processed = 0;
                while (running && !isEmpty() && processed < maxProcessingChunk) {
                    processed += sendMessages();
                }
                if (isEmpty()) {
                    notifyFetcher();
                } else {
                    workPool.submit(this::doWork);
                    _stopped = false;
                }
            } catch (Throwable e) {
                log.error("Unable to send messages", e);
                // this would block out fork join pool thread from processing further
                // in case of error it is okay
                waitOnError();
            } finally {
                stopped = _stopped;
            }
        }

        private int sendMessages() {
            List<T> w;
            synchronized (q) {
                w = fetchFromQueue(q, 128);
            }
            var r = sender.send(w, channel);
            commitQueue.addAll(r);
            // source messages used because we need to ensure that
            // if something goes bad with this channel - others will get their
            // place, otherwise we'll lock here indefinitely
            int processed = w.size();
            log.info("Successfully sent {} of {} messages to channel {}", r.size(), processed, channel);
            // if something was not sent - we must reschedule it
            if (r.size() != w.size()) {
                w.removeAll(r);
                addAll(w);
            }
            return processed;
        }

        public void addAll(List<T> l) {
            synchronized (q) {
                q.addAll(l);
                if (stopped && !q.isEmpty()) {
                    stopped = false;
                    workPool.submit(this::doWork);
                }
            }
        }

Так как отправка не обязательно осуществляется при помощи шины данных (можно и рестами долбить целевой сервис), то желательно выделить интерфейс для этого:

public interface ChangeSender<T> {
    List<T> send(List<T> changes, int channel);
}

В этом случае отправка в kafka будет иметь следущую реализацию:

    public List<T> send(List<T> changes, int channel) {
        var topic = topicTemplate + channel;
        List<T> results = Collections.synchronizedList(new ArrayList<>(changes.size()));
        CountDownLatch latch = new CountDownLatch(changes.size());
        // we should map messages before send
        // this way we may be sure that in case of json serialization exception
        // no message sent to kafka
        var list = changes.stream().map(x -> ImmutablePair.of(x, toJson(x))).toList();
        // essentially we are trying to send as many messages as possible, this method
        // should be called with multiple messages to work efficiently, otherwise kafka
        // may refuse to send them right away because buffer is not full enough or not
        // enough time passed from last send
        for (var p : list) {
            kafkaTemplate.send(topic, p.getValue())
                    // handle result, count down latch and register object in results
                    // if success
                    // <some code here>
                    ;
        }
        awaitLatch(latch);
        return results;
    }

Особенностью отправки сообщений в кафку является тот факт, что она не будет писать в топик сообщения до тех пор, пока не накопится достаточное количество байт в буфере отпаврки или не выйдет таймаут. По этой причине писать сообщения по одному невозможно с точки зрения производительности. Разумеется можно вообще игнорировать результат записи, но тогда будет большое число ложных срабатываний отправки и рано или поздно накопится существенное число неотправленных записей. Для этого они пишутся все подряд, а затем ждем, пока CountDownLatch дойдет до 0, что сигнализирует о завершении отправки всех сообщений.

Код проекта scp-read-receiver-service

Если отправка подразумевала однопоточное чтение и многопоточную запись, то прием делает все наоборот. Многопоточно читает из партиций и однопоточно пишет в БД.

    private void doSink() {
        while (running) {
            try {
                timer.update();
                while (shouldCommit()) {
                    doCommit();
                    timer.update();
                }
                LockSupport.parkNanos(DURATION_SINK_WAIT);
            } catch (Throwable e) {
                log.error("Failed to process", e);
                waitOnError();
            }
        }
    }

Все что делает метод - пока выполняется условие для записи в БД - а именно есть достаточное количество записей или прошло достаточно времени, цикл бесконечно пишет в БД. Если из источника сообщений данные будут постоянно приходить, то цикл будет срабатывать постоянно.

    private void doCommit() {
        Object slab = null;
        List<T> list = new ArrayList<>(maxSinkChunkSize);
        int processed = 0;
        synchronized (queue) {
            while (list.size() < sinkChunkSize && !queue.isEmpty()) {
                ChangeChunk<T> chunk = queue.poll();
                for (T c : chunk.changes()) {
                    // there is no need to commit same messages
                    // again, so we skip them and reduce our
                    // storage strain significantly
                    if (idSet.add(idOf(c))) list.add(c);
                }
                // when update is done we should send this object
                // back to receiver in order to commit state
                slab = chunk.slab();
                // because there is no way to understand
                // how many messages we may process when
                // queue registration occurs, we should
                // do it here, that is why we are not
                // using list.size() later
                processed += chunk.changes().size();
            }
        }
        sink.commit(list); // to database
        commitSlab.set(slab); // commit offsets to kafka
        synchronized (queue) {
            messageQueueSize -= processed;
        }
        timer.updateAndReset(DURATION_ONE_SECOND.toMillis());
    }

Сам процесс записи в БД подразумевает, что могут приходить дубликаты сообщений. Они фильтруются при помощи регистрации идентификатора сообщения в специальное множество. Данные из источника приходят в виде списка сообщений и некоего объекта, назовем его точка коммита, который характеризует параметры источника, которые позднее должны быть сохранены дабы не получать те же самые сообщения еще раз (для кафки это оффсеты партиций).

Хотя прием сообщений желательно делать таким образом, чтобы каждую партицию читать в отдельном потоке, автор не раскопал как это сделать аккуратно. Без консьюмера работать с кафкой не удается, поэтому в данной реализации используется один поток, а консьюмер уже считывает из всех партиций сразу, что не есть хорошо, но для демонстраций и комментариев под статьей - сойдет.

    private void doReceive() {
        while (running) {
            try {
                commitReceive();
                int i = 10;
                while (running && i > 0 && shouldReceive() && doReceiveUnsafe()) {
                    i--;
                }
                // if no receive occured
                if (i == 10) LockSupport.parkNanos(DURATION_SINK_WAIT);
            } catch (Throwable e) {
                log.error("Failed to process", e);
                waitOnError();
            }
        }
    }
    private void commitReceive() {
        Object o = commitSlab.get();
        if (o != null) {
            receiver.commit(o);
            // replace that value with null to prevent us
            // from sending same commit again
            commitSlab.compareAndSet(o, null);
        }
    }

Логика приема проста - попытаться сохранить оффсеты в кафку, а затем несколько раз осуществить чтение.

    private boolean doReceiveUnsafe() {
        ChangeChunk<T> chunk = receiver.receive(receiveBufferSize, RECEIVE_TIMEOUT);
        // we double-check running here to prevent further work
        if (running && !chunk.isEmpty()) {
            synchronized (queue) {
                queue.add(chunk);
                messageQueueSize += chunk.changes().size();
                if (messageQueueSize > sinkChunkSize) LockSupport.unpark(sinkThread);
            }
            return true;
        }
        return false;
    }

Каждое чтение из ресивера передает нам список сообщений и точку коммита объединенных оберткой, которую сразу регистрируем в очередь записи в БД, если эта обертка не пуста. Так же нужно обязательно разбудить поток записи в БД, если очередь достаточно заполнена для запуска записи посредством метода LockSupport.unpark.

Ресивер для кафки имплементирует похожий на ChangeSender интерфейс, только заточенный на прием:

    public ChangeChunk<T> receive(int maxSize, Duration duration) {
        var timer = Time.SYSTEM.timer(duration);
        List<T> changes = new ArrayList<>();
        Map<TopicPartition, OffsetAndMetadata> slab = new HashMap<>();
        // we should try to receive as many messages as possible in one go
        // this will help us afterwards by reducing commit calls to kafka
        while (timer.notExpired() && changes.size() < maxSize) {
            timer.update();
            var rs = consumer.poll(timer.remainingMs());
            for (ConsumerRecord<String, String> r : rs) {
                var v = parser.apply(r.value());
                // parser may fail to parse value and return null
                if (v != null) {
                    changes.add(v);
                }
                // for each message we must update our slab, that holds info
                // about each partition offset for commit
                slab.put(new TopicPartition(r.topic(), r.partition()), new OffsetAndMetadata(r.offset() + 1));
            }
        }
        return new ChangeChunk<>(changes, new Slab(slab));
    }

В течении определенного промежутка времени стараемся читать из всех доступных нам партиций посредством консьюмера. Точка коммита должна задавать оффсет сообщения после прочитанного нами сообщения, поэтому обновляем соответствующим образом ее. Если по каким-то причинам сообщение нельзя прочитать из json, то просто игнорируем его, считая, что оно пустое.

Заключение

В данной работе представлен вариант реализации сервисов для предложенной архитектуры и описаны способы решения возникающих проблем. Оптимизация и тюнинг сервисов оставлены на следущую статью, в которой автор расскажет о нелегкой доле дев-о-пса.

P.S.

Комментарии (0)