Всем привет! Вот и он – наш заключительный пост из серии про Quarkus! (Кстати, смотрите наш вебинар «Это Quarkus – Kubernetes native Java фреймворк». Покажем, как начать «с нуля» или перенести готовые решения)



В предыдущем посте мы рассмотрели соответствующие инструменты, с помощью которых можно количественно оценить улучшения, полученные в результате модернизации Java-приложений.

Начиная с версии 0.17.0, Quarkus поддерживает использование Advanced Message Queuing Protocol (AMQP), который является открытым стандартом передачи бизнес-сообщений между приложениями или организациями.

Red Hat AMQ Online – это сервис, построенный на основе открытого проекта EnMasse и реализующий механизм обмена сообщений на базе платформы Red Hat OpenShift. Подробнее о том, как он устроен, см. здесь (EN). Сегодня мы покажем, как объединить AMQ Online и Quarkus, чтобы построить современную систему обмена сообщениями на базе OpenShift с использованием двух новых технологий, связанных с обработкой сообщений.

Предполагается, что вы уже развернули AMQ Online на платформе OpenShift (если нет, то см. руководство по установке).

Для начала мы создадим приложения Quarkus, которое будет представлять собой простую систему обработки заказов с использованием реактивного обмена сообщениями. Это приложение будет включать в себя генератор заказов, отправляющий заказы в очередь сообщений с фиксированным интервалом, а также обработчик заказов, который будет обрабатывать сообщения из очереди и формировать подтверждения, доступные для просмотра в браузере.

После создания приложения мы покажем, как внедрять в него конфигурацию системы обмена сообщениями и задействуем AMQ Online, чтобы инициализировать на этой системе нужные нам ресурсы.

Приложение Quarkus


Наше Quarkus-приложение выполняется на OpenShift и представляет собой модифицированную версию программы amqp-quickstart. Полный пример клиентской части можно найти здесь.

Генератор заказов


Генератор каждые 5 секунд просто монотонно отправляет растущие идентификаторы заказов на адрес «orders».

@ApplicationScoped
public class OrderGenerator {
 
    private int orderId = 1;
 
    @Outgoing("orders")
    public Flowable<Integer> generate() {
        return Flowable.interval(5, TimeUnit.SECONDS)
        .map(tick -> orderId++);
    }
}

Обработчик заказов


Обработчик заказов еще проще, он всего лишь возвращает идентификатор подтверждения на адрес «confirmations».

@ApplicationScoped
public class OrderProcessor {
    @Incoming("orders")
    @Outgoing("confirmations")
    public Integer process(Integer order) {
        // Идентификатор подтверждения равен удвоенному идентификатору заказа <img draggable="false" class="emoji" alt=":-)" src="https://s.w.org/images/core/emoji/11.2.0/svg/1f642.svg">
        return order * 2;
    }
}

Ресурсы подтверждения


Ресурс подтверждения – это конечная точка HTTP для листинга сформированных в результате работы нашего приложения подтверждений.

@Path("/confirmations")
public class ConfirmationResource {
 
    @Inject
    @Stream("confirmations") Publisher<Integer> orders;
 
    @GET
    @Produces(MediaType.TEXT_PLAIN)
    public String hello() {
        return "hello";
    }
 
 
    @GET
    @Path("/stream")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    public Publisher<Integer> stream() {
        return orders;
    }
}

Настройка


Для подключения к AMQ Online нашему приложению понадобится некоторые конфигурационные данные, а именно: конфигурация Quarkus-коннектора, сведения о конечной точке AMQP и клиентские учетные данные. Лучше, конечно, держать все конфигурационные данные в одном месте, но мы специально разделим их, чтобы показать возможные варианты настройки приложения Quarkus.

Коннекторы


Конфигурацию коннектора можно предоставить на этапе компиляции с помощью файла свойств приложения:

mp.messaging.outgoing.orders.connector=smallrye-amqp
mp.messaging.incoming.orders.connector=smallrye-amqp

Чтобы не усложнять, мы будем использовать очередь сообщений только для адреса «orders». А адрес «confirmations» в нашем приложении будет использовать очередь в памяти.

Конечная точка AMQP


На этапе компиляции имя хоста и номер порта для конечной точки AMQP неизвестны, поэтому их нужно внедрить. Конечную точку можно задать в configmap, который создается AMQ Online, поэтому мы определим их через переменные среды в манифесте приложения:

spec:
  template:
    spec:
      containers:
      - env:
        - name: AMQP_HOST
          valueFrom:
            configMapKeyRef:
              name: quarkus-config
              key: service.host
        - name: AMQP_PORT
          valueFrom:
            configMapKeyRef:
              name: quarkus-config
              key: service.port.amqp

Учетные данные


Маркер учетной записи сервиса можно использовать для аутентификации нашего приложения в OpenShift. Для этого надо сначала создать пользовательский ConfigSource, который будет читать маркер аутентификации из файловой системы pod’а:

public class MessagingCredentialsConfigSource implements ConfigSource {
    private static final Set<String> propertyNames;
 
    static {
        propertyNames = new HashSet<>();
        propertyNames.add("amqp-username");
        propertyNames.add("amqp-password");
    }
 
    @Override
    public Set<String> getPropertyNames() {
        return propertyNames;
    }
 
    @Override
    public Map<String, String> getProperties() {
        try {
            Map<String, String> properties = new HashMap<>();
            properties.put("amqp-username", "@@serviceaccount@@");
            properties.put("amqp-password", readTokenFromFile());
            return properties;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
 
    @Override
    public String getValue(String key) {
        if ("amqp-username".equals(key)) {
            return "@@serviceaccount@@";
        }
        if ("amqp-password".equals(key)) {
            try {
                return readTokenFromFile();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        return null;
    }
 
    @Override
    public String getName() {
        return "messaging-credentials-config";
    }
 
    private static String readTokenFromFile() throws IOException {
        return new String(Files.readAllBytes(Paths.get("/var/run/secrets/kubernetes.io/serviceaccount/token")), StandardCharsets.UTF_8);
    }
}

Сборка и развертывание приложения


Поскольку приложение надо скомпилировать в исполняемый файл, потребуется виртуальная машина GraalVM. Подробнее о том, как настроить для этого окружение, см. соответствующие инструкции в Quarkus Guide.

Затем, следуя приведенным там инструкциям, надо скачать исходник, провести сборку и выполнить развертывание нашего приложения:

git clone https://github.com/EnMasseProject/enmasse-example-clients
cd enmasse-example-clients/quarkus-example-client
oc new-project myapp
mvn -Pnative -Dfabric8.mode=openshift -Dfabric8.build.strategy=docker package fabric8:build fabric8:resource fabric8:apply

После этих команд приложение будет развернуто, но не запустится до тех пор, пока мы не настроим в AMQ Online необходимые нам ресурсы обмена сообщениями.

Настройка системы обмена сообщениями


Теперь осталось задать в системе обмена сообщениями ресурсы, которые нужны нашему приложению. Для этого надо создать: 1) адресное пространство, чтобы инициализировать конечную точку системы обмена сообщениями; 2) адрес, чтобы настроить адреса, которые мы используем в приложении; 3) пользователя системы обмена сообщениями, чтобы задать учетные данные клиента.

Пространство адресов


Объект AddressSpace в AMQ Online – это группа адресов, которые совместно используют конечные точки подключения, а также политики аутентификации и авторизации. При создании пространства адресов можно задать, как именно будут предоставляться конечные точки системы обмена сообщениями:

apiVersion: enmasse.io/v1beta1
kind: AddressSpace
metadata:
  name: quarkus-example
spec:
  type: brokered
  plan: brokered-single-broker
  endpoints:
  - name: messaging
    service: messaging
    exports:
    - name: quarkus-config
      kind: configmap

Адреса


Адреса используются для отправки и приема сообщений. У каждого адреса есть тип, который определяет его семантику, а также план, который задает количество резервируемых ресурсов. Адрес можно определить, например, вот так:

apiVersion: enmasse.io/v1beta1
kind: Address
metadata:
  name: quarkus-example.orders
spec:
  address: orders
  type: queue
  plan: brokered-queue

Пользователь системы обмена сообщениями


Чтобы отправлять и получать сообщения на ваши адреса могли только доверенные приложения, в системе обмена сообщениями необходимо создать пользователя. Для приложений, работающих на кластере, клиентов можно аутентифицировать с помощью учетной записи сервиса OpenShift. Пользователя «serviceaccount» можно определить, например, так:

apiVersion: user.enmasse.io/v1beta1
kind: MessagingUser
metadata:
  name: quarkus-example.app
spec:
  username: system:serviceaccount:myapp:default
  authentication:
    type: serviceaccount
  authorization:
  - operations: ["send", "recv"]
    addresses: ["orders"]

Разрешения для настройки приложения


Чтобы AMQ Online мог создать configmap, который мы использовали для внедрения сведений о конечной точке AMQP, необходимо задать роль и привязку роли (Role и RoleBinding):

---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: quarkus-config
spec:
  rules:
  - apiGroups: [ "" ]
    resources: [ "configmaps" ]
    verbs: [ "create" ]
  - apiGroups: [ "" ]
    resources: [ "configmaps" ]
    resourceNames: [ "quarkus-config" ]
    verbs: [ "get", "update", "patch" ]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: quarkus-config
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: quarkus-config
subjects:
- kind: ServiceAccount
  name: address-space-controller
  namespace: amq-online-infra

Как применить конфигурации


Применить конфигурацию системы обмена сообщениями можно вот так:

cd enmasse-example-clients/quarkus-example-client
oc project myapp
oc apply -f src/main/resources/k8s/addressspace
oc apply -f src/main/resources/k8s/address

Верификация приложения


Чтобы убедиться, что приложение запустилось, первым делом проверим, создались ли и активны ли соответствующие адреса:

until [[ `oc get address quarkus-example.prices -o jsonpath='{.status.phase}'` == "Active" ]]; do echo "Not yet ready"; sleep 5; done

Затем проверим URL маршрута приложения (просто откроем это адрес в браузере):

echo "http://$(oc get route quarkus-example-client -o jsonpath='{.spec.host}')/prices.html"

В браузере должно быть видно, что тикеты периодически обновляются по мере того, как сообщения отправляются и принимаются AMQ Online.

Подводим итоги


Итак, мы написали приложение Quarkus, использующее AMQP для обмена сообщениями, настроили это приложение для работы на платформе Red Hat OpenShift, а также внедрили его конфигурацию на основе конфигурации AMQ Online. Затем мы создали манифесты, необходимые для инициализации системы обмена сообщениями под наше приложение.

На этом мы завершаем серию про Quarkus, но впереди много нового и интересного, оставайтесь с нами!