Привет, Хабр! На связи Андрей Зяблин, Java разработчик компании «Магнит». Несколько лет назад, продляя дочери подписку на Netflix, я заинтересовался, как они поддерживают обслуживание сотен миллионов клиентов. Изучив вопрос, я увлёкся реактивным подходом в программировании. Через некоторое время это увлечение принесло практическую пользу при разработке сервисов в «Магните». В статье я расскажу про три решения, которые позволяют реализовать потоковый обмен данными из БД между распределёнными приложениями:

  1. Реализация с использованием hibernate

  2. Реализация с использованием mybatis

  3. Ограничение скорости обмена с использованием механизма обратного давления («backpressure») и библиотеки Bucket4j

Постановка задачи

Наша команда разрабатывала приложение для интеграции различных сервисов сети, а также для интеграции с государственными сервисами в области ветеринарии «ВетИС».

Упрощенно структура приложения выглядит так:

Бизнес‑запросы — запросы в формате json, предназначенные для обмена на уровне бизнес‑данных (операции, позиции операций, пр.) и раскладывающиеся в более низкоуровневые запросы в форматах, поддерживающихся госсервисами.

ЦСМ (Централизованный сервис сети «Меркурий») — разрабатываемое нами приложение интеграции.

BDSM‑CLIENT — работа с бизнес‑запросами БД СМ (базы данных сети магазинов).

MERCURY — обеспечивает взаимодействие со шлюзом к «ВетИС».

PROCESSING — выполняет обработку бизнес‑запросов до отправки запросов в шлюз.

Проблемным местом оказалась передача данных из ЦСМ в БД СМ: объёмы передаваемых данных огромны, а на стороне БД СМ напрашивался тонкий клиент, пишущий непосредственно в базу.

Требования к обмену:

  • Данные извлекаются из БД

  • Объём данных — миллионы записей за сеанс

  • Передача ведётся через интернет по http

  • Необходимо использовать ORM

Изучив различные технологии, мы пришли к мнению, что в нашем случае оптимально использовать реактивность.

Доработки на стороне ЦСМ и БД СМ:

  • В синхронный код ЦСМ встраивается реактивный код обмена

  • Для приёма данных на стороне БД СМ пишется отдельное реактивное приложение

Используемые технологии:

  • Java streams 

  • Hibernate streamResult 

  • Reactive streams 

  • Spring webflux(reactor) 

  • Spring webclient 

  • Rsocket (для демонстрации) 

Граничные условия:

  • Не всегда имеется возможность использовать R2DBC. На момент разработки для Oracle не было стабильного реактивного драйвера.

  • Hibernate streamResult требует активную транзакцию на всё время фетча. Нужно отслеживать завершение фетча, ошибки, закрытие соединения, чтобы завершить транзакцию. Стандартные решения в виде try‑catch не подходят в данной ситуации.

Решение с использованием hibernate

Основная сложность реализации заключалась в корректной работе с транзакциями в асинхронном режиме. Для этого пришлось создавать отдельный механизм, благо Reactor это позволяет. Основу решения составляют:

  • Stateless session 

  • StreamResult 

  • Генерация Flux из Stream 

Далее рассмотрим код. 

Сущности

  • Customer — справочник заказчиков. 

  • Operation — операции с заказчиками, например, продажи. 

public class Customer { 
 
    @Id 
    @GeneratedValue(strategy = GenerationType.SEQUENCE, generator = "sequence") 
    @SequenceGenerator(name = "sequence", sequenceName = "SQ_CUSTOMER", allocationSize = 1) 
    private Long id; 
 
    @Column(name = "NAME") 
    private String name; 
 
} 
public class Operation { 
 
    @Id 
    @GeneratedValue(strategy = GenerationType.SEQUENCE, generator = "sequence") 
    @SequenceGenerator(name = "sequence", sequenceName = "SQ_OPERATION", allocationSize = 1) 
    private Long id; 
 
    @Column(name = "TOTAL_SUM") 
    private Long totalSum; 
 
    @ManyToOne 
    @JoinColumn(name = "customer_id") 
    private Customer customer; 
 
    @ManyToMany 
    @JoinTable( 
        name = "operation_detail", 
        joinColumns = @JoinColumn(name = "operation_id"), 
        inverseJoinColumns = @JoinColumn(name = "product_id")) 
    private List<Product> products; 
 
} 
 

Загрузка данных 

Загрузка данных достаточно проста и не совсем имеет отношение к теме статьи, здесь демонстрируется мощь функционального программирования. На данном примере показано, насколько функциональна библиотека reactor и как она может облегчить разработку.

Класс com.magnit.flux.common.service.InitDataService

private void fillCustomer() { 
    Flux.range(0, CUSTOMER_RECORD_LIMIT) 
        .map(i -> Customer.builder().name("Customer " + i).build()) 
        .subscribe(customerRepository::save); 
} 
 
private void fillProduct() { 
    Flux.range(0, PRODUCT_RECORD_LIMIT) 
        .map(i -> Product.builder().name("Product " + i).build()) 
        .subscribe(productRepository::save); 
} 
 
private void fillOperation() { 
    Supplier<List<Product>> productSupplier = () -> new Random().ints(5, 1, 8) 
        .boxed().distinct() 
        .map(productId -> Product.builder().id((long) productId).build()) 
        .collect(Collectors.toList()); 
    List<Customer> customers = (List<Customer>) customerRepository.findAll(); 
    Flux.range(0, OPERATION_RECORD_LIMIT) 
        .map(i -> Operation.builder() 
            .totalSum((long) random.nextInt(100)) 
            .customer(customers.get(random.nextInt(customers.size() - 1))) 
            .products(productSupplier.get()) 
            .build()) 
        .buffer(1000) 
        .subscribe(operationRepository::saveAll); 
} 

Использование hibernate для стриминга 

Класс выполняет основную работу по управлению транзакциями в асинхронном режиме. Это что‑то вроде AutoCloseable в синхронных приложениях.

Класс com.magnit.flux.hibernate.dao.HibernateFluxResultProducer

@RequiredArgsConstructor 
public class HibernateFluxResultProducer<T> { 
     
   private final EntityManagerFactory entityManagerFactory; 
 
    private StatelessSession statelessSession; 
 
    /** 
     * Функция выполняет hql запрос и создаёт Flux из Result Stream 
     */ 
    public Flux<T> execute(String qlString, Class<T> resultClass) { 
        Optional.ofNullable(statelessSession).ifPresent(e -> { 
            throw new RuntimeException("Cursor already open"); 
        }); 
        val sessionFactory = entityManagerFactory.unwrap(SessionFactory.class); 
        //Использование Stateless Session позволяет ускорить чтение за счёт использования detached объектов 
        statelessSession = sessionFactory.openStatelessSession(); 
        statelessSession.getTransaction().begin(); 
        val query = statelessSession.createQuery(qlString, resultClass); 
        //Используется возможность hibernate для потокового чтения из БД - getResultStream 
        return Flux.fromStream(query.getResultStream()); 
    } 
 
    public Mono<Void> commit() { 
        return close(true); 
    } 
 
    public Mono<Void> rollback() { 
        return close(false); 
    } 
 
    public Mono<Void> close(boolean commitTran) { 
        if (statelessSession.getTransaction().isActive()) { 
            if (commitTran) { 
                statelessSession.getTransaction().commit(); 
            } else { 
                statelessSession.getTransaction().rollback(); 
            } 
 
        } 
        statelessSession.close(); 
        return Mono.empty(); 
    } 
 
 
} 

Данный класс стартует транзакцию и выполняет hql запрос. Также он имеет методы commit и rollback.

Контроллер, работающий с hibernate 

Основной функционал работы с потоками реализован в приватном методе getOperationFlux. Генерация потока осуществляется вызовом метода Flux.usingWhen, который можно назвать асинхронным AutoCloseable. Контроллер предоставляет два эндпоинта:

  • getOperationsStream — json стриминг 

  • getOperationsWs — веб-сокет 

Класс com.magnit.flux.hibernate.controller.HibernateOperationController

public class HibernateOperationController { 
 
    private final ObjectFactory<HibernateFluxResultProducer<Operation>> streamResultProducerObjectFactory; 
 
    @GetMapping(path = "/operations-stream", produces = "application/stream+json") 
    public Flux<Operation> getOperationsStream() { 
        return getOperationFlux(); 
    } 
 
    @MessageMapping("operations") 
    public Flux<Operation> getOperationsWs() { 
        return getOperationFlux(); 
    } 
 
    private Flux<Operation> getOperationFlux() { 
        Mono<HibernateFluxResultProducer<Operation>> streamResultExecutorMono = Mono 
            .just(streamResultProducerObjectFactory.getObject()); 
        return Flux.usingWhen(streamResultExecutorMono, 
            se -> se.execute("select o from Operation o JOIN FETCH o.customer c", Operation.class), 
            HibernateFluxResultProducer::commit); 
    } 
 
} 

Результат работы первого метода можно посмотреть непосредственно в браузере:

Можно останавить загрузки или даже закрыть браузер. Можно убедиться, что метод close всегда вызывается.

Второй метод — более интерсный. На его основе можно реализовать обмен по протоколу Rsocket. Для тестирования Rsocket клиент реализован на javascript. Также данный пример демонстрирует использование механизма backpressure (обратное давление).

statis/index.js

client.connect().subscribe({ 
    onError: error => console.error(error), 
    onSubscribe: cancel => { 
    }, 
    onComplete: socket => { 
        socket.requestStream({ 
            data: null, 
            metadata: String.fromCharCode("operations".length) + "operations" 
        }) 
        .subscribe({ 
            onComplete: () => console.log("requestStream done"), 
            onError: error => { 
                console.log("got error with requestStream"); 
                console.error(error); 
            }, 
            onNext: value => { 
                document.getElementById( 
                    "operation").innerText = "Operation ID: " 
                    + value.data.id + ". Customer: " + value.data.customer.name; 
                //Управление обратным давлением вручную. Можно поставить на паузу (функция pause), можно возобновить чтение (функция suspend) 
                if (!paused) { 
                    subscription.request(1); 
                } 
            }, 
            onSubscribe: sub => { 
                subscription = sub; 
                subscription.request(1); 
            } 
        }); 
    } 
}); 

Результаты работы также можно посмотреть в браузере. 

Кнопки Pause и Suspend позволяют остановить и возобновить загрузку потока. 

Отношения один ко многим и многие ко многим (OneToMany &ManyToMany) 

Теперь рассмотрим достаточно частую ситуацию, когда со стороны заказчика выдвигаются новые требования. В данном случае упрощенный пример: заказчик выдвинул новые требования к обмену: теперь нужно выгружать товары, содержащиеся в операциях. Теоретически это можно сделать и с hibernate, но возникают проблемы с производительностью. Как альтернативу мы рассмотрели MyBatis. Ниже сравниваем MyBatis и hiberabte.

MyBatis как более быстрая альтернатива hibernate

MyBatis 

Hibernate 

Простая разработка, так как включает в основном написание SQL запросов 

Разработка более сложная, так как hibernate более громоздкий и сложный для понимания 

MyBatis использует SQL, который зависит от БД 

Hibernate в общем случае не зависит от БД 

Очень просто использовать хранимые процедуры, особенно с курсорами в качестве out параметров 

В некоторых случаях хранимые процедуры использовать непросто 

Так как SQL зависит от БД, базу данных сменить очень сложно 

Hibernate позволяет легко изменить БД 

Механизмы кэширования в Mybatis не развиты 

Hibernate имеет очень хорошие механизмы кэширования 

MyBatis позволяет обрабатывать отношения «один ко многим» и «многие ко многим» в стримах 

Hibernate не позволяет обрабатывать отношения один ко многим и многие ко многим в стримах 

В нашем случае важна способность MyBatis обрабатывать отношения один ко многим и многие ко многим в стримах. 

Стриминг данных в MyBatis 

  • В MyBatis 3.4.0 добавлен тип курсор — org.apache.ibatis.cursor.Cursor, аналогичный классу Resultset в JDBC 

  • Курсор реализует интерфейс итератора, поэтому использовать его очень просто 

  • Курсор позволяет использовать коллекции в результатах, поэтому можно реализовать отображение отношений one-to-many и many-to-many 

Изменения в сущностях 

  • Product — справочник продукции 

  • Operation — добавилась связь @ManyToMany со справочником продукции 

@Entity 
@Table(name = "PRODUCT") 
@Getter 
@Setter 
@Builder 
@NoArgsConstructor 
@AllArgsConstructor 
@ToString 
public class Product { 
 
    @Id 
    @GeneratedValue(strategy = GenerationType.SEQUENCE, generator = "sequence") 
    @SequenceGenerator(name = "sequence", sequenceName = "SQ_OPERATION_DETAIL", allocationSize = 1) 
    private Long id; 
 
    @Column(name = "NAME") 
    private String name; 
 
} 
 

 

@ManyToMany 
@JoinTable( 
    name = "operation_detail", 
    joinColumns = @JoinColumn(name = "operation_id"), 
    inverseJoinColumns = @JoinColumn(name = "product_id")) 
private List<Product> products; 

Маппинг сущностей 

Основная тонкость работы с MyBatis заключается в маппинге .

<mapper namespace="com.magnit.flux.mybatis.mapper.OperationMapper"> 
 
    <select id="getAllOperations" resultMap="operationResult" resultOrdered="true"> 
        select d.operation_id id, d.product_id, p.name product_name from operation_detail d inner 
        join product p on p.id = d.product_id order by d.operation_id 
    </select> 
 
 
    <resultMap id="operationResult" type="com.magnit.flux.model.entity.Operation"> 
        <id property="id" column="id"/> 
        <collection property="products" ofType="com.magnit.flux.model.entity.Product"> 
            <id property="id" column="product_id"/> 
            <result property="name" column="product_name"/> 
        </collection> 
    </resultMap> 
 
</mapper>

Здесь getAllOperations определяет отсортированный набор данных (resultOrdered=”true”), а MyBatis автоматом группирует данные по operation.id и возвращает операции с вложенными коллекциями продукции. 

Использование MyBatis для стриминга 

Класс аналогичный HibernateFluxResultProducer. Он имеет пару отличий: 

  • В качестве параметра передаётся не hql запрос, а лямбда-выражение, возвращающее курсор 

  • В данном подходе не используются долгоиграющие транзакции, что даёт небольшое преимущество перед hibernate. По крайней мере DBA будут спокойнее).

Класс com.magnit.flux.mybatis.dao.MyBatisFluxResultProducer

public class MyBatisFluxResultProducer<T> { 
     
   private final SqlSessionFactory sqlSessionFactory; 
 
    private Cursor<T> cursor; 
 
    private SqlSession sqlSession; 
 
    /** 
     * @param cursorFunction функция для продюсирования курсора 
     * @return Flux соответсвующего типа 
     */ 
    public Flux<T> execute(Function<SqlSession, Cursor<T>> cursorFunction) { 
        Optional.ofNullable(sqlSession).ifPresent(e -> { 
            throw new RuntimeException("Cursor already open"); 
        }); 
 
        sqlSession = sqlSessionFactory.openSession(); 
        cursor = cursorFunction.apply(sqlSession); 
        return Flux.fromStream(StreamSupport.stream(cursor.spliterator(), false)); 
    } 
 
    @SneakyThrows 
    public Mono<Void> close() { 
        if (cursor.isOpen()) { 
            cursor.close(); 
 
        } 
        sqlSession.close(); 
        return Mono.empty(); 
    } 
 
} 

Контроллер, работающий с MyBatis 

Контроллер выглядит практически аналогично контроллеру, работающему с hibrernate.

Класс com.magnit.flux.mybatis.controller.MyBatisOperationController

public class MyBatisOperationController { 
     
   private final SqlSessionFactory sqlSessionFactory; 
 
    private final ObjectFactory<MyBatisFluxResultProducer<Operation>> streamResultProducerObjectFactory; 
 
    @GetMapping(path = "/operations-stream", produces = "application/stream+json") 
    public Flux<Operation> getOperationsStream() { 
        return getOperationFlux(); 
    } 
 
    private Flux<Operation> getOperationFlux() { 
        Function<SqlSession, Cursor<Operation>> cursorFunction = sqlSession -> sqlSession 
            .getMapper(OperationMapper.class).getAllOperations(); 
 
        Mono<MyBatisFluxResultProducer<Operation>> streamResultExecutorMono = Mono 
            .just(streamResultProducerObjectFactory.getObject()); 
        return Flux.usingWhen(streamResultExecutorMono, 
            se -> se.execute(cursorFunction), 
            MyBatisFluxResultProducer::close); 
    } 
 
} 

Результат также можно посмотреть в браузере.

Проблема приёма данных на стороне БД СМ

Проблема с передачей больших объёмов данных была решена, но возникла проблема перегрузки БД запросами. Необходимо было ограничить количество запросов на запись в БД. На помощь приходит backpressure (обратное давление).  

Использование backet4j для реализации backpressure 

  • Bucket4j — это библиотека ограничения скорости, основанная на алгоритме token-bucket 

  • Bucket4j — это потокобезопасная библиотека, которую можно использовать как в автономном приложении JVM, так и в кластерной среде 

  • Bucket4j поддерживает кэширование в памяти или распределенное кэширование с помощью спецификации JCache (JSR107) 

Алгоритм Token Bucket

  • Создаётся корзина, вместимость которого определяется количеством токенов, которое она может вместить 

  • Всякий раз, когда потребитель хочет получить доступ к конечной точке API, он должен получить токен из корзины 

  • Токен из корзины удаляется, если он доступен, и запрос принимается 

  • Если в корзине нет токенов, то запрос отклоняется 

  • Токены пополняются с фиксированной скоростью, которая и является ограничением 

Подробнее о bucket4j здесь.

Bucket4j и Flux 

В данной статье представлены два варианта реализации связки Bucket4j и Flux 

  • Блокирующая с использованием методов Flux.delayUntil и Bucket.asBlocking 

  • Асинхронная, использующая механизм управления подписчиком (Subscription) и Bucket.asScheduler 

Реактивный клиента на java с использованием Bucket4j 

В классе RateLimitService реализованы два соответствующих метода, которые с заданной скоростью фетчат записи из реактивного http-сервиса, который описан выше. 

Использование блокирующего API bucket4j для backpressure 

Класс com.magnit.fluex.client.service.RateLimitService.executeBlocking

public void executeBlocking(long rate) { 
    AtomicLong counter = new AtomicLong(0); 
    val limit = Bandwidth.simple(rate, Duration.ofMinutes(1)); 
    val bucket = Bucket.builder().addLimit(limit).build(); 
    val flux = client.get().retrieve().bodyToFlux(Operation.class); 
    flux.delayUntil(operation -> { 
        try { 
            bucket.asBlocking().consume(1); 
        } catch (InterruptedException e) { 
            e.printStackTrace(); 
        } 
        return Mono.just("two"); 
    }).subscribe(value ->  System.out.println(counter.incrementAndGet() + " " + value)); 
} 

Основу логики составляет метод Flux.delayUntil в связке bucket.asBlocking.  

При вызове bucket.asBlocking токен либо возвращается, либо поток блокируется до пополнения корзины. 

Использование шедулинга bucket4j для backpressure 

Класс com.magnit.flux.client.service.RateLimitService.executeScheduling

public void executeScheduling(long rate) { 
    AtomicLong counter = new AtomicLong(0); 
    val service = Executors.newSingleThreadScheduledExecutor(); 
    val limit = Bandwidth.simple(rate, Duration.ofMinutes(1)); 
    val bucket = Bucket.builder().addLimit(limit).build(); 
    final long watTime = 50; 
    BaseSubscriber<Operation> subscriber = new BaseSubscriber<Operation>() { 
        private void handleFutureResult(Boolean result) { 
            if (result) { 
                this.request(1); 
            } else { 
                service.schedule(() -> asyncRequest(), watTime, TimeUnit.NANOSECONDS); 
            } 
        } 
 
        @SneakyThrows 
        private void asyncRequest() { 
            val future = bucket.asScheduler().tryConsume(50, watTime, service); 
            if (future.isDone()) { 
                handleFutureResult(future.get()); 
            } else { 
                future.thenAcceptAsync(value -> { 
                    handleFutureResult(value); 
                }); 
            } 
        } 
 
        @Override 
        protected void hookOnSubscribe(Subscription subscription) { 
            asyncRequest(); 
        } 
 
        @Override 
        protected void hookOnNext(Operation value) { 
            System.out.println(counter.incrementAndGet() + " " + value); 
            asyncRequest(); 
        } 
    }; 
    val flux = client.get().retrieve().bodyToFlux(Operation.class); 
    flux.subscribe(subscriber); 
} 

В данном методе основной поток не блокируется. Создаётся подписчик, наследованный от класса из библиотеки reactor — BaseSubscriber. Токен получается методом bucket.asScheduler, который возвращает объект CompletableFuture. Если токен удалось получить (CompletableFuture .isDone() == true), тогда фетчится очередная запись, иначе фетч запускается после пополнения корзины. Ожидание пополнения происходит асинхронно — метод CompletableFuture.thenAcceptAsync. 

Выводы

  • В сервисе нам удалось совместить блокирующую и реактивную архитектуры 

  • В выборе между hibernate и mybatis мы остановились на hibernate, так как, во-первых, приложение было уже написано с использованием hibernate, а во-вторых, нам важно кеширование данных. 

  • Помимо реактивности, мы получили возможность использовать в приложении весь богатый функционал flux. 

Комментарии (1)


  1. dmarsentev
    00.00.0000 00:00
    +4

    "BDSM‑CLIENT — работа с бизнес‑запросами БД СМ (базы данных сети магазинов)."

    Приятно, когда в аббревиатурах разрабы позволяют себе иронию. Спасибо!