Author: Rustem Galiev

IBM Senior DevOps Engineer & Integration Architect

Hello, Habr!

Today we will learn how to write a microservice that stores data in Apache Cassandra using the DataStax Java driver and publishes events for data changes to Apache Kafka.

This post is based on the design of a simple microservice for managing hotel reservation data, called the Reservation Service. You can work through a series of exercises on writing and reading data in Cassandra with the Reservation Service in the set "Cassandra: Application Development with the DataStax Java Driver". The source code of the Reservation Service is available on GitHub.

The Reservation Service uses a hotel reservation data model that includes the tables listed later in this post (see reservation.cql).

Cassandra and Kafka are often used together in microservice architectures. In this design, the Reservation Service receives an API request to perform some action, such as creating, updating, or deleting a reservation. After persisting the change to Cassandra, the Reservation Service sends a message to the reservation topic in the Kafka cluster.

Other services consume the reservation topic and act on each message: for example, an inventory service updates its inventory tables to mark the dates as reserved, and an email service might send the guest a thank-you email for the booking. In this style of interaction, Cassandra and Kafka are not connected directly; they are used in a complementary way.
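
To make the consumer side concrete, here is a minimal sketch of how a downstream service might interpret messages from the reservation topic. It assumes the convention suggested by the hints in the repository code later in this post: the message key is the confirmation number, the value is the reservation as JSON, and an empty value signals a deletion. The names ReservationEvents, ChangeType, classify, and describe are hypothetical and not part of the Reservation Service. The sketch uses only the Java standard library, so no Kafka client is involved.

```java
/**
 * Hypothetical helper showing how a consumer could interpret
 * key/value pairs read from the "reservation" topic.
 * Assumption: an empty (or null) value means the reservation was deleted.
 */
public class ReservationEvents {

    /** Kinds of change a message can describe. */
    public enum ChangeType { UPSERT, DELETE }

    /** Decide what a message means based on its value alone. */
    public static ChangeType classify(String value) {
        // An empty payload is the assumed deletion marker
        if (value == null || value.isEmpty()) {
            return ChangeType.DELETE;
        }
        return ChangeType.UPSERT;
    }

    /** Build a short human-readable description, e.g. for logging. */
    public static String describe(String confirmationNumber, String value) {
        if (classify(value) == ChangeType.DELETE) {
            return "Reservation " + confirmationNumber + " was deleted";
        }
        return "Reservation " + confirmationNumber + " was created or updated";
    }

    public static void main(String[] args) {
        // The JSON payload here is made up purely for illustration
        System.out.println(describe("ABC123", "{\"hotelId\":\"NY456\"}"));
        System.out.println(describe("ABC123", ""));
    }
}
```

An inventory or email service would presumably branch on the result of classify and parse the JSON value only in the UPSERT case.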

Setting up the Reservation Service


Let's start by cloning the Reservation Service source code from GitHub and checking out the branch that will serve as our starting point:

git clone -b kafka https://github.com/jeffreyscarpenter/reservation-service

Once this completes, we should see the reservation-service directory.

Let's look at some of the contents of the code.

First, we will look at the schema used for the reservation tables, located at reservation-service/src/main/resources/reservation.cql:

/*
 * Copyright (C) 2016-2020 Jeff Carpenter
 */

/* This file contains a slightly modified version of the "reservation" keyspace and table definitions
 * for the example defined in Chapter 5 of Cassandra: The Definitive Guide, 2nd and 3rd Editions.
 * The changes are to facilitate development exercises.
 */

CREATE KEYSPACE reservation
    WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1};

CREATE TABLE reservation.reservations_by_hotel_date (
    hotel_id text,
    start_date date,
    end_date date,
    room_number smallint,
    confirm_number text,
    guest_id uuid,
    PRIMARY KEY ((hotel_id, start_date), room_number)
);

CREATE TABLE reservation.reservations_by_confirmation (
    confirm_number text PRIMARY KEY,
    hotel_id text,
    start_date date,
    end_date date,
    room_number smallint,
    guest_id uuid
);

/*
 The following tables are provided for completeness with the book text, but they are not used in the current
 implementation of the Reservation Service
 */

CREATE TABLE reservation.reservations_by_guest (
    guest_last_name text,
    hotel_id text,
    start_date date,
    end_date date,
    room_number smallint,
    confirm_number text,
    guest_id uuid,
    PRIMARY KEY ((guest_last_name), hotel_id)
);

CREATE TYPE reservation.address (
    street text,
    city text,
    state_or_province text,
    postal_code text,
    country text
);

CREATE TABLE reservation.guests (
    guest_id uuid PRIMARY KEY,
    first_name text,
    last_name text,
    title text,
    emails set<text>,
    phone_numbers list<text>,
    addresses map<text, frozen<address>>
);
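
The two tables the service actually uses hold the same reservation under different keys, so every write has to touch both. A delete by confirmation number first needs a lookup to recover the hotel, date, and room that key the other table. The sketch below models that access pattern with plain in-memory maps instead of Cassandra. The class InMemoryReservations, its Row type, and its methods are hypothetical; they only illustrate the idea and do not use the driver API.

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory stand-in for the two reservation tables. */
public class InMemoryReservations {

    /** Minimal row: just the columns needed to key both tables. */
    public static final class Row {
        public final String confirmNumber;
        public final String hotelId;
        public final LocalDate startDate;
        public final short roomNumber;

        public Row(String confirmNumber, String hotelId, LocalDate startDate, short roomNumber) {
            this.confirmNumber = confirmNumber;
            this.hotelId = hotelId;
            this.startDate = startDate;
            this.roomNumber = roomNumber;
        }
    }

    // Models reservations_by_confirmation: partition key = confirm_number
    private final Map<String, Row> byConfirmation = new HashMap<>();

    // Models reservations_by_hotel_date: partition key = (hotel_id, start_date),
    // clustering column = room_number (TreeMap keeps rooms in ascending order)
    private final Map<String, TreeMap<Short, Row>> byHotelDate = new HashMap<>();

    private static String partitionKey(String hotelId, LocalDate startDate) {
        return hotelId + "|" + startDate;
    }

    private void removeFromHotelDate(Row row) {
        TreeMap<Short, Row> partition = byHotelDate.get(partitionKey(row.hotelId, row.startDate));
        if (partition != null) {
            partition.remove(row.roomNumber);
        }
    }

    /** Write the same reservation into both "tables". */
    public void upsert(Row row) {
        Row previous = byConfirmation.put(row.confirmNumber, row);
        if (previous != null) {
            // Drop the old copy in case the hotel, date, or room changed
            removeFromHotelDate(previous);
        }
        byHotelDate
            .computeIfAbsent(partitionKey(row.hotelId, row.startDate), k -> new TreeMap<>())
            .put(row.roomNumber, row);
    }

    /** The query that reservations_by_hotel_date is designed to serve. */
    public List<Row> findByHotelAndDate(String hotelId, LocalDate date) {
        TreeMap<Short, Row> partition = byHotelDate.get(partitionKey(hotelId, date));
        return partition == null ? new ArrayList<>() : new ArrayList<>(partition.values());
    }

    /** Delete by confirmation number: look up the row first to learn the other key. */
    public boolean delete(String confirmNumber) {
        Row row = byConfirmation.remove(confirmNumber);
        if (row == null) {
            return false;
        }
        removeFromHotelDate(row);
        return true;
    }
}
```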

We also want to note the dependencies on the DataStax Java driver and the Kafka client libraries in reservation-service/pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.cassandraguide</groupId>
	<artifactId>reservation-service</artifactId>
	<version>1.0.0</version>
	<packaging>jar</packaging>

	<name>reservation-service</name>
	<description>Demo service using Cassandra driver and Spring Boot</description>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.2.6.RELEASE</version>
		<relativePath />
	</parent>

	<properties>
		<java.version>11</java.version>
		<swagger.version>2.9.2</swagger.version>
		<junit-jupiter.version>5.4.2</junit-jupiter.version>
		<junit-platform.version>1.7.0-M1</junit-platform.version>
		<oss-java-driver.version>4.5.1</oss-java-driver.version>
		<kafka-client.version>2.5.0</kafka-client.version>
		<testcontainers.version>1.14.1</testcontainers.version>

		<version.maven.plugin.compiler>3.8.0</version.maven.plugin.compiler>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<start-class>dev.cassandraguide.ReservationServiceApp</start-class>
		<dockerfile-maven-version>1.4.13</dockerfile-maven-version>

	</properties>

	<dependencies>

		<!-- Create a RESTFul Service -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>

		<!-- Document for REST Service -->
		<dependency>
			<groupId>io.springfox</groupId>
			<artifactId>springfox-swagger2</artifactId>
			<version>${swagger.version}</version>
		</dependency>
		<dependency>
			<groupId>io.springfox</groupId>
			<artifactId>springfox-swagger-ui</artifactId>
			<version>${swagger.version}</version>
		</dependency>

		<!-- Cassandra Driver -->
		<dependency>
			<groupId>com.datastax.oss</groupId>
			<artifactId>java-driver-core</artifactId>
			<version>${oss-java-driver.version}</version>
		</dependency>
		<dependency>
			<groupId>com.datastax.oss</groupId>
			<artifactId>java-driver-query-builder</artifactId>
			<version>${oss-java-driver.version}</version>
		</dependency>

		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
			<version>${kafka-client.version}</version>
		</dependency>

		<dependency>
			<groupId>org.apache.commons</groupId>
			<artifactId>commons-lang3</artifactId>
		</dependency>
		<!-- Provides JSON serialization/deserialization for date/time types -->
		<dependency>
			<groupId>com.fasterxml.jackson.datatype</groupId>
			<artifactId>jackson-datatype-jsr310</artifactId>
		</dependency>

		<!-- Tests -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
			<exclusions>
				<exclusion>
					<groupId>junit</groupId>
					<artifactId>junit</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<!-- Junit 5 -->
		<dependency>
			<groupId>org.junit.jupiter</groupId>
			<artifactId>junit-jupiter-api</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.junit.jupiter</groupId>
			<artifactId>junit-jupiter-engine</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.junit.jupiter</groupId>
			<artifactId>junit-jupiter-params</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.junit.platform</groupId>
			<artifactId>junit-platform-engine</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.junit.platform</groupId>
			<artifactId>junit-platform-launcher</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.junit.platform</groupId>
			<artifactId>junit-platform-runner</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.junit.platform</groupId>
			<artifactId>junit-platform-console-standalone</artifactId>
			<version>${junit-platform.version}</version>
			<scope>test</scope>
		</dependency>
		<!-- Test against Docker Containers -->
		<dependency>
			<groupId>org.testcontainers</groupId>
			<artifactId>cassandra</artifactId>
			<scope>test</scope>
			<version>${testcontainers.version}</version>
		</dependency>
		<dependency>
			<groupId>org.testcontainers</groupId>
			<artifactId>testcontainers</artifactId>
			<scope>test</scope>
			<version>${testcontainers.version}</version>
		</dependency>
		<dependency>
			<groupId>org.testcontainers</groupId>
			<artifactId>kafka</artifactId>
			<scope>test</scope>
			<version>${testcontainers.version}</version>
		</dependency>
		<!-- Add driver keys to spring-boot config file -->
		<dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-configuration-processor</artifactId>
           <optional>true</optional>
        </dependency>
        
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
				<configuration>
					<fork>true</fork>
					<mainClass>${start-class}</mainClass>
				</configuration>
				<executions>
					<execution>
						<goals>
							<goal>repackage</goal>
						</goals>
					</execution>
				</executions>
			</plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
				</configuration>
            </plugin>

			<plugin>
				<groupId>com.spotify</groupId>
				<artifactId>dockerfile-maven-plugin</artifactId>
				<version>${dockerfile-maven-version}</version>
				<configuration>
					<repository>reservation-service</repository>
					<tag>${project.version}</tag>
					<buildArgs>
						<JAR_FILE>${project.build.finalName}.jar</JAR_FILE>
					</buildArgs>
				</configuration>
			</plugin>
        </plugins>
	</build>

</project>

The source code contains several key Java classes that are also worth examining:

The entity class definition used in the HTTP API, which represents the type of data we store in Cassandra and publish to the Kafka topic, at reservation-service/src/main/java/dev/cassandraguide/model/Reservation.java:

/*
 * Copyright (C) 2017-2020 Jeff Carpenter
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package dev.cassandraguide.model;

import java.io.Serializable;
import java.time.LocalDate;
import java.util.UUID;

/**
 * Entity working with Reservation on Cassandra.
 *
 * @author Jeff Carpenter
 */
public class Reservation implements Serializable {

    /** Serial. */
    private static final long serialVersionUID = -3392237616280919281L;
    
    /** Hotel identifier, as Text not UUID (for simplicity). */
    private String hotelId;
    
    /** Formatted as YYYY-MM-DD in interfaces. */
    private LocalDate startDate;
    
    /** Formatted as YYYY-MM-DD in interfaces. */
    private LocalDate endDate;
    
    /** Room number. */
    private short roomNumber;
    
    /** UUID. */
    private UUID guestId;
    
    /** Confirmation for this Reservation. */
    private String confirmationNumber;
    
    /**
     * Default constructor
     */
    public Reservation() {
    }
    
    /**
     * Constructor that copies the fields of a reservation request.
     */
    public Reservation(ReservationRequest form) {
        setStartDate(form.getStartDate());
        setEndDate(form.getEndDate());
        setHotelId(form.getHotelId());
        setGuestId(form.getGuestId());
        setRoomNumber(form.getRoomNumber());
    }
    
    /**
     * Constructor from a reservation request and an existing confirmation number.
     */
    public Reservation(ReservationRequest form, String confirmationNumber) {
        this(form);
        this.confirmationNumber = confirmationNumber;
    }

    /**
     * Getter accessor for attribute 'hotelId'.
     *
     * @return
     *       current value of 'hotelId'
     */
    public String getHotelId() {
        return hotelId;
    }

    /**
     * Setter accessor for attribute 'hotelId'.
     * @param hotelId
     *      new value for 'hotelId '
     */
    public void setHotelId(String hotelId) {
        this.hotelId = hotelId;
    }

    /**
     * Getter accessor for attribute 'startDate'.
     *
     * @return
     *       current value of 'startDate'
     */
    public LocalDate getStartDate() {
        return startDate;
    }

    /**
     * Setter accessor for attribute 'startDate'.
     * @param startDate
     *      new value for 'startDate '
     */
    public void setStartDate(LocalDate startDate) {
        this.startDate = startDate;
    }

    /**
     * Getter accessor for attribute 'endDate'.
     *
     * @return
     *       current value of 'endDate'
     */
    public LocalDate getEndDate() {
        return endDate;
    }

    /**
     * Setter accessor for attribute 'endDate'.
     * @param endDate
     *      new value for 'endDate '
     */
    public void setEndDate(LocalDate endDate) {
        this.endDate = endDate;
    }

    /**
     * Getter accessor for attribute 'roomNumber'.
     *
     * @return
     *       current value of 'roomNumber'
     */
    public short getRoomNumber() {
        return roomNumber;
    }

    /**
     * Setter accessor for attribute 'roomNumber'.
     * @param roomNumber
     *      new value for 'roomNumber '
     */
    public void setRoomNumber(short roomNumber) {
        this.roomNumber = roomNumber;
    }

    /**
     * Getter accessor for attribute 'guestId'.
     *
     * @return
     *       current value of 'guestId'
     */
    public UUID getGuestId() {
        return guestId;
    }

    /**
     * Setter accessor for attribute 'guestId'.
     * @param guestId
     *      new value for 'guestId '
     */
    public void setGuestId(UUID guestId) {
        this.guestId = guestId;
    }

    /**
     * Getter accessor for attribute 'confirmationNumber'.
     *
     * @return
     *       current value of 'confirmationNumber'
     */
    public String getConfirmationNumber() {
        return confirmationNumber;
    }

    /**
     * Setter accessor for attribute 'confirmationNumber'.
     * @param confirmationNumber
     * 		new value for 'confirmationNumber '
     */
    public void setConfirmationNumber(String confirmationNumber) {
        this.confirmationNumber = confirmationNumber;
    }
    
    /** {@inheritDoc} */
    @Override
    public String toString() {
        return "Confirmation Number = " + confirmationNumber +
                ", Hotel ID: " + getHotelId() +
                ", Start Date = " + getStartDate() +
                ", End Date = " + getEndDate() +
                ", Room Number = " + getRoomNumber() +
                ", Guest ID = " + getGuestId();
    }
}
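
The Javadoc on startDate and endDate notes that dates are formatted as YYYY-MM-DD in interfaces. That is the ISO-8601 form that java.time.LocalDate uses by default for both parse() and toString(), as this small standard-library sketch shows. The class DateFormatDemo and its method names are hypothetical.

```java
import java.time.LocalDate;

/** Hypothetical demo of the YYYY-MM-DD date convention. */
public class DateFormatDemo {

    /** Parse a date in ISO-8601 form (YYYY-MM-DD), LocalDate's default. */
    public static LocalDate fromInterface(String text) {
        return LocalDate.parse(text);
    }

    /** Render a date back to the same YYYY-MM-DD form. */
    public static String toInterface(LocalDate date) {
        return date.toString();
    }

    public static void main(String[] args) {
        LocalDate start = fromInterface("2020-06-08");
        System.out.println(toInterface(start.plusDays(2))); // prints 2020-06-10
    }
}
```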

The data storage logic of the service, where we will do most of our work, at reservation-service/src/main/java/dev/cassandraguide/repository/ReservationRepository.java:

/*
 * Copyright (C) 2017-2020 Jeff Carpenter
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package dev.cassandraguide.repository;

import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.deleteFrom;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.selectFrom;
import static com.datastax.oss.driver.api.querybuilder.SchemaBuilder.createTable;
import static com.datastax.oss.driver.api.querybuilder.SchemaBuilder.createType;
import static com.datastax.oss.driver.api.querybuilder.relation.Relation.column;

import java.time.LocalDate;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;

import javax.annotation.PreDestroy;

import com.fasterxml.jackson.core.JsonProcessingException;
import dev.cassandraguide.model.Reservation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Profile;
import org.springframework.lang.NonNull;
import org.springframework.stereotype.Repository;

import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BatchStatement;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.DefaultBatchType;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.metadata.schema.ClusteringOrder;
import com.datastax.oss.driver.api.core.type.DataTypes;
import com.datastax.oss.driver.api.core.type.UserDefinedType;
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;

// TODO: Review imports for publishing to Kafka
import org.apache.kafka.clients.producer.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

/**
 * The goal of this project is to provide a minimally functional implementation of a microservice 
 * that uses Apache Cassandra for its data storage. The reservation service is implemented as a 
 * RESTful service using Spring Boot.
 *
 * @author Jeff Carpenter, Cedrick Lunven
 */
@Repository
@Profile("!unit-test") // When I do some 'unit-test' no connectivity to DB
public class ReservationRepository {

    /** Logger for the class. */
    private static final Logger logger = LoggerFactory.getLogger(ReservationRepository.class);
    
    // Reservation Schema Constants
    public static final CqlIdentifier TYPE_ADDRESS               = CqlIdentifier.fromCql("address");
    public static final CqlIdentifier TABLE_RESERVATION_BY_HOTEL_DATE =
            CqlIdentifier.fromCql("reservations_by_hotel_date");
    public static final CqlIdentifier TABLE_RESERVATION_BY_CONFI = CqlIdentifier.fromCql("reservations_by_confirmation");
    public static final CqlIdentifier TABLE_RESERVATION_BY_GUEST = CqlIdentifier.fromCql("reservations_by_guest");
    public static final CqlIdentifier TABLE_GUESTS               = CqlIdentifier.fromCql("guests");
    public static final CqlIdentifier STREET                     = CqlIdentifier.fromCql("street");
    public static final CqlIdentifier CITY                       = CqlIdentifier.fromCql("city");
    public static final CqlIdentifier STATE_PROVINCE             = CqlIdentifier.fromCql("state_or_province");
    public static final CqlIdentifier POSTAL_CODE                = CqlIdentifier.fromCql("postal_code");
    public static final CqlIdentifier COUNTRY                    = CqlIdentifier.fromCql("country");
    public static final CqlIdentifier HOTEL_ID                   = CqlIdentifier.fromCql("hotel_id");
    public static final CqlIdentifier START_DATE                 = CqlIdentifier.fromCql("start_date");
    public static final CqlIdentifier END_DATE                   = CqlIdentifier.fromCql("end_date");
    public static final CqlIdentifier ROOM_NUMBER                = CqlIdentifier.fromCql("room_number");
    public static final CqlIdentifier CONFIRM_NUMBER             = CqlIdentifier.fromCql("confirm_number");
    public static final CqlIdentifier GUEST_ID                   = CqlIdentifier.fromCql("guest_id");
    public static final CqlIdentifier GUEST_LAST_NAME            = CqlIdentifier.fromCql("guest_last_name");
    public static final CqlIdentifier FIRSTNAME                  = CqlIdentifier.fromCql("first_name");
    public static final CqlIdentifier LASTNAME                   = CqlIdentifier.fromCql("last_name");
    public static final CqlIdentifier TITLE                      = CqlIdentifier.fromCql("title");
    public static final CqlIdentifier EMAILS                     = CqlIdentifier.fromCql("emails");
    public static final CqlIdentifier PHONE_NUMBERS              = CqlIdentifier.fromCql("phone_numbers");
    public static final CqlIdentifier ADDRESSES                  = CqlIdentifier.fromCql("addresses");
    
    private PreparedStatement psExistReservation;
    private PreparedStatement psFindReservation;
    private PreparedStatement psInsertReservationByHotelDate;
    private PreparedStatement psInsertReservationByConfirmation;
    private PreparedStatement psDeleteReservationByHotelDate;
    private PreparedStatement psDeleteReservationByConfirmation;
    private PreparedStatement psSearchReservation;
    
    /** CqlSession holding metadata to interact with Cassandra. */
    private CqlSession     cqlSession;
    private CqlIdentifier  keyspaceName;

    // TODO: Review variables used for publishing to Kafka
    /** KafkaProducer for publishing messages to Kafka. */
    private KafkaProducer<String, String> kafkaProducer;
    private String kafkaTopicName;
    private ObjectMapper objectMapper;
    
    /** External Initialization. */
    public ReservationRepository(
            @NonNull CqlSession cqlSession, 
            @Qualifier("keyspace") @NonNull CqlIdentifier keyspaceName,
            @NonNull KafkaProducer<String, String> kafkaProducer,
            @NonNull String kafkaTopicName) {
        this.cqlSession   = cqlSession;
        this.keyspaceName = keyspaceName;

        // TODO: Review initialization of objects needed for publishing to Kafka
        this.kafkaProducer = kafkaProducer;
        this.kafkaTopicName = kafkaTopicName;

        objectMapper = new ObjectMapper();
        objectMapper.registerModule(new JavaTimeModule());
        objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
        objectMapper.enable(SerializationFeature.INDENT_OUTPUT);

        // Will create tables (if they do not exist)
        createReservationTables();
        
        // Prepare Statements of reservation
        prepareStatements();
        logger.info("Application initialized.");
    }
    
    /**
     * CqlSession is a stateful object handling TCP connections to nodes in the cluster.
     * This operation properly closes sockets when the application is stopped
     */
    @PreDestroy
    public void cleanup() {
        if (null != cqlSession) {
            cqlSession.close();
            logger.info("+ CqlSession has been successfully closed");
        }
    }
    
    /**
     * Testing existence is useful for building correct semantics in the RESTful API. To evaluate existence find the
     * table where confirmation number is partition key which is reservations_by_confirmation
     * 
     * @param confirmationNumber
     *      unique identifier for confirmation
     * @return
     *      true if the reservation exists, false if it does not
     */
    public boolean exists(String confirmationNumber) {
        return cqlSession.execute(psExistReservation.bind(confirmationNumber))
                         .getAvailableWithoutFetching() > 0;
    }
    
    /**
     * Similar to exists() but maps and parses results.
     * 
     * @param confirmationNumber
     *      unique identifier for confirmation
     * @return
     *      reservation if present or empty
     */
    @NonNull
    public Optional<Reservation> findByConfirmationNumber(@NonNull String confirmationNumber) {
        
        ResultSet resultSet = cqlSession.execute(psFindReservation.bind(confirmationNumber));
        
        // Hint: an empty result might not be an error as this method is sometimes used to check whether a
        // reservation with this confirmation number exists
        Row row = resultSet.one();
        if (row == null) {
            logger.debug("Unable to load reservation with confirmation number: " + confirmationNumber);
            return Optional.empty();
        }
        
        // Hint: If there is a result, create a new reservation object and set the values
        // Bonus: factor the logic to extract a reservation from a row into a separate method
        // (you will reuse it again later in getAllReservations())
        return Optional.of(mapRowToReservation(row));
    }
    
    /**
     * Create new entry in multiple tables for this reservation.
     *
     * @param reservation
     *      current reservation object
     * @return
     *      confirmation number for the reservation
     *      
     */
    public String upsert(Reservation reservation) {
        Objects.requireNonNull(reservation);
        if (null == reservation.getConfirmationNumber()) {
            // Generating a new reservation number if none has been provided
            reservation.setConfirmationNumber(UUID.randomUUID().toString());
        }
        // Insert into 'reservations_by_hotel_date'
        BoundStatement bsInsertReservationByHotel = 
                psInsertReservationByHotelDate.bind(reservation.getHotelId(), reservation.getStartDate(),
                        reservation.getEndDate(), reservation.getRoomNumber(), reservation.getConfirmationNumber(),
                        reservation.getGuestId());
        // Insert into 'reservations_by_confirmation'
        BoundStatement bsInsertReservationByConfirmation = 
                psInsertReservationByConfirmation.bind(reservation.getConfirmationNumber(), reservation.getHotelId(),
                        reservation.getStartDate(), reservation.getEndDate(), reservation.getRoomNumber(),
                        reservation.getGuestId());
        BatchStatement batchInsertReservation = BatchStatement
                    .builder(DefaultBatchType.LOGGED)
                    .addStatement(bsInsertReservationByHotel)
                    .addStatement(bsInsertReservationByConfirmation)
                    .build();
        cqlSession.execute(batchInsertReservation);

        // TODO: Publish message to Kafka containing reservation
        try {
            String reservationJson = objectMapper.writeValueAsString(reservation);
            // HINT: use the constructor ProducerRecord(String topic, K key, V value)
            // with the reservation confirmation number as the key, and the JSON string as the value
            ProducerRecord<String, String> record = null;
            kafkaProducer.send(record);
        } catch (Exception e) {
            logger.warn("Error publishing reservation message to Kafka: {}", e);
        }

        return reservation.getConfirmationNumber();
    }

    /**
     * We pick 'reservations_by_confirmation' table to list reservations
     * BUT we could have used 'reservations_by_hotel_date' (as no key provided in request)
     *  
     * @return
     *      list containing all reservations
     */
    public List<Reservation> findAll() {
        return cqlSession.execute(selectFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI).all().build())
                  .all()                          // no paging we retrieve all objects
                  .stream()                       // because we are good people
                  .map(this::mapRowToReservation) // Mapping row as Reservation
                  .collect(Collectors.toList());  // Back to list objects
    }
      
    /**
     * Deleting a reservation.
     *
     * @param confirmationNumber
     *      unique identifier for confirmation.
     */
    public boolean delete(String confirmationNumber) {

        // Retrieving entire reservation in order to obtain the attributes we will need to delete from
        // reservations_by_hotel_date table
        Optional<Reservation> reservationToDelete = this.findByConfirmationNumber(confirmationNumber);

        if (reservationToDelete.isPresent()) {

            // Delete from 'reservations_by_hotel_date'
            Reservation reservation = reservationToDelete.get();
            BoundStatement bsDeleteReservationByHotelDate =
                    psDeleteReservationByHotelDate.bind(reservation.getHotelId(),
                            reservation.getStartDate(), reservation.getRoomNumber());

            // Delete from 'reservations_by_confirmation'
            BoundStatement bsDeleteReservationByConfirmation =
                    psDeleteReservationByConfirmation.bind(confirmationNumber);

            BatchStatement batchDeleteReservation = BatchStatement
                    .builder(DefaultBatchType.LOGGED)
                    .addStatement(bsDeleteReservationByHotelDate)
                    .addStatement(bsDeleteReservationByConfirmation)
                    .build();
            cqlSession.execute(batchDeleteReservation);

            // TODO: Publish message to Kafka with empty payload to indicate deletion
            // HINT: use the constructor ProducerRecord(String topic, K key, V value)
            // with the reservation confirmation number as the key, and an empty string as the value
            ProducerRecord<String, String> record = null;
            kafkaProducer.send(record);

            return true;
        }
        return false;
    }
    
    /**
     * Search all reservation for an hotel id and LocalDate.
     *
     * @param hotelId
     *      hotel identifier
     * @param date
     *      searched Date
     * @return
     *      list of reservations matching the search criteria
     */
    public List<Reservation> findByHotelAndDate(String hotelId, LocalDate date) {
        Objects.requireNonNull(hotelId);
        Objects.requireNonNull(date);
        return cqlSession.execute(psSearchReservation.bind(hotelId, date))
                         .all()                          // no paging we retrieve all objects
                         .stream()                       // because we are good people
                         .map(this::mapRowToReservation) // Mapping row as Reservation
                         .collect(Collectors.toList());  // Back to list objects
    }

    /**
     * Utility method to marshal a row as expected Reservation Bean.
     *
     * @param row
     *      current row from ResultSet
     * @return
     *      object
     */
    private Reservation mapRowToReservation(Row row) {
        Reservation reservation = new Reservation();
        reservation.setHotelId(row.getString(HOTEL_ID));
        reservation.setConfirmationNumber(row.getString(CONFIRM_NUMBER));
        reservation.setGuestId(row.getUuid(GUEST_ID));
        reservation.setRoomNumber(row.getShort(ROOM_NUMBER));
        reservation.setStartDate(row.getLocalDate(START_DATE));
        reservation.setEndDate(row.getLocalDate(END_DATE));
        return reservation;
    }
    
    /**
     * Create Keyspace and relevant tables as per defined in 'reservation.cql'
     */
    public void createReservationTables() {
        
        /**
         * Create TYPE 'Address' if not exists
         * 
         * CREATE TYPE reservation.address (
         *   street text,
         *   city text,
         *   state_or_province text,
         *   postal_code text,
         *   country text
         * );
         */
        cqlSession.execute(
                createType(keyspaceName, TYPE_ADDRESS)
                .ifNotExists()
                .withField(STREET, DataTypes.TEXT)
                .withField(CITY, DataTypes.TEXT)
                .withField(STATE_PROVINCE, DataTypes.TEXT)
                .withField(POSTAL_CODE, DataTypes.TEXT)
                .withField(COUNTRY, DataTypes.TEXT)
                .build());
        logger.debug("+ Type '{}' has been created (if needed)", TYPE_ADDRESS.asInternal());
        
        /** 
         * CREATE TABLE reservation.reservations_by_hotel_date (
         *  hotel_id text,
         *  start_date date,
         *  end_date date,
         *  room_number smallint,
         *  confirm_number text,
         *  guest_id uuid,
         *  PRIMARY KEY ((hotel_id, start_date), room_number)
         * );
         */
        cqlSession.execute(createTable(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE)
                        .ifNotExists()
                        .withPartitionKey(HOTEL_ID, DataTypes.TEXT)
                        .withPartitionKey(START_DATE, DataTypes.DATE)
                        .withClusteringColumn(ROOM_NUMBER, DataTypes.SMALLINT)
                        .withColumn(END_DATE, DataTypes.DATE)
                        .withColumn(CONFIRM_NUMBER, DataTypes.TEXT)
                        .withColumn(GUEST_ID, DataTypes.UUID)
                        .withClusteringOrder(ROOM_NUMBER, ClusteringOrder.ASC)
                        .withComment("Q7. Find reservations by hotel and date")
                        .build());
        logger.debug("+ Table '{}' has been created (if needed)", TABLE_RESERVATION_BY_HOTEL_DATE.asInternal());
        
        /**
         * CREATE TABLE reservation.reservations_by_confirmation (
         *   confirm_number text PRIMARY KEY,
         *   hotel_id text,
         *   start_date date,
         *   end_date date,
         *   room_number smallint,
         *   guest_id uuid
         * );
         */
        cqlSession.execute(createTable(keyspaceName, TABLE_RESERVATION_BY_CONFI)
                .ifNotExists()
                .withPartitionKey(CONFIRM_NUMBER, DataTypes.TEXT)
                .withColumn(HOTEL_ID, DataTypes.TEXT)
                .withColumn(START_DATE, DataTypes.DATE)
                .withColumn(END_DATE, DataTypes.DATE)
                .withColumn(ROOM_NUMBER, DataTypes.SMALLINT)
                .withColumn(GUEST_ID, DataTypes.UUID)
                .build());
         logger.debug("+ Table '{}' has been created (if needed)", TABLE_RESERVATION_BY_CONFI.asInternal());
         
         /**
          * CREATE TABLE reservation.reservations_by_guest (
          *  guest_last_name text,
          *  hotel_id text,
          *  start_date date,
          *  end_date date,
          *  room_number smallint,
          *  confirm_number text,
          *  guest_id uuid,
          *  PRIMARY KEY ((guest_last_name), hotel_id)
          * );
          */
         cqlSession.execute(createTable(keyspaceName, TABLE_RESERVATION_BY_GUEST)
                 .ifNotExists()
                 .withPartitionKey(GUEST_LAST_NAME, DataTypes.TEXT)
                 .withClusteringColumn(HOTEL_ID, DataTypes.TEXT)
                 .withColumn(START_DATE, DataTypes.DATE)
                 .withColumn(END_DATE, DataTypes.DATE)
                 .withColumn(ROOM_NUMBER, DataTypes.SMALLINT)
                 .withColumn(CONFIRM_NUMBER, DataTypes.TEXT)
                 .withColumn(GUEST_ID, DataTypes.UUID)
                 .withComment("Q8. Find reservations by guest name")
                 .build());
          logger.debug("+ Table '{}' has been created (if needed)", TABLE_RESERVATION_BY_GUEST.asInternal());
          
          /**
           * CREATE TABLE reservation.guests (
           *   guest_id uuid PRIMARY KEY,
           *   first_name text,
           *   last_name text,
           *   title text,
           *   emails set<text>,
           *   phone_numbers list<text>,
           *   addresses map<text, frozen<address>>,
           *   confirm_number text
           * );
           */
          UserDefinedType  udtAddressType = 
                  cqlSession.getMetadata().getKeyspace(keyspaceName).get() // Retrieving KeySpaceMetadata
                            .getUserDefinedType(TYPE_ADDRESS).get();        // Looking for UDT (extending DataType)
          cqlSession.execute(createTable(keyspaceName, TABLE_GUESTS)
                  .ifNotExists()
                  .withPartitionKey(GUEST_ID, DataTypes.UUID)
                  .withColumn(FIRSTNAME, DataTypes.TEXT)
                  .withColumn(LASTNAME, DataTypes.TEXT)
                  .withColumn(TITLE, DataTypes.TEXT)
                  .withColumn(EMAILS, DataTypes.setOf(DataTypes.TEXT))
                  .withColumn(PHONE_NUMBERS, DataTypes.listOf(DataTypes.TEXT))
                  .withColumn(ADDRESSES, DataTypes.mapOf(DataTypes.TEXT, udtAddressType, true))
                  .withColumn(CONFIRM_NUMBER, DataTypes.TEXT)
                  .withComment("Q9. Find guest by ID")
                  .build());
           logger.debug("+ Table '{}' has been created (if needed)", TABLE_GUESTS.asInternal());
           logger.info("Schema has been successfully initialized.");
    }

    private void prepareStatements() {
        if (psExistReservation == null) {
            psExistReservation = cqlSession.prepare(selectFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI).column(CONFIRM_NUMBER)
                                .where(column(CONFIRM_NUMBER).isEqualTo(bindMarker(CONFIRM_NUMBER)))
                                .build());
            psFindReservation = cqlSession.prepare(
                                selectFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI).all()
                                .where(column(CONFIRM_NUMBER).isEqualTo(bindMarker(CONFIRM_NUMBER)))
                                .build());
            psSearchReservation = cqlSession.prepare(
                                selectFrom(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE).all()
                                .where(column(HOTEL_ID).isEqualTo(bindMarker(HOTEL_ID)))
                                .where(column(START_DATE).isEqualTo(bindMarker(START_DATE)))
                                .build());
            psDeleteReservationByConfirmation = cqlSession.prepare(
                                deleteFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI)
                                .where(column(CONFIRM_NUMBER).isEqualTo(bindMarker(CONFIRM_NUMBER)))
                                .build());
            psDeleteReservationByHotelDate = cqlSession.prepare(
                    deleteFrom(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE)
                    .where(column(HOTEL_ID).isEqualTo(bindMarker(HOTEL_ID)))
                    .where(column(START_DATE).isEqualTo(bindMarker(START_DATE)))
                    .where(column(ROOM_NUMBER).isEqualTo(bindMarker(ROOM_NUMBER)))
                    .build());
            psInsertReservationByHotelDate = cqlSession.prepare(QueryBuilder.insertInto(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE)
                    .value(HOTEL_ID, bindMarker(HOTEL_ID))
                    .value(START_DATE, bindMarker(START_DATE))
                    .value(END_DATE, bindMarker(END_DATE))
                    .value(ROOM_NUMBER, bindMarker(ROOM_NUMBER))
                    .value(CONFIRM_NUMBER, bindMarker(CONFIRM_NUMBER))
                    .value(GUEST_ID, bindMarker(GUEST_ID))
                    .build());
            psInsertReservationByConfirmation = cqlSession.prepare(QueryBuilder.insertInto(keyspaceName, TABLE_RESERVATION_BY_CONFI)
                    .value(CONFIRM_NUMBER, bindMarker(CONFIRM_NUMBER))
                    .value(HOTEL_ID, bindMarker(HOTEL_ID))
                    .value(START_DATE, bindMarker(START_DATE))
                    .value(END_DATE, bindMarker(END_DATE))
                    .value(ROOM_NUMBER, bindMarker(ROOM_NUMBER))
                    .value(GUEST_ID, bindMarker(GUEST_ID))
                    .build());
            logger.info("Statements have been successfully prepared.");
        }
    }
}

The code that creates the connection to Kafka is in reservation-service/src/main/java/dev/cassandraguide/conf/KafkaConfiguration.java:

package dev.cassandraguide.conf;

import dev.cassandraguide.repository.ReservationRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

/**
 * Import Configuration from Configuration File
 *
 * @author Jeff Carpenter
 */
@Configuration
public class KafkaConfiguration {

    // Logger
    private static final Logger logger = LoggerFactory.getLogger(KafkaConfiguration.class);

    // Bootstrap servers
    @Value("${kafka.bootstrap-servers:localhost:9092}")
    protected String bootstrapServers;

    // Kafka Client ID
    @Value("${kafka.client-id:ReservationService}")
    protected String clientId = "ReservationService";

    // Topic Name
    @Value("${kafka.topicName:reservation}")
    public String topicName = "reservation";

    /**
     * Default configuration.
     */
    public KafkaConfiguration() {}

    /**
     * Initialization of Configuration.
     *
     * @param bootstrapServers
     * @param clientId
     * @param topicName
     */
    public KafkaConfiguration(
            String bootstrapServers, String clientId, String topicName) {
        super();
        this.bootstrapServers = bootstrapServers;
        this.clientId = clientId;
        this.topicName = topicName;
    }

    @Bean
    public KafkaProducer<String, String> kafkaProducer() {
        logger.info("Creating Kafka Producer.");

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return new KafkaProducer<>(props);
    }

    /**
     * Getter accessor for attribute 'bootstrapServers'.
     *
     * @return
     *       current value of 'bootstrapServers'
     */
    public String getBootstrapServers() {
        return bootstrapServers;
    }

    /**
     * Setter accessor for attribute 'bootstrapServers'.
     * @param bootstrapServers
     * 		new value for 'bootstrapServers'
     */
    public void setBootstrapServers(String bootstrapServers) {
        this.bootstrapServers = bootstrapServers;
    }

    /**
     * Getter accessor for attribute 'clientId'.
     *
     * @return
     *       current value of 'clientId'
     */
    public String getClientId() {
        return clientId;
    }

    /**
     * Setter accessor for attribute 'clientId'.
     * @param clientId
     * 		new value for 'clientId '
     */
    public void setClientId(String clientId) {
        this.clientId = clientId;
    }

    /**
     * Getter accessor for attribute 'topicName'.
     *
     * @return
     *       current value of 'topicName'
     */
    public String getTopicName() {
        return topicName;
    }

    /**
     * Setter accessor for attribute 'topicName'.
     * @param topicName
     * 		new value for 'topicName '
     */
    public void setTopicName(String topicName) {
        this.topicName = topicName;
    }

}
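
For reference, the ProducerConfig constants used in kafkaProducer() resolve to plain string keys. The minimal sketch below assembles the same settings with only java.util.Properties, so it runs without the Kafka client on the classpath. The class name ProducerPropsSketch and its build method are hypothetical and are not part of the Reservation Service.

```java
import java.util.Properties;

// Hypothetical helper, not part of the Reservation Service: it mirrors the
// settings assembled in kafkaProducer() using the literal Kafka config keys.
class ProducerPropsSketch {

    static Properties build(String bootstrapServers, String clientId) {
        Properties props = new Properties();
        // Same key as ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
        props.put("bootstrap.servers", bootstrapServers);
        // Same key as ProducerConfig.CLIENT_ID_CONFIG
        props.put("client.id", clientId);
        // Same keys as KEY_SERIALIZER_CLASS_CONFIG and VALUE_SERIALIZER_CLASS_CONFIG;
        // both message key and value are sent as strings.
        String stringSerializer = "org.apache.kafka.common.serialization.StringSerializer";
        props.put("key.serializer", stringSerializer);
        props.put("value.serializer", stringSerializer);
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092", "ReservationService");
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

In the real configuration class these Properties are passed straight to the KafkaProducer constructor.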

Note that the Reservation Service uses a configuration file to store the Cassandra and Kafka settings, which we can review in reservation-service/src/main/resources/application.yml:

# ----------------------------------------------------------
# Spring Boot Config
# ----------------------------------------------------------
spring:
  application:
    name: Reservation Service
  jackson:
    serialization:
      WRITE_DATES_AS_TIMESTAMPS: false
server:
  port: 8080

# ----------------------------------------------------------
# DataStax Enterprise Java Driver Config
# ----------------------------------------------------------
cassandra:
  contactPoint: 127.0.0.1
  port: 9042
  keyspaceName: reservation
  localDataCenterName: datacenter1
  dropSchema: false

# ----------------------------------------------------------
# Kafka Producer Config
# ----------------------------------------------------------
kafka:
  bootstrap-servers: localhost:9092
  client-id: ReservationService
  topicName: reservation
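
The @Value annotations in KafkaConfiguration read these keys with placeholders such as ${kafka.bootstrap-servers:localhost:9092}, where the text after the first colon is the fallback used when the key is absent. The sketch below is a deliberately simplified model of that lookup, not Spring's actual resolver; the class PlaceholderSketch and the override value reservation-events are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical model of "${key:default}" resolution. Spring's real
// resolver handles more cases (nesting, several property sources, and so on).
class PlaceholderSketch {

    static String resolve(String placeholder, Map<String, String> config) {
        // Strip the surrounding "${" and "}".
        String body = placeholder.substring(2, placeholder.length() - 1);
        // Only the first ':' separates key from default, so a default such as
        // "localhost:9092" keeps its own colon.
        int sep = body.indexOf(':');
        String key = sep < 0 ? body : body.substring(0, sep);
        String fallback = sep < 0 ? null : body.substring(sep + 1);
        return config.getOrDefault(key, fallback);
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("kafka.topicName", "reservation-events"); // hypothetical override
        // Key present in the config: the configured value wins.
        System.out.println(resolve("${kafka.topicName:reservation}", config));
        // Key absent: the default after the first colon is used.
        System.out.println(resolve("${kafka.bootstrap-servers:localhost:9092}", config));
    }
}
```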

Cassandra tables for storing reservations

The Cassandra distribution includes cqlsh, a shell for issuing CQL commands. Let's start the shell:

apache-cassandra-4.0-alpha4/bin/cqlsh

We should see the cqlsh> prompt in the terminal.

Once the prompt is available, we can load the schema file we reviewed earlier to create the keyspace that contains the reservation tables:

SOURCE 'reservation-service/src/main/resources/reservation.cql';

Now we can inspect the schema we just created with the DESCRIBE command:

DESCRIBE KEYSPACE reservation;

Let's make it the default keyspace for the commands that follow:

USE reservation;

Note that there are three different tables for storing reservation data, with similar columns but different primary keys.

As data modelers, our natural tendency would be to focus first on designing tables to store reservations and other records, such as guest profiles, and only then think about the queries that will access them. In Cassandra data modeling, however, we start with the queries. These tables are designed to support the queries that define how users will access reservations, which leads to a denormalized design.
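
To make the query-first idea concrete, here is a minimal in-memory sketch: one map per query, each keyed the way that query looks data up, with every reservation written to both. It is only an analogy in plain Java, not Cassandra driver code; the class QueryFirstSketch and its simplified Reservation type are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory analogy of query-first design. Not driver code.
class QueryFirstSketch {

    static final class Reservation {
        final String confirmNumber;
        final String hotelId;
        final String startDate;
        final short roomNumber;

        Reservation(String confirmNumber, String hotelId, String startDate, short roomNumber) {
            this.confirmNumber = confirmNumber;
            this.hotelId = hotelId;
            this.startDate = startDate;
            this.roomNumber = roomNumber;
        }
    }

    // Serves "find a reservation by confirmation number".
    private final Map<String, Reservation> byConfirmation = new HashMap<>();
    // Serves "find reservations by hotel and date".
    private final Map<String, List<Reservation>> byHotelDate = new HashMap<>();

    void save(Reservation r) {
        // Denormalization: the same reservation is written once per query.
        byConfirmation.put(r.confirmNumber, r);
        byHotelDate.computeIfAbsent(r.hotelId + "|" + r.startDate, k -> new ArrayList<>()).add(r);
    }

    Reservation findByConfirmation(String confirmNumber) {
        return byConfirmation.get(confirmNumber);
    }

    List<Reservation> findByHotelAndDate(String hotelId, String startDate) {
        return byHotelDate.getOrDefault(hotelId + "|" + startDate, new ArrayList<>());
    }

    public static void main(String[] args) {
        QueryFirstSketch store = new QueryFirstSketch();
        store.save(new Reservation("RS2G0Z", "NY456", "2020-06-08", (short) 111));
        System.out.println(store.findByConfirmation("RS2G0Z").hotelId);
        System.out.println(store.findByHotelAndDate("NY456", "2020-06-08").size());
    }
}
```

The cost of this design is that every write touches several structures, which is why the tables have to be kept in step on each change.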

First, the reservations_by_confirmation table supports looking up a reservation by the unique confirmation number given to the customer at booking time:

CREATE TABLE reservations_by_confirmation (
    confirm_number text,
    hotel_id text,
    start_date date,
    end_date date,
    room_number smallint,
    guest_id uuid,
    PRIMARY KEY (confirm_number)
);

Here is an example of loading a row of data into this table:

INSERT INTO reservations_by_confirmation (confirm_number, hotel_id, start_date, end_date, room_number, guest_id) VALUES ('RS2G0Z', 'NY456', '2020-06-08', '2020-06-10', 111, 1b4d86f4-ccff-4256-a63d-45c905df2677);

To retrieve this row later, we would use a query like this:

SELECT * FROM reservations_by_confirmation WHERE confirm_number = 'RS2G0Z';

Second, the reservations_by_hotel_date table lets hotel staff review upcoming reservations by date to get a picture of how the hotel is doing, for example when it is fully booked and when it has vacancies:

CREATE TABLE reservations_by_hotel_date (
    hotel_id text,
    start_date date,
    end_date date,
    room_number smallint,
    confirm_number text,
    guest_id uuid,
    PRIMARY KEY ((hotel_id, start_date), room_number)
);

Here is an example of loading the same reservation that we just inserted into reservations_by_confirmation into this additional table:

INSERT INTO reservations_by_hotel_date (confirm_number, hotel_id, start_date, end_date, room_number, guest_id) VALUES ('RS2G0Z', 'NY456', '2020-06-08', '2020-06-10', 111, 1b4d86f4-ccff-4256-a63d-45c905df2677);

This table can support two queries. First, because the partition key consists of hotel_id and start_date, we can retrieve all reservations for a given hotel and date:

SELECT * FROM reservations_by_hotel_date WHERE hotel_id = 'NY456' AND start_date = '2020-06-08';

We can also find the reservation for a specific room on a given date:

SELECT * FROM reservations_by_hotel_date WHERE hotel_id = 'NY456' AND start_date = '2020-06-08' AND room_number = 111;
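
The two queries above follow directly from the primary key ((hotel_id, start_date), room_number): the partition key selects a group of rows, and the clustering column orders and identifies rows inside it. The sketch below models that layout with a map of sorted maps. It is an in-memory analogy under that assumption, with a hypothetical class name PartitionSketch, and it stores only the confirmation number per row for brevity.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory analogy of a partition key plus a clustering column.
class PartitionSketch {

    // (hotel_id, start_date) -> rows sorted by room_number -> confirm_number
    private final Map<String, TreeMap<Short, String>> partitions = new HashMap<>();

    private static String partitionKey(String hotelId, String startDate) {
        return hotelId + "|" + startDate;
    }

    void insert(String hotelId, String startDate, short roomNumber, String confirmNumber) {
        // Writing the same full primary key again overwrites the row (an upsert).
        partitions.computeIfAbsent(partitionKey(hotelId, startDate), k -> new TreeMap<>())
                  .put(roomNumber, confirmNumber);
    }

    // Models: WHERE hotel_id = ? AND start_date = ?
    List<String> findByHotelAndDate(String hotelId, String startDate) {
        TreeMap<Short, String> rows = partitions.get(partitionKey(hotelId, startDate));
        return rows == null ? new ArrayList<>() : new ArrayList<>(rows.values());
    }

    // Models: WHERE hotel_id = ? AND start_date = ? AND room_number = ?
    String findByRoom(String hotelId, String startDate, short roomNumber) {
        TreeMap<Short, String> rows = partitions.get(partitionKey(hotelId, startDate));
        return rows == null ? null : rows.get(roomNumber);
    }

    public static void main(String[] args) {
        PartitionSketch table = new PartitionSketch();
        table.insert("NY456", "2020-06-08", (short) 204, "AB12CD"); // hypothetical second row
        table.insert("NY456", "2020-06-08", (short) 111, "RS2G0Z");
        // Rows come back in room_number order, regardless of insertion order.
        System.out.println(table.findByHotelAndDate("NY456", "2020-06-08"));
        System.out.println(table.findByRoom("NY456", "2020-06-08", (short) 111));
    }
}
```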

Similarly, the reservations_by_guest table can be used to look up a reservation by guest name for a guest who has forgotten their confirmation number. For our purposes, we will focus on the reservations_by_confirmation and reservations_by_hotel_date tables.
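
Because each reservation lives in both of these tables, the two INSERT statements shown earlier are natural candidates for a CQL logged batch (BEGIN BATCH ... APPLY BATCH). The sketch below only assembles the batch text so it can be pasted into cqlsh; the helper class BatchCqlSketch is hypothetical, and application code would normally bind prepared statements rather than concatenate literal values.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper that wraps CQL statements in a logged batch.
class BatchCqlSketch {

    static String loggedBatch(List<String> statements) {
        // "BEGIN BATCH" without a qualifier starts a logged batch.
        StringBuilder cql = new StringBuilder("BEGIN BATCH\n");
        for (String statement : statements) {
            cql.append("  ").append(statement).append(";\n");
        }
        return cql.append("APPLY BATCH;").toString();
    }

    public static void main(String[] args) {
        String columns = "(confirm_number, hotel_id, start_date, end_date, room_number, guest_id)";
        String values = "('RS2G0Z', 'NY456', '2020-06-08', '2020-06-10', 111, "
                + "1b4d86f4-ccff-4256-a63d-45c905df2677)";
        System.out.println(loggedBatch(Arrays.asList(
                "INSERT INTO reservations_by_confirmation " + columns + " VALUES " + values,
                "INSERT INTO reservations_by_hotel_date " + columns + " VALUES " + values)));
    }
}
```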

Note that the ReservationRepository class uses Cassandra BATCH statements to group the changes to the reservations_by_confirmation and reservations_by_hotel_date tables so that they are executed as a single CQL operation. See reservation-service/src/main/java/dev/cassandraguide/repository/ReservationRepository.java:

ReservationRepository.java
/*
 * Copyright (C) 2017-2020 Jeff Carpenter
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package dev.cassandraguide.repository;

import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.deleteFrom;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.selectFrom;
import static com.datastax.oss.driver.api.querybuilder.SchemaBuilder.createTable;
import static com.datastax.oss.driver.api.querybuilder.SchemaBuilder.createType;
import static com.datastax.oss.driver.api.querybuilder.relation.Relation.column;

import java.time.LocalDate;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;

import javax.annotation.PreDestroy;

import com.fasterxml.jackson.core.JsonProcessingException;
import dev.cassandraguide.model.Reservation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Profile;
import org.springframework.lang.NonNull;
import org.springframework.stereotype.Repository;

import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BatchStatement;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.DefaultBatchType;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.metadata.schema.ClusteringOrder;
import com.datastax.oss.driver.api.core.type.DataTypes;
import com.datastax.oss.driver.api.core.type.UserDefinedType;
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;

// TODO: Review imports for publishing to Kafka
import org.apache.kafka.clients.producer.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

/**
 * The goal of this project is to provide a minimally functional implementation of a microservice 
 * that uses Apache Cassandra for its data storage. The reservation service is implemented as a 
 * RESTful service using Spring Boot.
 *
 * @author Jeff Carpenter, Cedrick Lunven
 */
@Repository
@Profile("!unit-test") // When I do some 'unit-test' no connectivity to DB
public class ReservationRepository {

    /** Logger for the class. */
    private static final Logger logger = LoggerFactory.getLogger(ReservationRepository.class);
    
    // Reservation Schema Constants
    public static final CqlIdentifier TYPE_ADDRESS               = CqlIdentifier.fromCql("address");
    public static final CqlIdentifier TABLE_RESERVATION_BY_HOTEL_DATE =
            CqlIdentifier.fromCql("reservations_by_hotel_date");
    public static final CqlIdentifier TABLE_RESERVATION_BY_CONFI = CqlIdentifier.fromCql("reservations_by_confirmation");
    public static final CqlIdentifier TABLE_RESERVATION_BY_GUEST = CqlIdentifier.fromCql("reservations_by_guest");
    public static final CqlIdentifier TABLE_GUESTS               = CqlIdentifier.fromCql("guests");
    public static final CqlIdentifier STREET                     = CqlIdentifier.fromCql("street");
    public static final CqlIdentifier CITY                       = CqlIdentifier.fromCql("city");
    public static final CqlIdentifier STATE_PROVINCE             = CqlIdentifier.fromCql("state_or_province");
    public static final CqlIdentifier POSTAL_CODE                = CqlIdentifier.fromCql("postal_code");
    public static final CqlIdentifier COUNTRY                    = CqlIdentifier.fromCql("country");
    public static final CqlIdentifier HOTEL_ID                   = CqlIdentifier.fromCql("hotel_id");
    public static final CqlIdentifier START_DATE                 = CqlIdentifier.fromCql("start_date");
    public static final CqlIdentifier END_DATE                   = CqlIdentifier.fromCql("end_date");
    public static final CqlIdentifier ROOM_NUMBER                = CqlIdentifier.fromCql("room_number");
    public static final CqlIdentifier CONFIRM_NUMBER             = CqlIdentifier.fromCql("confirm_number");
    public static final CqlIdentifier GUEST_ID                   = CqlIdentifier.fromCql("guest_id");
    public static final CqlIdentifier GUEST_LAST_NAME            = CqlIdentifier.fromCql("guest_last_name");
    public static final CqlIdentifier FIRSTNAME                  = CqlIdentifier.fromCql("first_name");
    public static final CqlIdentifier LASTNAME                   = CqlIdentifier.fromCql("last_name");
    public static final CqlIdentifier TITLE                      = CqlIdentifier.fromCql("title");
    public static final CqlIdentifier EMAILS                     = CqlIdentifier.fromCql("emails");
    public static final CqlIdentifier PHONE_NUMBERS              = CqlIdentifier.fromCql("phone_numbers");
    public static final CqlIdentifier ADDRESSES                  = CqlIdentifier.fromCql("addresses");
    
    private PreparedStatement psExistReservation;
    private PreparedStatement psFindReservation;
    private PreparedStatement psInsertReservationByHotelDate;
    private PreparedStatement psInsertReservationByConfirmation;
    private PreparedStatement psDeleteReservationByHotelDate;
    private PreparedStatement psDeleteReservationByConfirmation;
    private PreparedStatement psSearchReservation;
    
    /** CqlSession holding metadata to interact with Cassandra. */
    private CqlSession     cqlSession;
    private CqlIdentifier  keyspaceName;

    // TODO: Review variables used for publishing to Kafka
    /** KafkaProducer for publishing messages to Kafka. */
    private KafkaProducer<String, String> kafkaProducer;
    private String kafkaTopicName;
    private ObjectMapper objectMapper;
    
    /** External Initialization. */
    public ReservationRepository(
            @NonNull CqlSession cqlSession, 
            @Qualifier("keyspace") @NonNull CqlIdentifier keyspaceName,
            @NonNull KafkaProducer<String, String> kafkaProducer,
            @NonNull String kafkaTopicName) {
        this.cqlSession   = cqlSession;
        this.keyspaceName = keyspaceName;

        // TODO: Review initialization of objects needed for publishing to Kafka
        this.kafkaProducer = kafkaProducer;
        this.kafkaTopicName = kafkaTopicName;

        objectMapper = new ObjectMapper();
        objectMapper.registerModule(new JavaTimeModule());
        objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
        objectMapper.enable(SerializationFeature.INDENT_OUTPUT);

        // Will create tables (if they do not exist)
        createReservationTables();
        
        // Prepare Statements of reservation
        prepareStatements();
        logger.info("Application initialized.");
    }
    
    /**
     * CqlSession is a stateful object handling TCP connections to nodes in the cluster.
     * This operation properly closes sockets when the application is stopped
     */
    @PreDestroy
    public void cleanup() {
        if (null != cqlSession) {
            cqlSession.close();
            logger.info("+ CqlSession has been successfully closed");
        }
    }
    
    /**
     * Testing existence is useful for building correct semantics in the RESTful API. To evaluate existence find the
     * table where confirmation number is partition key which is reservations_by_confirmation
     * 
     * @param confirmationNumber
     *      unique identifier for confirmation
     * @return
     *      true if the reservation exists, false if it does not
     */
    public boolean exists(String confirmationNumber) {
        return cqlSession.execute(psExistReservation.bind(confirmationNumber))
                         .getAvailableWithoutFetching() > 0;
    }
    
    /**
     * Similar to exists() but maps and parses results.
     * 
     * @param confirmationNumber
     *      unique identifier for confirmation
     * @return
     *      reservation if present or empty
     */
    @NonNull
    public Optional<Reservation> findByConfirmationNumber(@NonNull String confirmationNumber) {
        
        ResultSet resultSet = cqlSession.execute(psFindReservation.bind(confirmationNumber));
        
        // Hint: an empty result might not be an error as this method is sometimes used to check whether a
        // reservation with this confirmation number exists
        Row row = resultSet.one();
        if (row == null) {
            logger.debug("Unable to load reservation with confirmation number: {}", confirmationNumber);
            return Optional.empty();
        }
        
        // Hint: If there is a result, create a new reservation object and set the values
        // Bonus: factor the logic to extract a reservation from a row into a separate method
        // (you will reuse it again later in getAllReservations())
        return Optional.of(mapRowToReservation(row));
    }
    
    /**
     * Create new entry in multiple tables for this reservation.
     *
     * @param reservation
     *      current reservation object
     * @return
     *      confirmation number for the reservation
     *      
     */
     public String upsert(Reservation reservation) {
        Objects.requireNonNull(reservation);
        if (null == reservation.getConfirmationNumber()) {
            // Generating a new reservation number if none has been provided
            reservation.setConfirmationNumber(UUID.randomUUID().toString());
        }
        // Insert into 'reservations_by_hotel_date'
        BoundStatement bsInsertReservationByHotel = 
                psInsertReservationByHotelDate.bind(reservation.getHotelId(), reservation.getStartDate(),
                        reservation.getEndDate(), reservation.getRoomNumber(), reservation.getConfirmationNumber(),
                        reservation.getGuestId());
        // Insert into 'reservations_by_confirmation'
        BoundStatement bsInsertReservationByConfirmation = 
                psInsertReservationByConfirmation.bind(reservation.getConfirmationNumber(), reservation.getHotelId(),
                        reservation.getStartDate(), reservation.getEndDate(), reservation.getRoomNumber(),
                        reservation.getGuestId());
        BatchStatement batchInsertReservation = BatchStatement
                    .builder(DefaultBatchType.LOGGED)
                    .addStatement(bsInsertReservationByHotel)
                    .addStatement(bsInsertReservationByConfirmation)
                    .build();
        cqlSession.execute(batchInsertReservation);

        // TODO: Publish message to Kafka containing reservation
         try {
             String reservationJson = objectMapper.writeValueAsString(reservation);
             // HINT: use the constructor ProducerRecord(String topic, K key, V value)
             // with the reservation confirmation number as the key, and the JSON string as the value
             ProducerRecord<String, String> record = null;
             kafkaProducer.send(record);
         } catch (Exception e) {
             logger.warn("Error publishing reservation message to Kafka", e);
         }

        return reservation.getConfirmationNumber();
    }

    /**
     * We pick 'reservations_by_confirmation' table to list reservations
     * BUT we could have used 'reservations_by_hotel_date' (as no key provided in request)
     *  
     * @return
     *      list containing all reservations
     */
    public List<Reservation> findAll() {
        return cqlSession.execute(selectFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI).all().build())
                  .all()                          // no paging we retrieve all objects
                  .stream()                       // because we are good people
                  .map(this::mapRowToReservation) // Mapping row as Reservation
                  .collect(Collectors.toList());  // Back to list objects
    }
      
    /**
     * Deleting a reservation.
     *
     * @param confirmationNumber
     *      unique identifier for confirmation.
     */
    public boolean delete(String confirmationNumber) {

        // Retrieving entire reservation in order to obtain the attributes we will need to delete from
        // reservations_by_hotel_date table
        Optional<Reservation> reservationToDelete = this.findByConfirmationNumber(confirmationNumber);

        if (reservationToDelete.isPresent()) {

            // Delete from 'reservations_by_hotel_date'
            Reservation reservation = reservationToDelete.get();
            BoundStatement bsDeleteReservationByHotelDate =
                    psDeleteReservationByHotelDate.bind(reservation.getHotelId(),
                            reservation.getStartDate(), reservation.getRoomNumber());

            // Delete from 'reservations_by_confirmation'
            BoundStatement bsDeleteReservationByConfirmation =
                    psDeleteReservationByConfirmation.bind(confirmationNumber);

            BatchStatement batchDeleteReservation = BatchStatement
                    .builder(DefaultBatchType.LOGGED)
                    .addStatement(bsDeleteReservationByHotelDate)
                    .addStatement(bsDeleteReservationByConfirmation)
                    .build();
            cqlSession.execute(batchDeleteReservation);

            // TODO: Publish message to Kafka with empty payload to indicate deletion
            // HINT: use the constructor ProducerRecord(String topic, K key, V value)
            // with the reservation confirmation number as the key, and an empty string as the value
            ProducerRecord<String, String> record = null;
            kafkaProducer.send(record);

            return true;
        }
        return false;
    }
    
    /**
     * Search all reservations for a hotel id and date.
     *
     * @param hotelId
     *      hotel identifier
     * @param date
     *      searched Date
     * @return
     *      list of reservations matching the search criteria
     */
    public List<Reservation> findByHotelAndDate(String hotelId, LocalDate date) {
        Objects.requireNonNull(hotelId);
        Objects.requireNonNull(date);
        return cqlSession.execute(psSearchReservation.bind(hotelId, date))
                         .all()                          // no paging we retrieve all objects
                         .stream()                       // because we are good people
                         .map(this::mapRowToReservation) // Mapping row as Reservation
                         .collect(Collectors.toList());  // Back to list objects
    }

    /**
     * Utility method to marshal a row as expected Reservation Bean.
     *
     * @param row
     *      current row from ResultSet
     * @return
     *      object
     */
    private Reservation mapRowToReservation(Row row) {
        Reservation reservation = new Reservation();
        reservation.setHotelId(row.getString(HOTEL_ID));
        reservation.setConfirmationNumber(row.getString(CONFIRM_NUMBER));
        reservation.setGuestId(row.getUuid(GUEST_ID));
        reservation.setRoomNumber(row.getShort(ROOM_NUMBER));
        reservation.setStartDate(row.getLocalDate(START_DATE));
        reservation.setEndDate(row.getLocalDate(END_DATE));
        return reservation;
    }
    
    /**
     * Creates the 'address' type and the reservation tables defined in 'reservation.cql', if they do not already exist.
     */
    public void createReservationTables() {
        
        /**
         * Create TYPE 'Address' if not exists
         * 
         * CREATE TYPE reservation.address (
         *   street text,
         *   city text,
         *   state_or_province text,
         *   postal_code text,
         *   country text
         * );
         */
        cqlSession.execute(
                createType(keyspaceName, TYPE_ADDRESS)
                .ifNotExists()
                .withField(STREET, DataTypes.TEXT)
                .withField(CITY, DataTypes.TEXT)
                .withField(STATE_PROVINCE, DataTypes.TEXT)
                .withField(POSTAL_CODE, DataTypes.TEXT)
                .withField(COUNTRY, DataTypes.TEXT)
                .build());
        logger.debug("+ Type '{}' has been created (if needed)", TYPE_ADDRESS.asInternal());
        
        /** 
         * CREATE TABLE reservation.reservations_by_hotel_date (
         *  hotel_id text,
         *  start_date date,
         *  end_date date,
         *  room_number smallint,
         *  confirm_number text,
         *  guest_id uuid,
         *  PRIMARY KEY ((hotel_id, start_date), room_number)
         * );
         */
        cqlSession.execute(createTable(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE)
                        .ifNotExists()
                        .withPartitionKey(HOTEL_ID, DataTypes.TEXT)
                        .withPartitionKey(START_DATE, DataTypes.DATE)
                        .withClusteringColumn(ROOM_NUMBER, DataTypes.SMALLINT)
                        .withColumn(END_DATE, DataTypes.DATE)
                        .withColumn(CONFIRM_NUMBER, DataTypes.TEXT)
                        .withColumn(GUEST_ID, DataTypes.UUID)
                        .withClusteringOrder(ROOM_NUMBER, ClusteringOrder.ASC)
                        .withComment("Q7. Find reservations by hotel and date")
                        .build());
        logger.debug("+ Table '{}' has been created (if needed)", TABLE_RESERVATION_BY_HOTEL_DATE.asInternal());
        
        /**
         * CREATE TABLE reservation.reservations_by_confirmation (
         *   confirm_number text PRIMARY KEY,
         *   hotel_id text,
         *   start_date date,
         *   end_date date,
         *   room_number smallint,
         *   guest_id uuid
         * );
         */
        cqlSession.execute(createTable(keyspaceName, TABLE_RESERVATION_BY_CONFI)
                .ifNotExists()
                .withPartitionKey(CONFIRM_NUMBER, DataTypes.TEXT)
                .withColumn(HOTEL_ID, DataTypes.TEXT)
                .withColumn(START_DATE, DataTypes.DATE)
                .withColumn(END_DATE, DataTypes.DATE)
                .withColumn(ROOM_NUMBER, DataTypes.SMALLINT)
                .withColumn(GUEST_ID, DataTypes.UUID)
                .build());
         logger.debug("+ Table '{}' has been created (if needed)", TABLE_RESERVATION_BY_CONFI.asInternal());
         
         /**
          * CREATE TABLE reservation.reservations_by_guest (
          *  guest_last_name text,
          *  hotel_id text,
          *  start_date date,
          *  end_date date,
          *  room_number smallint,
          *  confirm_number text,
          *  guest_id uuid,
          *  PRIMARY KEY ((guest_last_name), hotel_id)
          * );
          */
         cqlSession.execute(createTable(keyspaceName, TABLE_RESERVATION_BY_GUEST)
                 .ifNotExists()
                 .withPartitionKey(GUEST_LAST_NAME, DataTypes.TEXT)
                 .withClusteringColumn(HOTEL_ID, DataTypes.TEXT)
                 .withColumn(START_DATE, DataTypes.DATE)
                 .withColumn(END_DATE, DataTypes.DATE)
                 .withColumn(ROOM_NUMBER, DataTypes.SMALLINT)
                 .withColumn(CONFIRM_NUMBER, DataTypes.TEXT)
                 .withColumn(GUEST_ID, DataTypes.UUID)
                 .withComment("Q8. Find reservations by guest name")
                 .build());
          logger.debug("+ Table '{}' has been created (if needed)", TABLE_RESERVATION_BY_GUEST.asInternal());
          
          /**
           * CREATE TABLE reservation.guests (
           *   guest_id uuid PRIMARY KEY,
           *   first_name text,
           *   last_name text,
           *   title text,
           *   emails set<text>,
           *   phone_numbers list<text>,
           *   addresses map<text, frozen<address>>,
           *   confirm_number text
           * );
           */
          UserDefinedType  udtAddressType = 
                  cqlSession.getMetadata().getKeyspace(keyspaceName).get() // Retrieving KeySpaceMetadata
                            .getUserDefinedType(TYPE_ADDRESS).get();        // Looking for UDT (extending DataType)
          cqlSession.execute(createTable(keyspaceName, TABLE_GUESTS)
                  .ifNotExists()
                  .withPartitionKey(GUEST_ID, DataTypes.UUID)
                  .withColumn(FIRSTNAME, DataTypes.TEXT)
                  .withColumn(LASTNAME, DataTypes.TEXT)
                  .withColumn(TITLE, DataTypes.TEXT)
                  .withColumn(EMAILS, DataTypes.setOf(DataTypes.TEXT))
                  .withColumn(PHONE_NUMBERS, DataTypes.listOf(DataTypes.TEXT))
                  .withColumn(ADDRESSES, DataTypes.mapOf(DataTypes.TEXT, udtAddressType, true))
                  .withColumn(CONFIRM_NUMBER, DataTypes.TEXT)
                  .withComment("Q9. Find guest by ID")
                  .build());
           logger.debug("+ Table '{}' has been created (if needed)", TABLE_GUESTS.asInternal());
           logger.info("Schema has been successfully initialized.");
    }

    private void prepareStatements() {
        if (psExistReservation == null) {
            psExistReservation = cqlSession.prepare(selectFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI).column(CONFIRM_NUMBER)
                                .where(column(CONFIRM_NUMBER).isEqualTo(bindMarker(CONFIRM_NUMBER)))
                                .build());
            psFindReservation = cqlSession.prepare(
                                selectFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI).all()
                                .where(column(CONFIRM_NUMBER).isEqualTo(bindMarker(CONFIRM_NUMBER)))
                                .build());
            psSearchReservation = cqlSession.prepare(
                                selectFrom(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE).all()
                                .where(column(HOTEL_ID).isEqualTo(bindMarker(HOTEL_ID)))
                                .where(column(START_DATE).isEqualTo(bindMarker(START_DATE)))
                                .build());
            psDeleteReservationByConfirmation = cqlSession.prepare(
                                deleteFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI)
                                .where(column(CONFIRM_NUMBER).isEqualTo(bindMarker(CONFIRM_NUMBER)))
                                .build());
            psDeleteReservationByHotelDate = cqlSession.prepare(
                    deleteFrom(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE)
                    .where(column(HOTEL_ID).isEqualTo(bindMarker(HOTEL_ID)))
                    .where(column(START_DATE).isEqualTo(bindMarker(START_DATE)))
                    .where(column(ROOM_NUMBER).isEqualTo(bindMarker(ROOM_NUMBER)))
                    .build());
            psInsertReservationByHotelDate = cqlSession.prepare(QueryBuilder.insertInto(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE)
                    .value(HOTEL_ID, bindMarker(HOTEL_ID))
                    .value(START_DATE, bindMarker(START_DATE))
                    .value(END_DATE, bindMarker(END_DATE))
                    .value(ROOM_NUMBER, bindMarker(ROOM_NUMBER))
                    .value(CONFIRM_NUMBER, bindMarker(CONFIRM_NUMBER))
                    .value(GUEST_ID, bindMarker(GUEST_ID))
                    .build());
            psInsertReservationByConfirmation = cqlSession.prepare(QueryBuilder.insertInto(keyspaceName, TABLE_RESERVATION_BY_CONFI)
                    .value(CONFIRM_NUMBER, bindMarker(CONFIRM_NUMBER))
                    .value(HOTEL_ID, bindMarker(HOTEL_ID))
                    .value(START_DATE, bindMarker(START_DATE))
                    .value(END_DATE, bindMarker(END_DATE))
                    .value(ROOM_NUMBER, bindMarker(ROOM_NUMBER))
                    .value(GUEST_ID, bindMarker(GUEST_ID))
                    .build());
            logger.info("Statements have been successfully prepared.");
        }
    }
}

Now that we are done with cqlsh, let's exit:

exit

Installing and Starting Apache Kafka

Now let's focus on extending the Reservation Service so that it publishes events to Apache Kafka.

First, download and unpack Kafka:

wget http://archive.apache.org/dist/kafka/2.5.0/kafka_2.12-2.5.0.tgz

tar xzf kafka_2.12-2.5.0.tgz

You now have a directory named kafka_2.12-2.5.0 on your file system; the two version numbers in the name are the Scala and Kafka versions, respectively.

To run a single-node Kafka cluster, you need to start two processes: ZooKeeper and a Kafka broker. This version of Kafka uses ZooKeeper to coordinate the cluster: brokers register with it, and it keeps track of cluster metadata. ZooKeeper must therefore be running before the broker can start and exchange messages. Start ZooKeeper with:

kafka_2.12-2.5.0/bin/zookeeper-server-start.sh kafka_2.12-2.5.0/config/zookeeper.properties &> zookeeper_start.log &

Next, start the Kafka broker:

kafka_2.12-2.5.0/bin/kafka-server-start.sh kafka_2.12-2.5.0/config/server.properties &> kafka_start.log &
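
Both processes start in the background, so it can help to confirm that they are accepting connections before continuing. The following is a minimal sketch and not part of the Reservation Service: the class name PortCheck and the method isListening are hypothetical, and it assumes the default ports from the bundled configuration files (2181 for ZooKeeper and 9092 for the broker, the same addresses used by the commands in this article).

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper: checks whether a TCP port accepts connections. */
final class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused or timed out: nothing is listening (yet)
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed default ports: 2181 (ZooKeeper) and 9092 (Kafka broker)
        System.out.println("ZooKeeper up: " + isListening("localhost", 2181, 1000));
        System.out.println("Kafka up:     " + isListening("localhost", 9092, 1000));
    }
}
```

If either line reports false, the zookeeper_start.log and kafka_start.log files written by the commands above are the first place to look.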

Creating a Kafka Topic and Publishing Messages

Kafka supports publish/subscribe messaging, in which messages are published to topics as key-value pairs. Like Cassandra, Kafka partitions data by key and replicates it across multiple nodes, which Kafka calls brokers.
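
To make partitioning by key concrete, here is a minimal sketch of mapping a message key to a partition. It is an illustration only: Kafka's default partitioner hashes the serialized key with murmur2, whereas this sketch substitutes CRC32 from the standard library, and the names KeyPartitionSketch and partitionFor are hypothetical. The property it demonstrates is the same: records with equal keys always map to the same partition, so all events for one reservation stay in order.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

/** Hypothetical illustration of key-based partitioning (not Kafka's actual hash). */
final class KeyPartitionSketch {

    /** Maps a key to a partition in [0, numPartitions) by hashing its UTF-8 bytes. */
    static int partitionFor(String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        CRC32 crc = new CRC32();
        crc.update(key.getBytes(StandardCharsets.UTF_8));
        // getValue() is a non-negative long, so the remainder is non-negative too
        return (int) (crc.getValue() % numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 3; // hypothetical partition count for the illustration
        // "AB12CD" is a made-up key; "RS2G0Z" appears twice to show the mapping is stable
        for (String key : new String[] {"RS2G0Z", "AB12CD", "RS2G0Z"}) {
            System.out.println(key + " -> partition " + partitionFor(key, partitions));
        }
    }
}
```

With a single-partition topic, every key maps to partition 0, so the choice of key only starts to matter once the topic has more partitions.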

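Now let's create a Kafka topic for reservation data. The command creates a topic named reservation with a single partition and a replication factor of 1, and sets retention.ms=-1 so that messages are not deleted based on age: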

kafka_2.12-2.5.0/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic reservation --config retention.ms=-1

Next, we will update the Reservation Service to publish to the reservation topic each time a reservation is changed or deleted. We will introduce the API elements a few at a time, trying them out first in jshell, the Java REPL, before updating the actual code in the Reservation Service:

cd reservation-service

mvn -q compile com.github.johnpoth:jshell-maven-plugin:1.3:run

Let's create a KafkaProducer that we can use to publish messages to a Kafka topic:

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "ReservationService");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
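
The ProducerConfig constants used above are plain string keys, so the same settings can also be kept in a properties file or assembled without the Kafka classes on the classpath. A minimal sketch using only the standard library follows; the class name ProducerSettings and its method are hypothetical, and the literal keys are the values those constants resolve to.

```java
import java.util.Properties;

/** Hypothetical helper that builds producer settings from literal configuration keys. */
final class ProducerSettings {

    /** Builds the same producer settings as the jshell snippet, using literal keys. */
    static Properties forReservationService(String bootstrapServers) {
        String stringSerializer = "org.apache.kafka.common.serialization.StringSerializer";
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers); // ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
        props.put("client.id", "ReservationService");     // ProducerConfig.CLIENT_ID_CONFIG
        props.put("key.serializer", stringSerializer);    // ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
        props.put("value.serializer", stringSerializer);  // ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
        return props;
    }

    public static void main(String[] args) {
        Properties props = forReservationService("localhost:9092");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

A Properties object built this way can be passed to the KafkaProducer constructor exactly like the one created in jshell.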

Let's create a Reservation object, using the class the Reservation Service uses to represent a reservation:

import java.time.LocalDate;
import java.util.UUID;
import dev.cassandraguide.model.Reservation;

String confirmationNumber = "RS2G0Z";
Reservation reservation = new Reservation();
reservation.setConfirmationNumber(confirmationNumber);
reservation.setStartDate(LocalDate.now());
reservation.setEndDate(LocalDate.now().plusDays(2));
reservation.setHotelId("NY456");
reservation.setGuestId(UUID.fromString("1b4d86f4-ccff-4256-a63d-45c905df2677"));
reservation.setRoomNumber((short)111);

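Now serialize this object to a JSON string using a Jackson ObjectMapper. Configure the mapper the same way as in the ReservationRepository constructor shown later in this article: register JavaTimeModule, disable SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, and enable SerializationFeature.INDENT_OUTPUT. Then assign the result of objectMapper.writeValueAsString(reservation) to a String named reservationJson.

These ObjectMapper options simplify date formatting (dates are written as strings instead of numeric timestamps) and pretty-print the JSON. To see the resulting string, run: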

System.out.println(reservationJson);

Now we can publish a message to the reservation topic in Kafka, using confirmationNumber as the key and reservationJson as the payload:

ProducerRecord<String, String> record = new ProducerRecord<>("reservation", confirmationNumber, reservationJson);
producer.send(record);

Exit jshell:

/exit

Let's use the simple command-line consumer to read the message we just published. Run it from the directory that contains kafka_2.12-2.5.0 (we changed into reservation-service earlier):

kafka_2.12-2.5.0/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic reservation --max-messages 10

You can stop the consumer with Ctrl+C.
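
As described at the start of the article, other services consume the reservation topic and react to each message. The repository code in the next section uses the confirmation number as the key and, according to its TODO comments, an empty string as the value to signal a deletion. A consumer might interpret messages along the following lines. This is a minimal sketch with hypothetical names (ReservationEvents, Kind, classify), not code from the Reservation Service; treating a null value like an empty one is a defensive assumption, and the JSON payload in main is abbreviated and illustrative.

```java
/** Hypothetical consumer-side helper for messages on the reservation topic. */
final class ReservationEvents {

    /** The two kinds of change a message on the reservation topic can describe. */
    enum Kind { UPSERT, DELETE }

    /**
     * Classifies a message by its value: an empty (or null) value means the
     * reservation identified by the message key was deleted; anything else is
     * treated as the reservation's current state in JSON form.
     */
    static Kind classify(String value) {
        return (value == null || value.isEmpty()) ? Kind.DELETE : Kind.UPSERT;
    }

    public static void main(String[] args) {
        String key = "RS2G0Z"; // confirmation number used earlier in the article
        System.out.println(key + ": " + classify("{\"hotelId\":\"NY456\"}")); // abbreviated payload
        System.out.println(key + ": " + classify(""));
    }
}
```

An inventory service, for example, could mark dates as reserved on UPSERT and release them on DELETE, using the message key to find the affected reservation.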

Updating the Reservation Service to Publish to Kafka

Now that we have some experience publishing to Kafka, let's update the Reservation Service to publish events to Kafka whenever a reservation is changed or deleted. Take a look at the ReservationRepository class:

ReservationRepository.java
/*
 * Copyright (C) 2017-2020 Jeff Carpenter
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package dev.cassandraguide.repository;

import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.deleteFrom;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.selectFrom;
import static com.datastax.oss.driver.api.querybuilder.SchemaBuilder.createTable;
import static com.datastax.oss.driver.api.querybuilder.SchemaBuilder.createType;
import static com.datastax.oss.driver.api.querybuilder.relation.Relation.column;

import java.time.LocalDate;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;

import javax.annotation.PreDestroy;

import com.fasterxml.jackson.core.JsonProcessingException;
import dev.cassandraguide.model.Reservation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Profile;
import org.springframework.lang.NonNull;
import org.springframework.stereotype.Repository;

import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BatchStatement;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.DefaultBatchType;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.metadata.schema.ClusteringOrder;
import com.datastax.oss.driver.api.core.type.DataTypes;
import com.datastax.oss.driver.api.core.type.UserDefinedType;
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;

// TODO: Review imports for publishing to Kafka
import org.apache.kafka.clients.producer.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

/**
 * The goal of this project is to provide a minimally functional implementation of a microservice 
 * that uses Apache Cassandra for its data storage. The reservation service is implemented as a 
 * RESTful service using Spring Boot.
 *
 * @author Jeff Carpenter, Cedrick Lunven
 */
@Repository
@Profile("!unit-test") // Disabled under the 'unit-test' profile, which runs without a database connection
public class ReservationRepository {

    /** Logger for the class. */
    private static final Logger logger = LoggerFactory.getLogger(ReservationRepository.class);
    
    // Reservation Schema Constants
    public static final CqlIdentifier TYPE_ADDRESS               = CqlIdentifier.fromCql("address");
    public static final CqlIdentifier TABLE_RESERVATION_BY_HOTEL_DATE =
            CqlIdentifier.fromCql("reservations_by_hotel_date");
    public static final CqlIdentifier TABLE_RESERVATION_BY_CONFI = CqlIdentifier.fromCql("reservations_by_confirmation");
    public static final CqlIdentifier TABLE_RESERVATION_BY_GUEST = CqlIdentifier.fromCql("reservations_by_guest");
    public static final CqlIdentifier TABLE_GUESTS               = CqlIdentifier.fromCql("guests");
    public static final CqlIdentifier STREET                     = CqlIdentifier.fromCql("street");
    public static final CqlIdentifier CITY                       = CqlIdentifier.fromCql("city");
    public static final CqlIdentifier STATE_PROVINCE             = CqlIdentifier.fromCql("state_or_province");
    public static final CqlIdentifier POSTAL_CODE                = CqlIdentifier.fromCql("postal_code");
    public static final CqlIdentifier COUNTRY                    = CqlIdentifier.fromCql("country");
    public static final CqlIdentifier HOTEL_ID                   = CqlIdentifier.fromCql("hotel_id");
    public static final CqlIdentifier START_DATE                 = CqlIdentifier.fromCql("start_date");
    public static final CqlIdentifier END_DATE                   = CqlIdentifier.fromCql("end_date");
    public static final CqlIdentifier ROOM_NUMBER                = CqlIdentifier.fromCql("room_number");
    public static final CqlIdentifier CONFIRM_NUMBER             = CqlIdentifier.fromCql("confirm_number");
    public static final CqlIdentifier GUEST_ID                   = CqlIdentifier.fromCql("guest_id");
    public static final CqlIdentifier GUEST_LAST_NAME            = CqlIdentifier.fromCql("guest_last_name");
    public static final CqlIdentifier FIRSTNAME                  = CqlIdentifier.fromCql("first_name");
    public static final CqlIdentifier LASTNAME                   = CqlIdentifier.fromCql("last_name");
    public static final CqlIdentifier TITLE                      = CqlIdentifier.fromCql("title");
    public static final CqlIdentifier EMAILS                     = CqlIdentifier.fromCql("emails");
    public static final CqlIdentifier PHONE_NUMBERS              = CqlIdentifier.fromCql("phone_numbers");
    public static final CqlIdentifier ADDRESSES                  = CqlIdentifier.fromCql("addresses");
    
    private PreparedStatement psExistReservation;
    private PreparedStatement psFindReservation;
    private PreparedStatement psInsertReservationByHotelDate;
    private PreparedStatement psInsertReservationByConfirmation;
    private PreparedStatement psDeleteReservationByHotelDate;
    private PreparedStatement psDeleteReservationByConfirmation;
    private PreparedStatement psSearchReservation;
    
    /** CqlSession holding metadata to interact with Cassandra. */
    private CqlSession     cqlSession;
    private CqlIdentifier  keyspaceName;

    // TODO: Review variables used for publishing to Kafka
    /** KafkaProducer for publishing messages to Kafka. */
    private KafkaProducer<String, String> kafkaProducer;
    private String kafkaTopicName;
    private ObjectMapper objectMapper;
    
    /** External Initialization. */
    public ReservationRepository(
            @NonNull CqlSession cqlSession, 
            @Qualifier("keyspace") @NonNull CqlIdentifier keyspaceName,
            @NonNull KafkaProducer<String, String> kafkaProducer,
            @NonNull String kafkaTopicName) {
        this.cqlSession   = cqlSession;
        this.keyspaceName = keyspaceName;

        // TODO: Review initialization of objects needed for publishing to Kafka
        this.kafkaProducer = kafkaProducer;
        this.kafkaTopicName = kafkaTopicName;

        objectMapper = new ObjectMapper();
        objectMapper.registerModule(new JavaTimeModule());
        objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
        objectMapper.enable(SerializationFeature.INDENT_OUTPUT);

        // Will create tables (if they do not exist)
        createReservationTables();
        
        // Prepare Statements of reservation
        prepareStatements();
        logger.info("Application initialized.");
    }
    
    /**
     * CqlSession is a stateful object handling TCP connections to nodes in the cluster.
     * This operation properly closes sockets when the application is stopped
     */
    @PreDestroy
    public void cleanup() {
        if (null != cqlSession) {
            cqlSession.close();
            logger.info("+ CqlSession has been successfully closed");
        }
    }
    
    /**
     * Testing existence is useful for building correct semantics in the RESTful API. The lookup uses
     * reservations_by_confirmation, the table whose partition key is the confirmation number.
     * 
     * @param confirmationNumber
     *      unique identifier for confirmation
     * @return
     *      true if the reservation exists, false if it does not
     */
    public boolean exists(String confirmationNumber) {
        return cqlSession.execute(psExistReservation.bind(confirmationNumber))
                         .getAvailableWithoutFetching() > 0;
    }
    
    /**
     * Similar to exists() but maps and parses results.
     * 
     * @param confirmationNumber
     *      unique identifier for confirmation
     * @return
     *      reservation if present or empty
     */
    @NonNull
    public Optional<Reservation> findByConfirmationNumber(@NonNull String confirmationNumber) {
        
        ResultSet resultSet = cqlSession.execute(psFindReservation.bind(confirmationNumber));
        
        // Hint: an empty result might not be an error as this method is sometimes used to check whether a
        // reservation with this confirmation number exists
        Row row = resultSet.one();
        if (row == null) {
            logger.debug("Unable to load reservation with confirmation number: {}", confirmationNumber);
            return Optional.empty();
        }
        
        // Hint: If there is a result, create a new reservation object and set the values
        // Bonus: factor the logic to extract a reservation from a row into a separate method
        // (you will reuse it again later in findAll())
        return Optional.of(mapRowToReservation(row));
    }
    
    /**
     * Create new entry in multiple tables for this reservation.
     *
     * @param reservation
     *      current reservation object
     * @return
     *      confirmation number for the reservation
     *      
     */
    public String upsert(Reservation reservation) {
        Objects.requireNonNull(reservation);
        if (null == reservation.getConfirmationNumber()) {
            // Generating a new reservation number if none has been provided
            reservation.setConfirmationNumber(UUID.randomUUID().toString());
        }
        // Insert into 'reservations_by_hotel_date'
        BoundStatement bsInsertReservationByHotel = 
                psInsertReservationByHotelDate.bind(reservation.getHotelId(), reservation.getStartDate(),
                        reservation.getEndDate(), reservation.getRoomNumber(), reservation.getConfirmationNumber(),
                        reservation.getGuestId());
        // Insert into 'reservations_by_confirmation'
        BoundStatement bsInsertReservationByConfirmation = 
                psInsertReservationByConfirmation.bind(reservation.getConfirmationNumber(), reservation.getHotelId(),
                        reservation.getStartDate(), reservation.getEndDate(), reservation.getRoomNumber(),
                        reservation.getGuestId());
        BatchStatement batchInsertReservation = BatchStatement
                    .builder(DefaultBatchType.LOGGED)
                    .addStatement(bsInsertReservationByHotel)
                    .addStatement(bsInsertReservationByConfirmation)
                    .build();
        cqlSession.execute(batchInsertReservation);

        // TODO: Publish message to Kafka containing reservation
        try {
            String reservationJson = objectMapper.writeValueAsString(reservation);
            // HINT: use the constructor ProducerRecord(String topic, K key, V value)
            // with the reservation confirmation number as the key, and the JSON string as the value
            ProducerRecord<String, String> record = null;
            kafkaProducer.send(record);
        } catch (Exception e) {
            // Passing the exception as the last argument lets SLF4J log the stack trace
            logger.warn("Error publishing reservation message to Kafka", e);
        }

        return reservation.getConfirmationNumber();
    }

    /**
     * We pick 'reservations_by_confirmation' table to list reservations
     * BUT we could have used 'reservations_by_hotel_date' (as no key provided in request)
     *  
     * @return
     *      list containing all reservations
     */
    public List<Reservation> findAll() {
        return cqlSession.execute(selectFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI).all().build())
                  .all()                          // no paging we retrieve all objects
                  .stream()                       // because we are good people
                  .map(this::mapRowToReservation) // Mapping row as Reservation
                  .collect(Collectors.toList());  // Back to list objects
    }
      
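    /**
     * Delete a reservation from both reservation tables.
     *
     * @param confirmationNumber
     *      unique identifier for confirmation.
     * @return
     *      true if the reservation existed and was deleted, false if it was not found
     */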
    public boolean delete(String confirmationNumber) {

        // Retrieving entire reservation in order to obtain the attributes we will need to delete from
        // reservations_by_hotel_date table
        Optional<Reservation> reservationToDelete = this.findByConfirmationNumber(confirmationNumber);

        if (reservationToDelete.isPresent()) {

            // Delete from 'reservations_by_hotel_date'
            Reservation reservation = reservationToDelete.get();
            BoundStatement bsDeleteReservationByHotelDate =
                    psDeleteReservationByHotelDate.bind(reservation.getHotelId(),
                            reservation.getStartDate(), reservation.getRoomNumber());

            // Delete from 'reservations_by_confirmation'
            BoundStatement bsDeleteReservationByConfirmation =
                    psDeleteReservationByConfirmation.bind(confirmationNumber);

            BatchStatement batchDeleteReservation = BatchStatement
                    .builder(DefaultBatchType.LOGGED)
                    .addStatement(bsDeleteReservationByHotelDate)
                    .addStatement(bsDeleteReservationByConfirmation)
                    .build();
            cqlSession.execute(batchDeleteReservation);

            // TODO: Publish message to Kafka with empty payload to indicate deletion
            // HINT: use the constructor ProducerRecord(String topic, K key, V value)
            // with the reservation confirmation number as the key, and an empty string as the value
            ProducerRecord<String, String> record = null;
            kafkaProducer.send(record);

            return true;
        }
        return false;
    }
    
    /**
     * Find all reservations for a hotel id and start date.
     *
     * @param hotelId
     *      hotel identifier
     * @param date
     *      searched Date
     * @return
     *      list of reservations matching the search criteria
     */
    public List<Reservation> findByHotelAndDate(String hotelId, LocalDate date) {
        Objects.requireNonNull(hotelId);
        Objects.requireNonNull(date);
        return cqlSession.execute(psSearchReservation.bind(hotelId, date))
                         .all()                          // no paging we retrieve all objects
                         .stream()                       // because we are good people
                         .map(this::mapRowToReservation) // Mapping row as Reservation
                         .collect(Collectors.toList());  // Back to list objects
    }

    /**
     * Utility method to marshal a row as expected Reservation Bean.
     *
     * @param row
     *      current row from ResultSet
     * @return
     *      object
     */
    private Reservation mapRowToReservation(Row row) {
        Reservation reservation = new Reservation();
        reservation.setHotelId(row.getString(HOTEL_ID));
        reservation.setConfirmationNumber(row.getString(CONFIRM_NUMBER));
        reservation.setGuestId(row.getUuid(GUEST_ID));
        reservation.setRoomNumber(row.getShort(ROOM_NUMBER));
        reservation.setStartDate(row.getLocalDate(START_DATE));
        reservation.setEndDate(row.getLocalDate(END_DATE));
        return reservation;
    }
    
    /**
     * Creates the 'address' type and the reservation tables defined in 'reservation.cql', if they do not already exist.
     */
    public void createReservationTables() {
        
        /**
         * Create TYPE 'Address' if not exists
         * 
         * CREATE TYPE reservation.address (
         *   street text,
         *   city text,
         *   state_or_province text,
         *   postal_code text,
         *   country text
         * );
         */
        cqlSession.execute(
                createType(keyspaceName, TYPE_ADDRESS)
                .ifNotExists()
                .withField(STREET, DataTypes.TEXT)
                .withField(CITY, DataTypes.TEXT)
                .withField(STATE_PROVINCE, DataTypes.TEXT)
                .withField(POSTAL_CODE, DataTypes.TEXT)
                .withField(COUNTRY, DataTypes.TEXT)
                .build());
        logger.debug("+ Type '{}' has been created (if needed)", TYPE_ADDRESS.asInternal());
        
        /** 
         * CREATE TABLE reservation.reservations_by_hotel_date (
         *  hotel_id text,
         *  start_date date,
         *  end_date date,
         *  room_number smallint,
         *  confirm_number text,
         *  guest_id uuid,
         *  PRIMARY KEY ((hotel_id, start_date), room_number)
         * );
         */
        cqlSession.execute(createTable(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE)
                        .ifNotExists()
                        .withPartitionKey(HOTEL_ID, DataTypes.TEXT)
                        .withPartitionKey(START_DATE, DataTypes.DATE)
                        .withClusteringColumn(ROOM_NUMBER, DataTypes.SMALLINT)
                        .withColumn(END_DATE, DataTypes.DATE)
                        .withColumn(CONFIRM_NUMBER, DataTypes.TEXT)
                        .withColumn(GUEST_ID, DataTypes.UUID)
                        .withClusteringOrder(ROOM_NUMBER, ClusteringOrder.ASC)
                        .withComment("Q7. Find reservations by hotel and date")
                        .build());
        logger.debug("+ Table '{}' has been created (if needed)", TABLE_RESERVATION_BY_HOTEL_DATE.asInternal());
        
        /**
         * CREATE TABLE reservation.reservations_by_confirmation (
         *   confirm_number text PRIMARY KEY,
         *   hotel_id text,
         *   start_date date,
         *   end_date date,
         *   room_number smallint,
         *   guest_id uuid
         * );
         */
        cqlSession.execute(createTable(keyspaceName, TABLE_RESERVATION_BY_CONFI)
                .ifNotExists()
                .withPartitionKey(CONFIRM_NUMBER, DataTypes.TEXT)
                .withColumn(HOTEL_ID, DataTypes.TEXT)
                .withColumn(START_DATE, DataTypes.DATE)
                .withColumn(END_DATE, DataTypes.DATE)
                .withColumn(ROOM_NUMBER, DataTypes.SMALLINT)
                .withColumn(GUEST_ID, DataTypes.UUID)
                .build());
         logger.debug("+ Table '{}' has been created (if needed)", TABLE_RESERVATION_BY_CONFI.asInternal());
         
         /**
          * CREATE TABLE reservation.reservations_by_guest (
          *  guest_last_name text,
          *  hotel_id text,
          *  start_date date,
          *  end_date date,
          *  room_number smallint,
          *  confirm_number text,
          *  guest_id uuid,
          *  PRIMARY KEY ((guest_last_name), hotel_id)
          * );
          */
         cqlSession.execute(createTable(keyspaceName, TABLE_RESERVATION_BY_GUEST)
                 .ifNotExists()
                 .withPartitionKey(GUEST_LAST_NAME, DataTypes.TEXT)
                 .withClusteringColumn(HOTEL_ID, DataTypes.TEXT)
                 .withColumn(START_DATE, DataTypes.DATE)
                 .withColumn(END_DATE, DataTypes.DATE)
                 .withColumn(ROOM_NUMBER, DataTypes.SMALLINT)
                 .withColumn(CONFIRM_NUMBER, DataTypes.TEXT)
                 .withColumn(GUEST_ID, DataTypes.UUID)
                 .withComment("Q8. Find reservations by guest name")
                 .build());
          logger.debug("+ Table '{}' has been created (if needed)", TABLE_RESERVATION_BY_GUEST.asInternal());
          
          /**
           * CREATE TABLE reservation.guests (
           *   guest_id uuid PRIMARY KEY,
           *   first_name text,
           *   last_name text,
           *   title text,
           *   emails set<text>,
           *   phone_numbers list<text>,
           *   addresses map<text, frozen<address>>,
           *   confirm_number text
           * );
           */
          UserDefinedType  udtAddressType = 
                  cqlSession.getMetadata().getKeyspace(keyspaceName).get() // Retrieving KeySpaceMetadata
                            .getUserDefinedType(TYPE_ADDRESS).get();        // Looking for UDT (extending DataType)
          cqlSession.execute(createTable(keyspaceName, TABLE_GUESTS)
                  .ifNotExists()
                  .withPartitionKey(GUEST_ID, DataTypes.UUID)
                  .withColumn(FIRSTNAME, DataTypes.TEXT)
                  .withColumn(LASTNAME, DataTypes.TEXT)
                  .withColumn(TITLE, DataTypes.TEXT)
                  .withColumn(EMAILS, DataTypes.setOf(DataTypes.TEXT))
                  .withColumn(PHONE_NUMBERS, DataTypes.listOf(DataTypes.TEXT))
                  .withColumn(ADDRESSES, DataTypes.mapOf(DataTypes.TEXT, udtAddressType, true))
                  .withColumn(CONFIRM_NUMBER, DataTypes.TEXT)
                  .withComment("Q9. Find guest by ID")
                  .build());
           logger.debug("+ Table '{}' has been created (if needed)", TABLE_GUESTS.asInternal());
           logger.info("Schema has been successfully initialized.");
    }

    private void prepareStatements() {
        if (psExistReservation == null) {
            psExistReservation = cqlSession.prepare(selectFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI).column(CONFIRM_NUMBER)
                                .where(column(CONFIRM_NUMBER).isEqualTo(bindMarker(CONFIRM_NUMBER)))
                                .build());
            psFindReservation = cqlSession.prepare(
                                selectFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI).all()
                                .where(column(CONFIRM_NUMBER).isEqualTo(bindMarker(CONFIRM_NUMBER)))
                                .build());
            psSearchReservation = cqlSession.prepare(
                                selectFrom(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE).all()
                                .where(column(HOTEL_ID).isEqualTo(bindMarker(HOTEL_ID)))
                                .where(column(START_DATE).isEqualTo(bindMarker(START_DATE)))
                                .build());
            psDeleteReservationByConfirmation = cqlSession.prepare(
                                deleteFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI)
                                .where(column(CONFIRM_NUMBER).isEqualTo(bindMarker(CONFIRM_NUMBER)))
                                .build());
            psDeleteReservationByHotelDate = cqlSession.prepare(
                    deleteFrom(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE)
                    .where(column(HOTEL_ID).isEqualTo(bindMarker(HOTEL_ID)))
                    .where(column(START_DATE).isEqualTo(bindMarker(START_DATE)))
                    .where(column(ROOM_NUMBER).isEqualTo(bindMarker(ROOM_NUMBER)))
                    .build());
            psInsertReservationByHotelDate = cqlSession.prepare(QueryBuilder.insertInto(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE)
                    .value(HOTEL_ID, bindMarker(HOTEL_ID))
                    .value(START_DATE, bindMarker(START_DATE))
                    .value(END_DATE, bindMarker(END_DATE))
                    .value(ROOM_NUMBER, bindMarker(ROOM_NUMBER))
                    .value(CONFIRM_NUMBER, bindMarker(CONFIRM_NUMBER))
                    .value(GUEST_ID, bindMarker(GUEST_ID))
                    .build());
            psInsertReservationByConfirmation = cqlSession.prepare(QueryBuilder.insertInto(keyspaceName, TABLE_RESERVATION_BY_CONFI)
                    .value(CONFIRM_NUMBER, bindMarker(CONFIRM_NUMBER))
                    .value(HOTEL_ID, bindMarker(HOTEL_ID))
                    .value(START_DATE, bindMarker(START_DATE))
                    .value(END_DATE, bindMarker(END_DATE))
                    .value(ROOM_NUMBER, bindMarker(ROOM_NUMBER))
                    .value(GUEST_ID, bindMarker(GUEST_ID))
                    .build());
            logger.info("Statements have been successfully prepared.");
        }
    }
}

The TODO comments in this class mark several items we should review:

  • TODO: Review imports for publishing to Kafka. Note that the classes you used earlier are already imported.

  • TODO: Review variables used for publishing to Kafka. The repository holds an ObjectMapper and a KafkaProducer, along with the name of the Kafka topic.

  • TODO: Review initialization of objects needed for publishing to Kafka. The constructor initializes these variables.
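The repository receives a ready-made KafkaProducer<String, String> through its constructor, and this section does not show how that producer is built. As a minimal sketch, the settings such a producer is typically created with could look like the following. The class name ProducerConfigSketch, the helper producerProperties and the broker address localhost:9092 are assumptions for illustration and are not taken from the Reservation Service source.

```java
import java.util.Properties;

// Hypothetical helper, not part of the Reservation Service source.
class ProducerConfigSketch {

    /** Collects the basic settings for a producer with String keys and String values. */
    static Properties producerProperties(String bootstrapServers) {
        Properties props = new Properties();
        // Address of at least one broker in the Kafka cluster
        props.put("bootstrap.servers", bootstrapServers);
        // Keys (confirmation numbers) and values (JSON text) are both plain strings
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        // Assumed local broker; replace with the address of your own cluster
        Properties props = producerProperties("localhost:9092");
        // With kafka-clients on the classpath, these properties would be passed to
        // new KafkaProducer<String, String>(props)
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Keeping the settings in a small factory method like this makes it easy to point the service at a different broker in tests.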

Now for the interesting part, implementing the code that publishes messages:

  • TODO: Publish message to Kafka containing reservation. In the upsert() method, publish a message containing the reservation as a JSON string.

  • TODO: Publish message to Kafka with empty payload to indicate deletion. In the delete() method, publish a message indicating that the reservation was deleted.
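The two TODO items above imply a simple message convention: the record key is the confirmation number, and the value is either the reservation as JSON or an empty string for a deletion. A consuming service could tell the two cases apart roughly as sketched below. The names ReservationEventSketch, ChangeType and classify are hypothetical, and the JSON field shown is only a placeholder.

```java
// Hypothetical consumer-side helper, not part of the Reservation Service source.
class ReservationEventSketch {

    /** The two kinds of change published to the reservation topic. */
    public enum ChangeType { UPSERT, DELETE }

    /** Interprets one (key, value) pair read from the topic. */
    public static ChangeType classify(String confirmationNumber, String payload) {
        // delete() publishes an empty string; null is treated the same way defensively
        if (payload == null || payload.isEmpty()) {
            return ChangeType.DELETE;
        }
        return ChangeType.UPSERT;
    }

    public static void main(String[] args) {
        // Placeholder payload; the real JSON is produced by the ObjectMapper
        System.out.println(classify("abc-123", "{\"hotelId\":\"NY456\"}")); // prints UPSERT
        System.out.println(classify("abc-123", ""));                        // prints DELETE
    }
}
```

Because the confirmation number is the key, all events for one reservation share a key, so a consumer can match a deletion to the reservation it refers to.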

Let's update our code:

ReservationRepositorySolution.java
/*
 * Copyright (C) 2017-2020 Jeff Carpenter
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package dev.cassandraguide.repository;

import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.deleteFrom;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.selectFrom;
import static com.datastax.oss.driver.api.querybuilder.SchemaBuilder.createTable;
import static com.datastax.oss.driver.api.querybuilder.SchemaBuilder.createType;
import static com.datastax.oss.driver.api.querybuilder.relation.Relation.column;

import java.time.LocalDate;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;

import javax.annotation.PreDestroy;

import com.fasterxml.jackson.core.JsonProcessingException;
import dev.cassandraguide.model.Reservation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Profile;
import org.springframework.lang.NonNull;
import org.springframework.stereotype.Repository;

import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BatchStatement;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.DefaultBatchType;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.metadata.schema.ClusteringOrder;
import com.datastax.oss.driver.api.core.type.DataTypes;
import com.datastax.oss.driver.api.core.type.UserDefinedType;
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;

// TODO: Review imports for publishing to Kafka
import org.apache.kafka.clients.producer.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

/**
 * The goal of this project is to provide a minimally functional implementation of a microservice 
 * that uses Apache Cassandra for its data storage. The reservation service is implemented as a 
 * RESTful service using Spring Boot.
 *
 * @author Jeff Carpenter, Cedrick Lunven
 */
@Repository
@Profile("!unit-test") // When I do some 'unit-test' no connectivity to DB
public class ReservationRepository {

    /** Logger for the class. */
    private static final Logger logger = LoggerFactory.getLogger(ReservationRepository.class);
    
    // Reservation Schema Constants
    public static final CqlIdentifier TYPE_ADDRESS               = CqlIdentifier.fromCql("address");
    public static final CqlIdentifier TABLE_RESERVATION_BY_HOTEL_DATE =
            CqlIdentifier.fromCql("reservations_by_hotel_date");
    public static final CqlIdentifier TABLE_RESERVATION_BY_CONFI = CqlIdentifier.fromCql("reservations_by_confirmation");
    public static final CqlIdentifier TABLE_RESERVATION_BY_GUEST = CqlIdentifier.fromCql("reservations_by_guest");
    public static final CqlIdentifier TABLE_GUESTS               = CqlIdentifier.fromCql("guests");
    public static final CqlIdentifier STREET                     = CqlIdentifier.fromCql("street");
    public static final CqlIdentifier CITY                       = CqlIdentifier.fromCql("city");
    public static final CqlIdentifier STATE_PROVINCE             = CqlIdentifier.fromCql("state_or_province");
    public static final CqlIdentifier POSTAL_CODE                = CqlIdentifier.fromCql("postal_code");
    public static final CqlIdentifier COUNTRY                    = CqlIdentifier.fromCql("country");
    public static final CqlIdentifier HOTEL_ID                   = CqlIdentifier.fromCql("hotel_id");
    public static final CqlIdentifier START_DATE                 = CqlIdentifier.fromCql("start_date");
    public static final CqlIdentifier END_DATE                   = CqlIdentifier.fromCql("end_date");
    public static final CqlIdentifier ROOM_NUMBER                = CqlIdentifier.fromCql("room_number");
    public static final CqlIdentifier CONFIRMATION_NUMBER        = CqlIdentifier.fromCql("confirmation_number");
    public static final CqlIdentifier GUEST_ID                   = CqlIdentifier.fromCql("guest_id");
    public static final CqlIdentifier GUEST_LAST_NAME            = CqlIdentifier.fromCql("guest_last_name");
    public static final CqlIdentifier FIRSTNAME                  = CqlIdentifier.fromCql("first_name");
    public static final CqlIdentifier LASTNAME                   = CqlIdentifier.fromCql("last_name");
    public static final CqlIdentifier TITLE                      = CqlIdentifier.fromCql("title");
    public static final CqlIdentifier EMAILS                     = CqlIdentifier.fromCql("emails");
    public static final CqlIdentifier PHONE_NUMBERS              = CqlIdentifier.fromCql("phone_numbers");
    public static final CqlIdentifier ADDRESSES                  = CqlIdentifier.fromCql("addresses");
    
    private PreparedStatement psExistReservation;
    private PreparedStatement psFindReservation;
    private PreparedStatement psInsertReservationByHotelDate;
    private PreparedStatement psInsertReservationByConfirmation;
    private PreparedStatement psDeleteReservationByHotelDate;
    private PreparedStatement psDeleteReservationByConfirmation;
    private PreparedStatement psSearchReservation;
    
    /** CqlSession holding metadata to interact with Cassandra. */
    private CqlSession     cqlSession;
    private CqlIdentifier  keyspaceName;

    // TODO: Review variables used for publishing to Kafka
    /** KafkaProducer for publishing messages to Kafka. */
    private KafkaProducer<String, String> kafkaProducer;
    private String kafkaTopicName;
    private ObjectMapper objectMapper;
    
    /** External Initialization. */
    public ReservationRepository(
            @NonNull CqlSession cqlSession, 
            @Qualifier("keyspace") @NonNull CqlIdentifier keyspaceName,
            @NonNull KafkaProducer<String, String> kafkaProducer,
            @NonNull String kafkaTopicName) {
        this.cqlSession   = cqlSession;
        this.keyspaceName = keyspaceName;

        // TODO: Review initialization of objects needed for publishing to Kafka
        this.kafkaProducer = kafkaProducer;
        this.kafkaTopicName = kafkaTopicName;

        objectMapper = new ObjectMapper();
        objectMapper.registerModule(new JavaTimeModule());
        objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
        objectMapper.enable(SerializationFeature.INDENT_OUTPUT);

        // Will create tables (if they do not exist)
        createReservationTables();
        
        // Prepare Statements of reservation
        prepareStatements();
        logger.info("Application initialized.");
    }
    
    /**
     * CqlSession is a stateful object handling TCP connections to nodes in the cluster.
     * This operation properly closes sockets when the application is stopped
     */
    @PreDestroy
    public void cleanup() {
        if (null != cqlSession) {
            cqlSession.close();
            logger.info("+ CqlSession has been successfully closed");
        }
    }
    
    /**
     * Testing existence is useful for building correct semantics in the RESTful API. To evaluate existence find the
     * table where confirmation number is partition key which is reservations_by_confirmation
     * 
     * @param confirmationNumber
     *      unique identifier for confirmation
     * @return
     *      true if the reservation exists, false if it does not
     */
    public boolean exists(String confirmationNumber) {
        return cqlSession.execute(psExistReservation.bind(confirmationNumber))
                         .getAvailableWithoutFetching() > 0;
    }
    
    /**
     * Similar to exists() but maps and parses results.
     * 
     * @param confirmationNumber
     *      unique identifier for confirmation
     * @return
     *      reservation if present or empty
     */
    @NonNull
    public Optional<Reservation> findByConfirmationNumber(@NonNull String confirmationNumber) {
        
        ResultSet resultSet = cqlSession.execute(psFindReservation.bind(confirmationNumber));
        
        // Hint: an empty result might not be an error as this method is sometimes used to check whether a
        // reservation with this confirmation number exists
        Row row = resultSet.one();
        if (row == null) {
            logger.debug("Unable to load reservation with confirmation number: {}", confirmationNumber);
            return Optional.empty();
        }
        
        // Hint: If there is a result, create a new reservation object and set the values
        // Bonus: factor the logic to extract a reservation from a row into a separate method
        // (you will reuse it again later in getAllReservations())
        return Optional.of(mapRowToReservation(row));
    }
    
    /**
     * Create new entry in multiple tables for this reservation.
     *
     * @param reservation
     *      current reservation object
     * @return
     *      confirmation number for the reservation
     *      
     */
     public String upsert(Reservation reservation) {
        Objects.requireNonNull(reservation);
        if (null == reservation.getConfirmationNumber()) {
            // Generating a new reservation number if none has been provided
            reservation.setConfirmationNumber(UUID.randomUUID().toString());
        }
        // Insert into 'reservations_by_hotel_date'
        BoundStatement bsInsertReservationByHotel = 
                psInsertReservationByHotelDate.bind(reservation.getHotelId(), reservation.getStartDate(),
                        reservation.getEndDate(), reservation.getRoomNumber(), reservation.getConfirmationNumber(),
                        reservation.getGuestId());
        // Insert into 'reservations_by_confirmation'
        BoundStatement bsInsertReservationByConfirmation = 
                psInsertReservationByConfirmation.bind(reservation.getConfirmationNumber(), reservation.getHotelId(),
                        reservation.getStartDate(), reservation.getEndDate(), reservation.getRoomNumber(),
                        reservation.getGuestId());
        BatchStatement batchInsertReservation = BatchStatement
                    .builder(DefaultBatchType.LOGGED)
                    .addStatement(bsInsertReservationByHotel)
                    .addStatement(bsInsertReservationByConfirmation)
                    .build();
        cqlSession.execute(batchInsertReservation);

        // TODO: Publish message to Kafka containing reservation
         try {
             String reservationJson = objectMapper.writeValueAsString(reservation);
             ProducerRecord<String, String> record = new ProducerRecord<>(kafkaTopicName, reservation.getConfirmationNumber(), reservationJson);
             kafkaProducer.send(record);
         } catch (Exception e) {
             // Passing the exception as the last argument logs its stack trace; no placeholder is needed
             logger.warn("Error publishing reservation message to Kafka", e);
         }

        return reservation.getConfirmationNumber();
    }

    /**
     * We pick 'reservations_by_confirmation' table to list reservations
     * BUT we could have used 'reservations_by_hotel_date' (as no key provided in request)
     *  
     * @return
     *      list containing all reservations
     */
    public List<Reservation> findAll() {
        return cqlSession.execute(selectFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI).all().build())
                  .all()                          // no paging we retrieve all objects
                  .stream()                       // because we are good people
                  .map(this::mapRowToReservation) // Mapping row as Reservation
                  .collect(Collectors.toList());  // Back to list objects
    }
      
    /**
     * Deleting a reservation.
     *
     * @param confirmationNumber
     *      unique identifier for confirmation.
     */
    public boolean delete(String confirmationNumber) {

        // Retrieving entire reservation in order to obtain the attributes we will need to delete from
        // reservations_by_hotel_date table
        Optional<Reservation> reservationToDelete = this.findByConfirmationNumber(confirmationNumber);

        if (reservationToDelete.isPresent()) {

            // Delete from 'reservations_by_hotel_date'
            Reservation reservation = reservationToDelete.get();
            BoundStatement bsDeleteReservationByHotelDate =
                    psDeleteReservationByHotelDate.bind(reservation.getHotelId(),
                            reservation.getStartDate(), reservation.getRoomNumber());

            // Delete from 'reservations_by_confirmation'
            BoundStatement bsDeleteReservationByConfirmation =
                    psDeleteReservationByConfirmation.bind(confirmationNumber);

            BatchStatement batchDeleteReservation = BatchStatement
                    .builder(DefaultBatchType.LOGGED)
                    .addStatement(bsDeleteReservationByHotelDate)
                    .addStatement(bsDeleteReservationByConfirmation)
                    .build();
            cqlSession.execute(batchDeleteReservation);

            // TODO: Publish message to Kafka with empty payload to indicate deletion
            ProducerRecord<String, String> record = new ProducerRecord<>(kafkaTopicName, reservation.getConfirmationNumber(), "");
            kafkaProducer.send(record);

            return true;
        }
        return false;
    }
    
    /**
     * Search all reservations for a hotel id and LocalDate.
     *
     * @param hotelId
     *      hotel identifier
     * @param date
     *      searched Date
     * @return
     *      list of reservations matching the search criteria
     */
    public List<Reservation> findByHotelAndDate(String hotelId, LocalDate date) {
        Objects.requireNonNull(hotelId);
        Objects.requireNonNull(date);
        return cqlSession.execute(psSearchReservation.bind(hotelId, date))
                         .all()                          // no paging we retrieve all objects
                         .stream()                       // because we are good people
                         .map(this::mapRowToReservation) // Mapping row as Reservation
                         .collect(Collectors.toList());  // Back to list objects
    }

    /**
     * Utility method to marshal a row as expected Reservation Bean.
     *
     * @param row
     *      current row from ResultSet
     * @return
     *      object
     */
    private Reservation mapRowToReservation(Row row) {
        Reservation reservation = new Reservation();
        reservation.setHotelId(row.getString(HOTEL_ID));
        reservation.setConfirmationNumber(row.getString(CONFIRMATION_NUMBER));
        reservation.setGuestId(row.getUuid(GUEST_ID));
        reservation.setRoomNumber(row.getShort(ROOM_NUMBER));
        reservation.setStartDate(row.getLocalDate(START_DATE));
        reservation.setEndDate(row.getLocalDate(END_DATE));
        return reservation;
    }
    
    /**
     * Create Keyspace and relevant tables as per defined in 'reservation.cql'
     */
    public void createReservationTables() {
        
        /**
         * Create TYPE 'Address' if not exists
         * 
         * CREATE TYPE reservation.address (
         *   street text,
         *   city text,
         *   state_or_province text,
         *   postal_code text,
         *   country text
         * );
         */
        cqlSession.execute(
                createType(keyspaceName, TYPE_ADDRESS)
                .ifNotExists()
                .withField(STREET, DataTypes.TEXT)
                .withField(CITY, DataTypes.TEXT)
                .withField(STATE_PROVINCE, DataTypes.TEXT)
                .withField(POSTAL_CODE, DataTypes.TEXT)
                .withField(COUNTRY, DataTypes.TEXT)
                .build());
        logger.debug("+ Type '{}' has been created (if needed)", TYPE_ADDRESS.asInternal());
        
        /** 
         * CREATE TABLE reservation.reservations_by_hotel_date (
         *  hotel_id text,
         *  start_date date,
         *  end_date date,
         *  room_number smallint,
         *  confirmation_number text,
         *  guest_id uuid,
         *  PRIMARY KEY ((hotel_id, start_date), room_number)
         * ) WITH comment = 'Q7. Find reservations by hotel and date';
         */
        cqlSession.execute(createTable(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE)
                        .ifNotExists()
                        .withPartitionKey(HOTEL_ID, DataTypes.TEXT)
                        .withPartitionKey(START_DATE, DataTypes.DATE)
                        .withClusteringColumn(ROOM_NUMBER, DataTypes.SMALLINT)
                        .withColumn(END_DATE, DataTypes.DATE)
                        .withColumn(CONFIRMATION_NUMBER, DataTypes.TEXT)
                        .withColumn(GUEST_ID, DataTypes.UUID)
                        .withClusteringOrder(ROOM_NUMBER, ClusteringOrder.ASC)
                        .withComment("Q7. Find reservations by hotel and date")
                        .build());
        logger.debug("+ Table '{}' has been created (if needed)", TABLE_RESERVATION_BY_HOTEL_DATE.asInternal());
        
        /**
         * CREATE TABLE reservation.reservations_by_confirmation (
         *   confirmation_number text PRIMARY KEY,
         *   hotel_id text,
         *   start_date date,
         *   end_date date,
         *   room_number smallint,
         *   guest_id uuid
         * );
         */
        cqlSession.execute(createTable(keyspaceName, TABLE_RESERVATION_BY_CONFI)
                .ifNotExists()
                .withPartitionKey(CONFIRMATION_NUMBER, DataTypes.TEXT)
                .withColumn(HOTEL_ID, DataTypes.TEXT)
                .withColumn(START_DATE, DataTypes.DATE)
                .withColumn(END_DATE, DataTypes.DATE)
                .withColumn(ROOM_NUMBER, DataTypes.SMALLINT)
                .withColumn(GUEST_ID, DataTypes.UUID)
                .build());
         logger.debug("+ Table '{}' has been created (if needed)", TABLE_RESERVATION_BY_CONFI.asInternal());
         
         /**
          * CREATE TABLE reservation.reservations_by_guest (
          *  guest_last_name text,
          *  hotel_id text,
          *  start_date date,
          *  end_date date,
          *  room_number smallint,
          *  confirmation_number text,
          *  guest_id uuid,
          *  PRIMARY KEY ((guest_last_name), hotel_id)
          * ) WITH comment = 'Q8. Find reservations by guest name';
          */
         cqlSession.execute(createTable(keyspaceName, TABLE_RESERVATION_BY_GUEST)
                 .ifNotExists()
                 .withPartitionKey(GUEST_LAST_NAME, DataTypes.TEXT)
                 .withClusteringColumn(HOTEL_ID, DataTypes.TEXT)
                 .withColumn(START_DATE, DataTypes.DATE)
                 .withColumn(END_DATE, DataTypes.DATE)
                 .withColumn(ROOM_NUMBER, DataTypes.SMALLINT)
                 .withColumn(CONFIRMATION_NUMBER, DataTypes.TEXT)
                 .withColumn(GUEST_ID, DataTypes.UUID)
                 .withComment("Q8. Find reservations by guest name")
                 .build());
          logger.debug("+ Table '{}' has been created (if needed)", TABLE_RESERVATION_BY_GUEST.asInternal());
          
          /**
           * CREATE TABLE reservation.guests (
           *   guest_id uuid PRIMARY KEY,
           *   first_name text,
           *   last_name text,
           *   title text,
           *   emails set<text>,
           *   phone_numbers list<text>,
           *   addresses map<text, frozen<address>>,
           *   confirmation_number text
           * ) WITH comment = 'Q9. Find guest by ID';
           */
          UserDefinedType  udtAddressType = 
                  cqlSession.getMetadata().getKeyspace(keyspaceName).get() // Retrieving KeySpaceMetadata
                            .getUserDefinedType(TYPE_ADDRESS).get();        // Looking for UDT (extending DataType)
          cqlSession.execute(createTable(keyspaceName, TABLE_GUESTS)
                  .ifNotExists()
                  .withPartitionKey(GUEST_ID, DataTypes.UUID)
                  .withColumn(FIRSTNAME, DataTypes.TEXT)
                  .withColumn(LASTNAME, DataTypes.TEXT)
                  .withColumn(TITLE, DataTypes.TEXT)
                  .withColumn(EMAILS, DataTypes.setOf(DataTypes.TEXT))
                  .withColumn(PHONE_NUMBERS, DataTypes.listOf(DataTypes.TEXT))
                  .withColumn(ADDRESSES, DataTypes.mapOf(DataTypes.TEXT, udtAddressType, true))
                  .withColumn(CONFIRMATION_NUMBER, DataTypes.TEXT)
                  .withComment("Q9. Find guest by ID")
                  .build());
           logger.debug("+ Table '{}' has been created (if needed)", TABLE_GUESTS.asInternal());
           logger.info("Schema has been successfully initialized.");
    }

    private void prepareStatements() {
        if (psExistReservation == null) {
            psExistReservation = cqlSession.prepare(selectFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI).column(CONFIRMATION_NUMBER)
                                .where(column(CONFIRMATION_NUMBER).isEqualTo(bindMarker(CONFIRMATION_NUMBER)))
                                .build());
            psFindReservation = cqlSession.prepare(
                                selectFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI).all()
                                .where(column(CONFIRMATION_NUMBER).isEqualTo(bindMarker(CONFIRMATION_NUMBER)))
                                .build());
            psSearchReservation = cqlSession.prepare(
                                selectFrom(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE).all()
                                .where(column(HOTEL_ID).isEqualTo(bindMarker(HOTEL_ID)))
                                .where(column(START_DATE).isEqualTo(bindMarker(START_DATE)))
                                .build());
            psDeleteReservationByConfirmation = cqlSession.prepare(
                                deleteFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI)
                                .where(column(CONFIRMATION_NUMBER).isEqualTo(bindMarker(CONFIRMATION_NUMBER)))
                                .build());
            psDeleteReservationByHotelDate = cqlSession.prepare(
                    deleteFrom(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE)
                    .where(column(HOTEL_ID).isEqualTo(bindMarker(HOTEL_ID)))
                    .where(column(START_DATE).isEqualTo(bindMarker(START_DATE)))
                    .where(column(ROOM_NUMBER).isEqualTo(bindMarker(ROOM_NUMBER)))
                    .build());
            psInsertReservationByHotelDate = cqlSession.prepare(QueryBuilder.insertInto(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE)
                    .value(HOTEL_ID, bindMarker(HOTEL_ID))
                    .value(START_DATE, bindMarker(START_DATE))
                    .value(END_DATE, bindMarker(END_DATE))
                    .value(ROOM_NUMBER, bindMarker(ROOM_NUMBER))
                    .value(CONFIRMATION_NUMBER, bindMarker(CONFIRMATION_NUMBER))
                    .value(GUEST_ID, bindMarker(GUEST_ID))
                    .build());
            psInsertReservationByConfirmation = cqlSession.prepare(QueryBuilder.insertInto(keyspaceName, TABLE_RESERVATION_BY_CONFI)
                    .value(CONFIRMATION_NUMBER, bindMarker(CONFIRMATION_NUMBER))
                    .value(HOTEL_ID, bindMarker(HOTEL_ID))
                    .value(START_DATE, bindMarker(START_DATE))
                    .value(END_DATE, bindMarker(END_DATE))
                    .value(ROOM_NUMBER, bindMarker(ROOM_NUMBER))
                    .value(GUEST_ID, bindMarker(GUEST_ID))
                    .build());
            logger.info("Statements have been successfully prepared.");
        }
    }
}
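In upsert() above, the batch is executed against Cassandra first, and the Kafka message is sent afterwards inside a try/catch that only logs a warning. The following sketch isolates that ordering using in-memory stand-ins: a Map plays the role of Cassandra and a BiConsumer plays the role of the producer. All names here are hypothetical. Note also that KafkaProducer.send is asynchronous, so broker-side failures are generally reported through the returned Future or a callback rather than thrown, which this simplified sketch does not model.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical in-memory stand-ins, not part of the Reservation Service source.
class WriteThenPublishSketch {

    private final Map<String, String> store = new HashMap<>();   // stands in for Cassandra
    private final BiConsumer<String, String> publisher;          // stands in for the producer

    WriteThenPublishSketch(BiConsumer<String, String> publisher) {
        this.publisher = publisher;
    }

    /** Stores the value first, then tries to publish; returns false if publishing failed. */
    boolean upsert(String key, String json) {
        store.put(key, json);                // step 1: persist the change
        try {
            publisher.accept(key, json);     // step 2: emit the change event
            return true;
        } catch (RuntimeException e) {
            // Same warn-and-continue idea as in upsert(): the stored write is kept
            return false;
        }
    }

    boolean contains(String key) {
        return store.containsKey(key);
    }

    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();
        WriteThenPublishSketch healthy = new WriteThenPublishSketch((k, v) -> sent.add(k));
        System.out.println(healthy.upsert("c-1", "{}") + " " + sent);               // prints "true [c-1]"

        WriteThenPublishSketch broken = new WriteThenPublishSketch((k, v) -> {
            throw new IllegalStateException("broker unavailable");
        });
        System.out.println(broken.upsert("c-2", "{}") + " " + broken.contains("c-2")); // prints "false true"
    }
}
```

The second case shows the trade-off of this design: a reservation can be stored without its event ever reaching the topic, so downstream services may need another way to catch up on missed changes.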

Now let's move on to the tests.

The Reservation Service also includes tests that run against a Cassandra container in Docker. The container is started for the test run and destroyed afterwards.

Before running the tests, we need to stop the Cassandra node that is already running:

kill `cat /tmp/cassandra-pid`

Now we can run the tests with Maven:

mvn test

We have seen how to write a microservice that stores data in Apache Cassandra using the DataStax Java driver and emits events for data changes to Apache Kafka.

This article was prepared ahead of the start of the Software Architect course. I would also like to share a recording of a free lesson on the topic "The 'Maintainability' architectural property, using k8s services as an example".
