Author: Rustem Galiev
IBM Senior DevOps Engineer & Integration Architect
Hello, Habr!
Today we will learn how to write a microservice that stores data in Apache Cassandra using the DataStax Java driver and publishes events for data changes to Apache Kafka.
This post is based on the design of a simple microservice for managing hotel reservation data, called the Reservation Service. You can work through a series of exercises on writing and reading data in Cassandra with the Reservation Service in the series "Cassandra: Application Development with the DataStax Java Driver". The source code of the Reservation Service is available on GitHub.
The Reservation Service uses a hotel reservation data model that includes the tables shown below.
Cassandra and Kafka are often used together in microservice architectures, as shown below. In this design, the Reservation Service receives an API request to perform an action such as creating, updating, or deleting a reservation. After persisting the change to Cassandra, the Reservation Service sends a message to the reservation topic in the Kafka cluster.
Other services consume the reservation topic and act on each message: for example, an inventory service updates its inventory tables to mark the dates as reserved, and an email service might send the guest a thank-you email for the booking. In this style of interaction, Cassandra and Kafka are not connected directly but are used in a complementary way.
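The flow described above (persist first, then publish, then let independent consumers react) can be sketched without any infrastructure. The example below is only an illustration under stated assumptions: `InMemoryTopic` and `SketchReservationService` are hypothetical stand-ins for Kafka and for the Cassandra-backed service, and the two subscribers loosely mimic the inventory and email services mentioned above. It is not code from the Reservation Service.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

class EventFlowSketch {
    /** Wires two hypothetical consumers to the topic and returns what each of them did. */
    static List<String> run() {
        List<String> actions = new ArrayList<>();
        InMemoryTopic topic = new InMemoryTopic();
        topic.subscribe((key, value) -> actions.add("inventory: mark dates reserved for " + key));
        topic.subscribe((key, value) -> actions.add("email: send thank-you for " + key));

        SketchReservationService service = new SketchReservationService(topic);
        service.createReservation("ABC123", "hotel=NYC001, room=101");
        return actions;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}

/** In-memory stand-in for a Kafka topic: hands each (key, value) message to every subscriber. */
class InMemoryTopic {
    private final List<BiConsumer<String, String>> subscribers = new ArrayList<>();

    void subscribe(BiConsumer<String, String> subscriber) {
        subscribers.add(subscriber);
    }

    void publish(String key, String value) {
        for (BiConsumer<String, String> subscriber : subscribers) {
            subscriber.accept(key, value);
        }
    }
}

/** Hypothetical service: persists the change first, then publishes an event describing it. */
class SketchReservationService {
    private final Map<String, String> store = new HashMap<>(); // stand-in for Cassandra
    private final InMemoryTopic topic;

    SketchReservationService(InMemoryTopic topic) {
        this.topic = topic;
    }

    void createReservation(String confirmationNumber, String details) {
        store.put(confirmationNumber, details);     // 1. persist the change
        topic.publish(confirmationNumber, details); // 2. notify downstream services
    }

    boolean isStored(String confirmationNumber) {
        return store.containsKey(confirmationNumber);
    }
}
```

The point of the sketch is the decoupling: the service knows only the topic, not who consumes it, so new consumers can be added without touching the service.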
Setting Up the Reservation Service
Let's start by cloning the Reservation Service source code from GitHub and checking out the branch that will serve as our starting point:
git clone -b kafka https://github.com/jeffreyscarpenter/reservation-service
Once this completes, we should see the reservation-service directory.
Let's look at some of the code.
First, we'll look at the schema we will use for the reservation tables, located at reservation-service/src/main/resources/reservation.cql:
/*
* Copyright (C) 2016-2020 Jeff Carpenter
*/
/* This file contains a slightly modified version of the "reservation" keyspace and table definitions
* for the example defined in Chapter 5 of Cassandra: The Definitive Guide, 2nd and 3rd Editions.
* The changes are to facilitate development exercises.
*/
CREATE KEYSPACE reservation
WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1};
CREATE TABLE reservation.reservations_by_hotel_date (
hotel_id text,
start_date date,
end_date date,
room_number smallint,
confirm_number text,
guest_id uuid,
PRIMARY KEY ((hotel_id, start_date), room_number)
);
CREATE TABLE reservation.reservations_by_confirmation (
confirm_number text PRIMARY KEY,
hotel_id text,
start_date date,
end_date date,
room_number smallint,
guest_id uuid
);
/*
The following tables are provided for completeness with the book text, but they are not used in the current
implementation of the Reservation Service
*/
CREATE TABLE reservation.reservations_by_guest (
guest_last_name text,
hotel_id text,
start_date date,
end_date date,
room_number smallint,
confirm_number text,
guest_id uuid,
PRIMARY KEY ((guest_last_name), hotel_id)
);
CREATE TYPE reservation.address (
street text,
city text,
state_or_province text,
postal_code text,
country text
);
CREATE TABLE reservation.guests (
guest_id uuid PRIMARY KEY,
first_name text,
last_name text,
title text,
emails set<text>,
phone_numbers list<text>,
addresses map<text, frozen<address>>
);
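The two tables the service actually uses, reservations_by_hotel_date and reservations_by_confirmation, hold the same reservation under different primary keys. The following sketch is a rough in-memory analogy rather than driver code: the class and method names are hypothetical, and plain Java maps stand in for the tables. It illustrates why both tables have to be written together and why deleting by confirmation number means first reading the row to recover hotel_id, start_date, and room_number.

```java
import java.time.LocalDate;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

class DenormalizedTablesSketch {
    /** One reservation row, reduced to the columns that act as keys. */
    static final class Row {
        final String confirmNumber;
        final String hotelId;
        final LocalDate startDate;
        final short roomNumber;

        Row(String confirmNumber, String hotelId, LocalDate startDate, short roomNumber) {
            this.confirmNumber = confirmNumber;
            this.hotelId = hotelId;
            this.startDate = startDate;
            this.roomNumber = roomNumber;
        }
    }

    /** Mirrors the composite partition key (hotel_id, start_date). */
    static final class HotelDateKey {
        final String hotelId;
        final LocalDate startDate;

        HotelDateKey(String hotelId, LocalDate startDate) {
            this.hotelId = hotelId;
            this.startDate = startDate;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof HotelDateKey)) return false;
            HotelDateKey other = (HotelDateKey) o;
            return hotelId.equals(other.hotelId) && startDate.equals(other.startDate);
        }

        @Override
        public int hashCode() {
            return Objects.hash(hotelId, startDate);
        }
    }

    // Analogy for reservations_by_confirmation: confirm_number -> row
    private final Map<String, Row> byConfirmation = new HashMap<>();
    // Analogy for reservations_by_hotel_date: partition -> rows ordered by room_number
    private final Map<HotelDateKey, TreeMap<Short, Row>> byHotelDate = new HashMap<>();

    void insert(Row row) {
        byConfirmation.put(row.confirmNumber, row);
        byHotelDate.computeIfAbsent(new HotelDateKey(row.hotelId, row.startDate), k -> new TreeMap<>())
                   .put(row.roomNumber, row);
    }

    /** Deleting by confirmation number requires a read to find the second table's key. */
    boolean delete(String confirmNumber) {
        Row row = byConfirmation.remove(confirmNumber);
        if (row == null) return false;
        HotelDateKey key = new HotelDateKey(row.hotelId, row.startDate);
        TreeMap<Short, Row> partition = byHotelDate.get(key);
        if (partition != null) {
            partition.remove(row.roomNumber);
            if (partition.isEmpty()) byHotelDate.remove(key);
        }
        return true;
    }

    int countForHotelAndDate(String hotelId, LocalDate date) {
        TreeMap<Short, Row> partition = byHotelDate.get(new HotelDateKey(hotelId, date));
        return partition == null ? 0 : partition.size();
    }

    public static void main(String[] args) {
        DenormalizedTablesSketch tables = new DenormalizedTablesSketch();
        LocalDate date = LocalDate.of(2020, 6, 1);
        tables.insert(new Row("C1", "NYC001", date, (short) 101));
        System.out.println(tables.countForHotelAndDate("NYC001", date));
        tables.delete("C1");
        System.out.println(tables.countForHotelAndDate("NYC001", date));
    }
}
```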
Note also the dependencies on the DataStax Java driver and the Kafka client libraries in reservation-service/pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.cassandraguide</groupId>
<artifactId>reservation-service</artifactId>
<version>1.0.0</version>
<packaging>jar</packaging>
<name>reservation-service</name>
<description>Demo service using Cassandra driver and Spring Boot</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.6.RELEASE</version>
<relativePath />
</parent>
<properties>
<java.version>11</java.version>
<swagger.version>2.9.2</swagger.version>
<junit-jupiter.version>5.4.2</junit-jupiter.version>
<junit-platform.version>1.7.0-M1</junit-platform.version>
<oss-java-driver.version>4.5.1</oss-java-driver.version>
<kafka-client.version>2.5.0</kafka-client.version>
<testcontainers.version>1.14.1</testcontainers.version>
<version.maven.plugin.compiler>3.8.0</version.maven.plugin.compiler>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<start-class>dev.cassandraguide.ReservationServiceApp</start-class>
<dockerfile-maven-version>1.4.13</dockerfile-maven-version>
</properties>
<dependencies>
<!-- Create a RESTFul Service -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Document for REST Service -->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>${swagger.version}</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>${swagger.version}</version>
</dependency>
<!-- Cassandra Driver -->
<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-core</artifactId>
<version>${oss-java-driver.version}</version>
</dependency>
<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-query-builder</artifactId>
<version>${oss-java-driver.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka-client.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<!-- Provides JSON serialization/deserialization for date/time types -->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
</dependency>
<!-- Tests -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Junit 5 -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.platform</groupId>
<artifactId>junit-platform-engine</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.platform</groupId>
<artifactId>junit-platform-launcher</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.platform</groupId>
<artifactId>junit-platform-runner</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.platform</groupId>
<artifactId>junit-platform-console-standalone</artifactId>
<version>${junit-platform.version}</version>
<scope>test</scope>
</dependency>
<!-- Test against Docker Containers -->
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>cassandra</artifactId>
<scope>test</scope>
<version>${testcontainers.version}</version>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<scope>test</scope>
<version>${testcontainers.version}</version>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<scope>test</scope>
<version>${testcontainers.version}</version>
</dependency>
<!-- Add driver keys to spring-boot config file -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<fork>true</fork>
<mainClass>${start-class}</mainClass>
</configuration>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
<plugin>
<groupId>com.spotify</groupId>
<artifactId>dockerfile-maven-plugin</artifactId>
<version>${dockerfile-maven-version}</version>
<configuration>
<repository>reservation-service</repository>
<tag>${project.version}</tag>
<buildArgs>
<JAR_FILE>${project.build.finalName}.jar</JAR_FILE>
</buildArgs>
</configuration>
</plugin>
</plugins>
</build>
</project>
The source code contains several key Java classes that are also worth examining:
The entity class used in the HTTP API, which represents the data we store in Cassandra and publish to the Kafka topic, reservation-service/src/main/java/dev/cassandraguide/model/Reservation.java:
/*
* Copyright (C) 2017-2020 Jeff Carpenter
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.cassandraguide.model;
import java.io.Serializable;
import java.time.LocalDate;
import java.util.UUID;
/**
* Entity working with Reservation on Cassandra.
*
* @author Jeff Carpenter
*/
public class Reservation implements Serializable {
/** Serial. */
private static final long serialVersionUID = -3392237616280919281L;
/** Hotel identifier, as Text not UUID (for simplicity). */
private String hotelId;
/** Formatted as YYYY-MM-DD in interfaces. */
private LocalDate startDate;
/** Formatted as YYYY-MM-DD in interfaces. */
private LocalDate endDate;
/** Room number. */
private short roomNumber;
/** UUID. */
private UUID guestId;
/** Confirmation for this Reservation. */
private String confirmationNumber;
/**
* Default constructor
*/
public Reservation() {
}
/**
* Creates a reservation from the fields of a reservation request.
*/
public Reservation(ReservationRequest form) {
setStartDate(form.getStartDate());
setEndDate(form.getEndDate());
setHotelId(form.getHotelId());
setGuestId(form.getGuestId());
setRoomNumber(form.getRoomNumber());
}
/**
* Creates a reservation from a reservation request and an existing confirmation number.
*/
public Reservation(ReservationRequest form, String confirmationNumber) {
this(form);
this.confirmationNumber = confirmationNumber;
}
/**
* Getter accessor for attribute 'hotelId'.
*
* @return
* current value of 'hotelId'
*/
public String getHotelId() {
return hotelId;
}
/**
* Setter accessor for attribute 'hotelId'.
* @param hotelId
* new value for 'hotelId '
*/
public void setHotelId(String hotelId) {
this.hotelId = hotelId;
}
/**
* Getter accessor for attribute 'startDate'.
*
* @return
* current value of 'startDate'
*/
public LocalDate getStartDate() {
return startDate;
}
/**
* Setter accessor for attribute 'startDate'.
* @param startDate
* new value for 'startDate '
*/
public void setStartDate(LocalDate startDate) {
this.startDate = startDate;
}
/**
* Getter accessor for attribute 'endDate'.
*
* @return
* current value of 'endDate'
*/
public LocalDate getEndDate() {
return endDate;
}
/**
* Setter accessor for attribute 'endDate'.
* @param endDate
* new value for 'endDate '
*/
public void setEndDate(LocalDate endDate) {
this.endDate = endDate;
}
/**
* Getter accessor for attribute 'roomNumber'.
*
* @return
* current value of 'roomNumber'
*/
public short getRoomNumber() {
return roomNumber;
}
/**
* Setter accessor for attribute 'roomNumber'.
* @param roomNumber
* new value for 'roomNumber '
*/
public void setRoomNumber(short roomNumber) {
this.roomNumber = roomNumber;
}
/**
* Getter accessor for attribute 'guestId'.
*
* @return
* current value of 'guestId'
*/
public UUID getGuestId() {
return guestId;
}
/**
* Setter accessor for attribute 'guestId'.
* @param guestId
* new value for 'guestId '
*/
public void setGuestId(UUID guestId) {
this.guestId = guestId;
}
/**
* Getter accessor for attribute 'confirmationNumber'.
*
* @return
* current value of 'confirmationNumber'
*/
public String getConfirmationNumber() {
return confirmationNumber;
}
/**
* Setter accessor for attribute 'confirmationNumber'.
* @param confirmationNumber
* new value for 'confirmationNumber '
*/
public void setConfirmationNumber(String confirmationNumber) {
this.confirmationNumber = confirmationNumber;
}
/** {@inheritDoc} */
@Override
public String toString() {
return "Confirmation Number = " + confirmationNumber +
", Hotel ID: " + getHotelId() +
", Start Date = " + getStartDate() +
", End Date = " + getEndDate() +
", Room Number = " + getRoomNumber() +
", Guest ID = " + getGuestId();
}
}
The service's data storage logic, where we will do most of our work, reservation-service/src/main/java/dev/cassandraguide/repository/ReservationRepository.java:
/*
* Copyright (C) 2017-2020 Jeff Carpenter
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.cassandraguide.repository;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.deleteFrom;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.selectFrom;
import static com.datastax.oss.driver.api.querybuilder.SchemaBuilder.createTable;
import static com.datastax.oss.driver.api.querybuilder.SchemaBuilder.createType;
import static com.datastax.oss.driver.api.querybuilder.relation.Relation.column;
import java.time.LocalDate;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import javax.annotation.PreDestroy;
import com.fasterxml.jackson.core.JsonProcessingException;
import dev.cassandraguide.model.Reservation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Profile;
import org.springframework.lang.NonNull;
import org.springframework.stereotype.Repository;
import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BatchStatement;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.DefaultBatchType;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.metadata.schema.ClusteringOrder;
import com.datastax.oss.driver.api.core.type.DataTypes;
import com.datastax.oss.driver.api.core.type.UserDefinedType;
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
// TODO: Review imports for publishing to Kafka
import org.apache.kafka.clients.producer.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
/**
* The goal of this project is to provide a minimally functional implementation of a microservice
* that uses Apache Cassandra for its data storage. The reservation service is implemented as a
* RESTful service using Spring Boot.
*
* @author Jeff Carpenter, Cedrick Lunven
*/
@Repository
@Profile("!unit-test") // When I do some 'unit-test' no connectivity to DB
public class ReservationRepository {
/** Logger for the class. */
private static final Logger logger = LoggerFactory.getLogger(ReservationRepository.class);
// Reservation Schema Constants
public static final CqlIdentifier TYPE_ADDRESS = CqlIdentifier.fromCql("address");
public static final CqlIdentifier TABLE_RESERVATION_BY_HOTEL_DATE =
CqlIdentifier.fromCql("reservations_by_hotel_date");
public static final CqlIdentifier TABLE_RESERVATION_BY_CONFI = CqlIdentifier.fromCql("reservations_by_confirmation");
public static final CqlIdentifier TABLE_RESERVATION_BY_GUEST = CqlIdentifier.fromCql("reservations_by_guest");
public static final CqlIdentifier TABLE_GUESTS = CqlIdentifier.fromCql("guests");
public static final CqlIdentifier STREET = CqlIdentifier.fromCql("street");
public static final CqlIdentifier CITY = CqlIdentifier.fromCql("city");
public static final CqlIdentifier STATE_PROVINCE = CqlIdentifier.fromCql("state_or_province");
public static final CqlIdentifier POSTAL_CODE = CqlIdentifier.fromCql("postal_code");
public static final CqlIdentifier COUNTRY = CqlIdentifier.fromCql("country");
public static final CqlIdentifier HOTEL_ID = CqlIdentifier.fromCql("hotel_id");
public static final CqlIdentifier START_DATE = CqlIdentifier.fromCql("start_date");
public static final CqlIdentifier END_DATE = CqlIdentifier.fromCql("end_date");
public static final CqlIdentifier ROOM_NUMBER = CqlIdentifier.fromCql("room_number");
public static final CqlIdentifier CONFIRM_NUMBER = CqlIdentifier.fromCql("confirm_number");
public static final CqlIdentifier GUEST_ID = CqlIdentifier.fromCql("guest_id");
public static final CqlIdentifier GUEST_LAST_NAME = CqlIdentifier.fromCql("guest_last_name");
public static final CqlIdentifier FIRSTNAME = CqlIdentifier.fromCql("first_name");
public static final CqlIdentifier LASTNAME = CqlIdentifier.fromCql("last_name");
public static final CqlIdentifier TITLE = CqlIdentifier.fromCql("title");
public static final CqlIdentifier EMAILS = CqlIdentifier.fromCql("emails");
public static final CqlIdentifier PHONE_NUMBERS = CqlIdentifier.fromCql("phone_numbers");
public static final CqlIdentifier ADDRESSES = CqlIdentifier.fromCql("addresses");
private PreparedStatement psExistReservation;
private PreparedStatement psFindReservation;
private PreparedStatement psInsertReservationByHotelDate;
private PreparedStatement psInsertReservationByConfirmation;
private PreparedStatement psDeleteReservationByHotelDate;
private PreparedStatement psDeleteReservationByConfirmation;
private PreparedStatement psSearchReservation;
/** CqlSession holding metadata to interact with Cassandra. */
private CqlSession cqlSession;
private CqlIdentifier keyspaceName;
// TODO: Review variables used for publishing to Kafka
/** KafkaProducer for publishing messages to Kafka. */
private KafkaProducer<String, String> kafkaProducer;
private String kafkaTopicName;
private ObjectMapper objectMapper;
/** External Initialization. */
public ReservationRepository(
@NonNull CqlSession cqlSession,
@Qualifier("keyspace") @NonNull CqlIdentifier keyspaceName,
@NonNull KafkaProducer<String, String> kafkaProducer,
@NonNull String kafkaTopicName) {
this.cqlSession = cqlSession;
this.keyspaceName = keyspaceName;
// TODO: Review initialization of objects needed for publishing to Kafka
this.kafkaProducer = kafkaProducer;
this.kafkaTopicName = kafkaTopicName;
objectMapper = new ObjectMapper();
objectMapper.registerModule(new JavaTimeModule());
objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
objectMapper.enable(SerializationFeature.INDENT_OUTPUT);
// Will create tables (if they do not exist)
createReservationTables();
// Prepare Statements of reservation
prepareStatements();
logger.info("Application initialized.");
}
/**
* CqlSession is a stateful object handling TCP connections to nodes in the cluster.
* This operation properly closes sockets when the application is stopped
*/
@PreDestroy
public void cleanup() {
if (null != cqlSession) {
cqlSession.close();
logger.info("+ CqlSession has been successfully closed");
}
}
/**
* Testing existence is useful for building correct semantics in the RESTful API. To evaluate existence find the
* table where confirmation number is partition key which is reservations_by_confirmation
*
* @param confirmationNumber
* unique identifier for confirmation
* @return
* true if the reservation exists, false if it does not
*/
public boolean exists(String confirmationNumber) {
return cqlSession.execute(psExistReservation.bind(confirmationNumber))
.getAvailableWithoutFetching() > 0;
}
/**
* Similar to exists() but maps and parses results.
*
* @param confirmationNumber
* unique identifier for confirmation
* @return
* reservation if present or empty
*/
@NonNull
public Optional<Reservation> findByConfirmationNumber(@NonNull String confirmationNumber) {
ResultSet resultSet = cqlSession.execute(psFindReservation.bind(confirmationNumber));
// Hint: an empty result might not be an error as this method is sometimes used to check whether a
// reservation with this confirmation number exists
Row row = resultSet.one();
if (row == null) {
logger.debug("Unable to load reservation with confirmation number: " + confirmationNumber);
return Optional.empty();
}
// Hint: If there is a result, create a new reservation object and set the values
// Bonus: factor the logic to extract a reservation from a row into a separate method
// (you will reuse it again later in getAllReservations())
return Optional.of(mapRowToReservation(row));
}
/**
* Create new entry in multiple tables for this reservation.
*
* @param reservation
* current reservation object
* @return
* confirmation number for the reservation
*
*/
public String upsert(Reservation reservation) {
Objects.requireNonNull(reservation);
if (null == reservation.getConfirmationNumber()) {
// Generating a new reservation number if none has been provided
reservation.setConfirmationNumber(UUID.randomUUID().toString());
}
// Insert into 'reservations_by_hotel_date'
BoundStatement bsInsertReservationByHotel =
psInsertReservationByHotelDate.bind(reservation.getHotelId(), reservation.getStartDate(),
reservation.getEndDate(), reservation.getRoomNumber(), reservation.getConfirmationNumber(),
reservation.getGuestId());
// Insert into 'reservations_by_confirmation'
BoundStatement bsInsertReservationByConfirmation =
psInsertReservationByConfirmation.bind(reservation.getConfirmationNumber(), reservation.getHotelId(),
reservation.getStartDate(), reservation.getEndDate(), reservation.getRoomNumber(),
reservation.getGuestId());
BatchStatement batchInsertReservation = BatchStatement
.builder(DefaultBatchType.LOGGED)
.addStatement(bsInsertReservationByHotel)
.addStatement(bsInsertReservationByConfirmation)
.build();
cqlSession.execute(batchInsertReservation);
// TODO: Publish message to Kafka containing reservation
try {
String reservationJson = objectMapper.writeValueAsString(reservation);
// HINT: use the constructor ProducerRecord(String topic, K key, V value)
// with the reservation confirmation number as the key, and the JSON string as the value
ProducerRecord<String, String> record = null;
kafkaProducer.send(record);
} catch (Exception e) {
logger.warn("Error publishing reservation message to Kafka", e);
}
return reservation.getConfirmationNumber();
}
/**
* We pick 'reservations_by_confirmation' table to list reservations
* BUT we could have used 'reservations_by_hotel_date' (as no key provided in request)
*
* @return
* list containing all reservations
*/
public List<Reservation> findAll() {
return cqlSession.execute(selectFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI).all().build())
.all() // no paging we retrieve all objects
.stream() // because we are good people
.map(this::mapRowToReservation) // Mapping row as Reservation
.collect(Collectors.toList()); // Back to list objects
}
/**
* Deleting a reservation.
*
* @param confirmationNumber
* unique identifier for confirmation.
*/
public boolean delete(String confirmationNumber) {
// Retrieving entire reservation in order to obtain the attributes we will need to delete from
// reservations_by_hotel_date table
Optional<Reservation> reservationToDelete = this.findByConfirmationNumber(confirmationNumber);
if (reservationToDelete.isPresent()) {
// Delete from 'reservations_by_hotel_date'
Reservation reservation = reservationToDelete.get();
BoundStatement bsDeleteReservationByHotelDate =
psDeleteReservationByHotelDate.bind(reservation.getHotelId(),
reservation.getStartDate(), reservation.getRoomNumber());
// Delete from 'reservations_by_confirmation'
BoundStatement bsDeleteReservationByConfirmation =
psDeleteReservationByConfirmation.bind(confirmationNumber);
BatchStatement batchDeleteReservation = BatchStatement
.builder(DefaultBatchType.LOGGED)
.addStatement(bsDeleteReservationByHotelDate)
.addStatement(bsDeleteReservationByConfirmation)
.build();
cqlSession.execute(batchDeleteReservation);
// TODO: Publish message to Kafka with empty payload to indicate deletion
// HINT: use the constructor ProducerRecord(String topic, K key, V value)
// with the reservation confirmation number as the key, and an empty string as the value
ProducerRecord<String, String> record = null;
kafkaProducer.send(record);
return true;
}
return false;
}
/**
* Search all reservations for a hotel id and date.
*
* @param hotelId
* hotel identifier
* @param date
* searched Date
* @return
* list of reservations matching the search criteria
*/
public List<Reservation> findByHotelAndDate(String hotelId, LocalDate date) {
Objects.requireNonNull(hotelId);
Objects.requireNonNull(date);
return cqlSession.execute(psSearchReservation.bind(hotelId, date))
.all() // no paging we retrieve all objects
.stream() // because we are good people
.map(this::mapRowToReservation) // Mapping row as Reservation
.collect(Collectors.toList()); // Back to list objects
}
/**
* Utility method to marshal a row as expected Reservation Bean.
*
* @param row
* current row from ResultSet
* @return
* object
*/
private Reservation mapRowToReservation(Row row) {
Reservation reservation = new Reservation();
reservation.setHotelId(row.getString(HOTEL_ID));
reservation.setConfirmationNumber(row.getString(CONFIRM_NUMBER));
reservation.setGuestId(row.getUuid(GUEST_ID));
reservation.setRoomNumber(row.getShort(ROOM_NUMBER));
reservation.setStartDate(row.getLocalDate(START_DATE));
reservation.setEndDate(row.getLocalDate(END_DATE));
return reservation;
}
/**
* Create Keyspace and relevant tables as per defined in 'reservation.cql'
*/
public void createReservationTables() {
/**
* Create TYPE 'Address' if not exists
*
* CREATE TYPE reservation.address (
* street text,
* city text,
* state_or_province text,
* postal_code text,
* country text
* );
*/
cqlSession.execute(
createType(keyspaceName, TYPE_ADDRESS)
.ifNotExists()
.withField(STREET, DataTypes.TEXT)
.withField(CITY, DataTypes.TEXT)
.withField(STATE_PROVINCE, DataTypes.TEXT)
.withField(POSTAL_CODE, DataTypes.TEXT)
.withField(COUNTRY, DataTypes.TEXT)
.build());
logger.debug("+ Type '{}' has been created (if needed)", TYPE_ADDRESS.asInternal());
/**
* CREATE TABLE reservation.reservations_by_hotel_date (
* hotel_id text,
* start_date date,
* end_date date,
* room_number smallint,
* confirm_number text,
* guest_id uuid,
* PRIMARY KEY ((hotel_id, start_date), room_number)
* );
*/
cqlSession.execute(createTable(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE)
.ifNotExists()
.withPartitionKey(HOTEL_ID, DataTypes.TEXT)
.withPartitionKey(START_DATE, DataTypes.DATE)
.withClusteringColumn(ROOM_NUMBER, DataTypes.SMALLINT)
.withColumn(END_DATE, DataTypes.DATE)
.withColumn(CONFIRM_NUMBER, DataTypes.TEXT)
.withColumn(GUEST_ID, DataTypes.UUID)
.withClusteringOrder(ROOM_NUMBER, ClusteringOrder.ASC)
.withComment("Q7. Find reservations by hotel and date")
.build());
logger.debug("+ Table '{}' has been created (if needed)", TABLE_RESERVATION_BY_HOTEL_DATE.asInternal());
/**
* CREATE TABLE reservation.reservations_by_confirmation (
* confirm_number text PRIMARY KEY,
* hotel_id text,
* start_date date,
* end_date date,
* room_number smallint,
* guest_id uuid
* );
*/
cqlSession.execute(createTable(keyspaceName, TABLE_RESERVATION_BY_CONFI)
.ifNotExists()
.withPartitionKey(CONFIRM_NUMBER, DataTypes.TEXT)
.withColumn(HOTEL_ID, DataTypes.TEXT)
.withColumn(START_DATE, DataTypes.DATE)
.withColumn(END_DATE, DataTypes.DATE)
.withColumn(ROOM_NUMBER, DataTypes.SMALLINT)
.withColumn(GUEST_ID, DataTypes.UUID)
.build());
logger.debug("+ Table '{}' has been created (if needed)", TABLE_RESERVATION_BY_CONFI.asInternal());
/**
* CREATE TABLE reservation.reservations_by_guest (
* guest_last_name text,
* hotel_id text,
* start_date date,
* end_date date,
* room_number smallint,
* confirm_number text,
* guest_id uuid,
* PRIMARY KEY ((guest_last_name), hotel_id)
* );
*/
cqlSession.execute(createTable(keyspaceName, TABLE_RESERVATION_BY_GUEST)
.ifNotExists()
.withPartitionKey(GUEST_LAST_NAME, DataTypes.TEXT)
.withClusteringColumn(HOTEL_ID, DataTypes.TEXT)
.withColumn(START_DATE, DataTypes.DATE)
.withColumn(END_DATE, DataTypes.DATE)
.withColumn(ROOM_NUMBER, DataTypes.SMALLINT)
.withColumn(CONFIRM_NUMBER, DataTypes.TEXT)
.withColumn(GUEST_ID, DataTypes.UUID)
.withComment("Q8. Find reservations by guest name")
.build());
logger.debug("+ Table '{}' has been created (if needed)", TABLE_RESERVATION_BY_GUEST.asInternal());
/**
* CREATE TABLE reservation.guests (
* guest_id uuid PRIMARY KEY,
* first_name text,
* last_name text,
* title text,
* emails set<text>,
* phone_numbers list<text>,
* addresses map<text, frozen<address>>,
* confirm_number text
* );
*/
UserDefinedType udtAddressType =
cqlSession.getMetadata().getKeyspace(keyspaceName).get() // Retrieving KeySpaceMetadata
.getUserDefinedType(TYPE_ADDRESS).get(); // Looking for UDT (extending DataType)
cqlSession.execute(createTable(keyspaceName, TABLE_GUESTS)
.ifNotExists()
.withPartitionKey(GUEST_ID, DataTypes.UUID)
.withColumn(FIRSTNAME, DataTypes.TEXT)
.withColumn(LASTNAME, DataTypes.TEXT)
.withColumn(TITLE, DataTypes.TEXT)
.withColumn(EMAILS, DataTypes.setOf(DataTypes.TEXT))
.withColumn(PHONE_NUMBERS, DataTypes.listOf(DataTypes.TEXT))
.withColumn(ADDRESSES, DataTypes.mapOf(DataTypes.TEXT, udtAddressType, true))
.withColumn(CONFIRM_NUMBER, DataTypes.TEXT)
.withComment("Q9. Find guest by ID")
.build());
logger.debug("+ Table '{}' has been created (if needed)", TABLE_GUESTS.asInternal());
logger.info("Schema has been successfully initialized.");
}
private void prepareStatements() {
if (psExistReservation == null) {
psExistReservation = cqlSession.prepare(selectFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI).column(CONFIRM_NUMBER)
.where(column(CONFIRM_NUMBER).isEqualTo(bindMarker(CONFIRM_NUMBER)))
.build());
psFindReservation = cqlSession.prepare(
selectFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI).all()
.where(column(CONFIRM_NUMBER).isEqualTo(bindMarker(CONFIRM_NUMBER)))
.build());
psSearchReservation = cqlSession.prepare(
selectFrom(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE).all()
.where(column(HOTEL_ID).isEqualTo(bindMarker(HOTEL_ID)))
.where(column(START_DATE).isEqualTo(bindMarker(START_DATE)))
.build());
psDeleteReservationByConfirmation = cqlSession.prepare(
deleteFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI)
.where(column(CONFIRM_NUMBER).isEqualTo(bindMarker(CONFIRM_NUMBER)))
.build());
psDeleteReservationByHotelDate = cqlSession.prepare(
deleteFrom(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE)
.where(column(HOTEL_ID).isEqualTo(bindMarker(HOTEL_ID)))
.where(column(START_DATE).isEqualTo(bindMarker(START_DATE)))
.where(column(ROOM_NUMBER).isEqualTo(bindMarker(ROOM_NUMBER)))
.build());
psInsertReservationByHotelDate = cqlSession.prepare(QueryBuilder.insertInto(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE)
.value(HOTEL_ID, bindMarker(HOTEL_ID))
.value(START_DATE, bindMarker(START_DATE))
.value(END_DATE, bindMarker(END_DATE))
.value(ROOM_NUMBER, bindMarker(ROOM_NUMBER))
.value(CONFIRM_NUMBER, bindMarker(CONFIRM_NUMBER))
.value(GUEST_ID, bindMarker(GUEST_ID))
.build());
psInsertReservationByConfirmation = cqlSession.prepare(QueryBuilder.insertInto(keyspaceName, TABLE_RESERVATION_BY_CONFI)
.value(CONFIRM_NUMBER, bindMarker(CONFIRM_NUMBER))
.value(HOTEL_ID, bindMarker(HOTEL_ID))
.value(START_DATE, bindMarker(START_DATE))
.value(END_DATE, bindMarker(END_DATE))
.value(ROOM_NUMBER, bindMarker(ROOM_NUMBER))
.value(GUEST_ID, bindMarker(GUEST_ID))
.build());
logger.info("Statements have been successfully prepared.");
}
}
}
The code that creates the connection to Kafka lives in reservation-service/src/main/java/dev/cassandraguide/conf/KafkaConfiguration.java:
package dev.cassandraguide.conf;
import dev.cassandraguide.repository.ReservationRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
/**
* Import Configuration from Configuration File
*
* @author Jeff Carpenter
*/
@Configuration
public class KafkaConfiguration {
// Logger
private static final Logger logger = LoggerFactory.getLogger(KafkaConfiguration.class);
// Bootstrap servers
@Value("${kafka.bootstrap-servers:localhost:9092}")
protected String bootstrapServers;
// Kafka Client ID
@Value("${kafka.client-id:ReservationService}")
protected String clientId = "ReservationService";
// Topic Name
@Value("${kafka.topicName:reservation}")
public String topicName = "reservation";
/**
* Default configuration.
*/
public KafkaConfiguration() {}
/**
* Initialization of Configuration.
*
* @param bootstrapServers
* @param clientId
* @param topicName
*/
public KafkaConfiguration(
String bootstrapServers, String clientId, String topicName) {
super();
this.bootstrapServers = bootstrapServers;
this.clientId = clientId;
this.topicName = topicName;
}
@Bean
public KafkaProducer<String, String> kafkaProducer() {
logger.info("Creating Kafka Producer.");
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
return new KafkaProducer<>(props);
}
/**
* Getter accessor for attribute 'bootstrapServers'.
*
* @return
* current value of 'bootstrapServers'
*/
public String getBootstrapServers() {
return bootstrapServers;
}
/**
* Setter accessor for attribute 'bootstrapServers'.
* @param bootstrapServers
* new value for 'bootstrapServers'
*/
public void setBootstrapServers(String bootstrapServers) {
this.bootstrapServers = bootstrapServers;
}
/**
* Getter accessor for attribute 'clientId'.
*
* @return
* current value of 'clientId'
*/
public String getClientId() {
return clientId;
}
/**
* Setter accessor for attribute 'clientId'.
* @param clientId
* new value for 'clientId '
*/
public void setClientId(String clientId) {
this.clientId = clientId;
}
/**
* Getter accessor for attribute 'topicName'.
*
* @return
* current value of 'topicName'
*/
public String getTopicName() {
return topicName;
}
/**
* Setter accessor for attribute 'topicName'.
* @param topicName
* new value for 'topicName '
*/
public void setTopicName(String topicName) {
this.topicName = topicName;
}
}
Note that the Reservation Service keeps its Cassandra and Kafka settings in a configuration file, reservation-service/src/main/resources/application.yml:
# ----------------------------------------------------------
# Spring Boot Config
# ----------------------------------------------------------
spring:
  application:
    name: Reservation Reservicess
  jackson:
    serialization:
      WRITE_DATES_AS_TIMESTAMPS: false
server:
  port: 8080
# ----------------------------------------------------------
# DataStax Enterprise Java Driver Config
# ----------------------------------------------------------
cassandra:
  contactPoint: 127.0.0.1
  port: 9042
  keyspaceName: reservation
  localDataCenterName: datacenter1
  dropSchema: false
# ----------------------------------------------------------
# Kafka Producer Config
# ----------------------------------------------------------
kafka:
  bootstrap-servers: localhost:9092
  client-id: ReservationService
  topicName: reservation
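The @Value annotations in KafkaConfiguration, such as @Value("${kafka.bootstrap-servers:localhost:9092}"), read these keys and fall back to the text after the first colon when a key is missing. The sketch below imitates that lookup with plain java.util.Properties to make the rule visible. It is a simplified illustration, not Spring's actual resolver: the class name PlaceholderSketch, the resolve method, and the reservation-events override are all hypothetical.

```java
import java.util.Properties;

/** Simplified imitation of "${key:default}" resolution; not Spring's implementation. */
final class PlaceholderSketch {

    /**
     * Resolves a placeholder of the form "${key:default}" or "${key}".
     * The key ends at the FIRST colon, so the default may itself contain colons.
     */
    static String resolve(String placeholder, Properties config) {
        if (!placeholder.startsWith("${") || !placeholder.endsWith("}")) {
            return placeholder; // a literal value, nothing to resolve
        }
        String body = placeholder.substring(2, placeholder.length() - 1);
        int colon = body.indexOf(':');
        String key = colon < 0 ? body : body.substring(0, colon);
        String fallback = colon < 0 ? null : body.substring(colon + 1);
        return config.getProperty(key, fallback);
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty("kafka.topicName", "reservation-events"); // hypothetical override

        // The key is configured, so the configured value wins.
        System.out.println(resolve("${kafka.topicName:reservation}", config));
        // The key is absent, so the text after the first colon is used.
        System.out.println(resolve("${kafka.bootstrap-servers:localhost:9092}", config));
    }
}
```

Because the key ends at the first colon, a default such as localhost:9092 can safely contain a colon of its own.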
Cassandra tables for storing reservations
The Cassandra distribution ships with cqlsh, a shell for issuing CQL commands. Let's start the shell:
apache-cassandra-4.0-alpha4/bin/cqlsh
We should see the cqlsh> prompt in the terminal.
Once the prompt is available, we can load the schema file we reviewed earlier to create the keyspace that contains the reservation tables:
SOURCE 'reservation-service/src/main/resources/reservation.cql';
Now we can inspect the schema we just created with the DESCRIBE command:
DESCRIBE KEYSPACE reservation;
Let's make this the keyspace for the commands that follow:
USE reservation;
Note that there are three different tables for storing reservation data, with similar columns but different primary keys.
As data modelers, our natural tendency would be to first design tables for storing reservations and other records such as guest profiles, and only then think about the queries that will access them. In Cassandra data modeling, however, we start with the queries. These tables are designed to support the queries that define how users will access reservations, which leads to a denormalized design.
First, the reservations_by_confirmation table supports looking up reservations by the unique confirmation number given to the customer at booking time:
CREATE TABLE reservations_by_confirmation (
    confirm_number text,
    hotel_id text,
    start_date date,
    end_date date,
    room_number smallint,
    guest_id uuid,
    PRIMARY KEY (confirm_number)
);
Here is an example of loading a row into this table:
INSERT INTO reservations_by_confirmation (confirm_number, hotel_id, start_date, end_date, room_number, guest_id) VALUES ('RS2G0Z', 'NY456', '2020-06-08', '2020-06-10', 111, 1b4d86f4-ccff-4256-a63d-45c905df2677);
To retrieve this row later, we would use a query like this:
SELECT * FROM reservations_by_confirmation WHERE confirm_number = 'RS2G0Z';
Second, the reservations_by_hotel_date table lets hotel staff review upcoming reservations by date to get a sense of how the hotel is doing, for example when it is fully booked and when it has vacancies:
CREATE TABLE reservations_by_hotel_date (
    hotel_id text,
    start_date date,
    end_date date,
    room_number smallint,
    confirm_number text,
    guest_id uuid,
    PRIMARY KEY ((hotel_id, start_date), room_number)
);
Here is an example of loading the same reservation we just inserted into reservations_by_confirmation into this second table:
INSERT INTO reservations_by_hotel_date (confirm_number, hotel_id, start_date, end_date, room_number, guest_id) VALUES ('RS2G0Z', 'NY456', '2020-06-08', '2020-06-10', 111, 1b4d86f4-ccff-4256-a63d-45c905df2677);
This table supports two queries. First, because the partition key consists of hotel_id and start_date, we can retrieve all reservations for a given hotel and date:
SELECT * FROM reservations_by_hotel_date WHERE hotel_id = 'NY456' AND start_date = '2020-06-08';
Second, because room_number is the clustering column, we can find the reservation for a specific room on that date:
SELECT * FROM reservations_by_hotel_date WHERE hotel_id = 'NY456' AND start_date = '2020-06-08' AND room_number = 111;
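To make the query-first, denormalized design more tangible, here is a small in-memory analogy in plain Java. It is only a sketch and does not talk to Cassandra: one map plays the role of reservations_by_confirmation, and a map of sorted maps plays the role of reservations_by_hotel_date, with the sorted inner map standing in for the room_number clustering column. The class DenormalizedReservations, its methods, and the second reservation AB1C2D are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

/** In-memory analogy of the two reservation tables; plain collections, not Cassandra. */
final class DenormalizedReservations {

    static final class Reservation {
        final String confirmNumber;
        final String hotelId;
        final String startDate; // kept as ISO text to keep the sketch short
        final short roomNumber;

        Reservation(String confirmNumber, String hotelId, String startDate, short roomNumber) {
            this.confirmNumber = confirmNumber;
            this.hotelId = hotelId;
            this.startDate = startDate;
            this.roomNumber = roomNumber;
        }
    }

    // Analogy for reservations_by_confirmation: keyed by confirm_number.
    private final Map<String, Reservation> byConfirmation = new HashMap<>();

    // Analogy for reservations_by_hotel_date: keyed by (hotel_id, start_date),
    // rows inside one "partition" are sorted by room_number.
    private final Map<String, SortedMap<Short, Reservation>> byHotelDate = new HashMap<>();

    /** Writes the same reservation into both structures, one per query. */
    void upsert(Reservation r) {
        byConfirmation.put(r.confirmNumber, r);
        byHotelDate.computeIfAbsent(r.hotelId + "|" + r.startDate, k -> new TreeMap<>())
                   .put(r.roomNumber, r);
    }

    Reservation findByConfirmation(String confirmNumber) {
        return byConfirmation.get(confirmNumber);
    }

    List<Reservation> findByHotelAndDate(String hotelId, String startDate) {
        SortedMap<Short, Reservation> partition = byHotelDate.get(hotelId + "|" + startDate);
        return partition == null
                ? Collections.<Reservation>emptyList()
                : new ArrayList<>(partition.values());
    }

    public static void main(String[] args) {
        DenormalizedReservations store = new DenormalizedReservations();
        store.upsert(new Reservation("RS2G0Z", "NY456", "2020-06-08", (short) 111));
        store.upsert(new Reservation("AB1C2D", "NY456", "2020-06-08", (short) 105));

        System.out.println(store.findByConfirmation("RS2G0Z").roomNumber);
        for (Reservation r : store.findByHotelAndDate("NY456", "2020-06-08")) {
            System.out.println(r.roomNumber + " " + r.confirmNumber); // ascending room numbers
        }
    }
}
```

The price of fast reads is that every write has to update both structures, which is the same trade-off the Reservation Service makes when it writes to both tables.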
Similarly, the reservations_by_guest table can be used to look up a reservation by guest name when a guest has forgotten their confirmation number. For the purposes of this post, we will focus on the reservations_by_confirmation and reservations_by_hotel_date tables.
Note that the ReservationRepository class uses Cassandra BATCH statements to group the changes to the reservations_by_confirmation and reservations_by_hotel_date tables so that they are executed as a single CQL operation. The batches are logged batches, so if any statement in the batch is applied, all of them eventually will be. The class lives in reservation-service/src/main/java/dev/cassandraguide/repository/ReservationRepository.java:
ReservationRepository.java
/*
* Copyright (C) 2017-2020 Jeff Carpenter
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.cassandraguide.repository;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.deleteFrom;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.selectFrom;
import static com.datastax.oss.driver.api.querybuilder.SchemaBuilder.createTable;
import static com.datastax.oss.driver.api.querybuilder.SchemaBuilder.createType;
import static com.datastax.oss.driver.api.querybuilder.relation.Relation.column;
import java.time.LocalDate;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import javax.annotation.PreDestroy;
import com.fasterxml.jackson.core.JsonProcessingException;
import dev.cassandraguide.model.Reservation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Profile;
import org.springframework.lang.NonNull;
import org.springframework.stereotype.Repository;
import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BatchStatement;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.DefaultBatchType;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.metadata.schema.ClusteringOrder;
import com.datastax.oss.driver.api.core.type.DataTypes;
import com.datastax.oss.driver.api.core.type.UserDefinedType;
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
// TODO: Review imports for publishing to Kafka
import org.apache.kafka.clients.producer.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
/**
* The goal of this project is to provide a minimally functional implementation of a microservice
* that uses Apache Cassandra for its data storage. The reservation service is implemented as a
* RESTful service using Spring Boot.
*
* @author Jeff Carpenter, Cedrick Lunven
*/
@Repository
@Profile("!unit-test") // When I do some 'unit-test' no connectivity to DB
public class ReservationRepository {
/** Logger for the class. */
private static final Logger logger = LoggerFactory.getLogger(ReservationRepository.class);
// Reservation Schema Constants
public static final CqlIdentifier TYPE_ADDRESS = CqlIdentifier.fromCql("address");
public static final CqlIdentifier TABLE_RESERVATION_BY_HOTEL_DATE =
CqlIdentifier.fromCql("reservations_by_hotel_date");
public static final CqlIdentifier TABLE_RESERVATION_BY_CONFI = CqlIdentifier.fromCql("reservations_by_confirmation");
public static final CqlIdentifier TABLE_RESERVATION_BY_GUEST = CqlIdentifier.fromCql("reservations_by_guest");
public static final CqlIdentifier TABLE_GUESTS = CqlIdentifier.fromCql("guests");
public static final CqlIdentifier STREET = CqlIdentifier.fromCql("street");
public static final CqlIdentifier CITY = CqlIdentifier.fromCql("city");
public static final CqlIdentifier STATE_PROVINCE = CqlIdentifier.fromCql("state_or_province");
public static final CqlIdentifier POSTAL_CODE = CqlIdentifier.fromCql("postal_code");
public static final CqlIdentifier COUNTRY = CqlIdentifier.fromCql("country");
public static final CqlIdentifier HOTEL_ID = CqlIdentifier.fromCql("hotel_id");
public static final CqlIdentifier START_DATE = CqlIdentifier.fromCql("start_date");
public static final CqlIdentifier END_DATE = CqlIdentifier.fromCql("end_date");
public static final CqlIdentifier ROOM_NUMBER = CqlIdentifier.fromCql("room_number");
public static final CqlIdentifier CONFIRM_NUMBER = CqlIdentifier.fromCql("confirm_number");
public static final CqlIdentifier GUEST_ID = CqlIdentifier.fromCql("guest_id");
public static final CqlIdentifier GUEST_LAST_NAME = CqlIdentifier.fromCql("guest_last_name");
public static final CqlIdentifier FIRSTNAME = CqlIdentifier.fromCql("first_name");
public static final CqlIdentifier LASTNAME = CqlIdentifier.fromCql("last_name");
public static final CqlIdentifier TITLE = CqlIdentifier.fromCql("title");
public static final CqlIdentifier EMAILS = CqlIdentifier.fromCql("emails");
public static final CqlIdentifier PHONE_NUMBERS = CqlIdentifier.fromCql("phone_numbers");
public static final CqlIdentifier ADDRESSES = CqlIdentifier.fromCql("addresses");
private PreparedStatement psExistReservation;
private PreparedStatement psFindReservation;
private PreparedStatement psInsertReservationByHotelDate;
private PreparedStatement psInsertReservationByConfirmation;
private PreparedStatement psDeleteReservationByHotelDate;
private PreparedStatement psDeleteReservationByConfirmation;
private PreparedStatement psSearchReservation;
/** CqlSession holding metadata to interact with Cassandra. */
private CqlSession cqlSession;
private CqlIdentifier keyspaceName;
// TODO: Review variables used for publishing to Kafka
/** KafkaProducer for publishing messages to Kafka. */
private KafkaProducer<String, String> kafkaProducer;
private String kafkaTopicName;
private ObjectMapper objectMapper;
/** External Initialization. */
public ReservationRepository(
@NonNull CqlSession cqlSession,
@Qualifier("keyspace") @NonNull CqlIdentifier keyspaceName,
@NonNull KafkaProducer<String, String> kafkaProducer,
@NonNull String kafkaTopicName) {
this.cqlSession = cqlSession;
this.keyspaceName = keyspaceName;
// TODO: Review initialization of objects needed for publishing to Kafka
this.kafkaProducer = kafkaProducer;
this.kafkaTopicName = kafkaTopicName;
objectMapper = new ObjectMapper();
objectMapper.registerModule(new JavaTimeModule());
objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
objectMapper.enable(SerializationFeature.INDENT_OUTPUT);
// Will create tables (if they do not exist)
createReservationTables();
// Prepare Statements of reservation
prepareStatements();
logger.info("Application initialized.");
}
/**
* CqlSession is a stateful object handling TCP connections to nodes in the cluster.
* This operation properly closes sockets when the application is stopped
*/
@PreDestroy
public void cleanup() {
if (null != cqlSession) {
cqlSession.close();
logger.info("+ CqlSession has been successfully closed");
}
}
/**
* Testing existence is useful for building correct semantics in the RESTful API. To evaluate existence find the
* table where confirmation number is partition key which is reservations_by_confirmation
*
* @param confirmationNumber
* unique identifier for confirmation
* @return
* true if the reservation exists, false if it does not
*/
public boolean exists(String confirmationNumber) {
return cqlSession.execute(psExistReservation.bind(confirmationNumber))
.getAvailableWithoutFetching() > 0;
}
/**
* Similar to exists() but maps and parses results.
*
* @param confirmationNumber
* unique identifier for confirmation
* @return
* reservation if present or empty
*/
@NonNull
public Optional<Reservation> findByConfirmationNumber(@NonNull String confirmationNumber) {
ResultSet resultSet = cqlSession.execute(psFindReservation.bind(confirmationNumber));
// Hint: an empty result might not be an error as this method is sometimes used to check whether a
// reservation with this confirmation number exists
Row row = resultSet.one();
if (row == null) {
logger.debug("Unable to load reservation with confirmation number: " + confirmationNumber);
return Optional.empty();
}
// Hint: If there is a result, create a new reservation object and set the values
// Bonus: factor the logic to extract a reservation from a row into a separate method
// (you will reuse it again later in getAllReservations())
return Optional.of(mapRowToReservation(row));
}
/**
* Create new entry in multiple tables for this reservation.
*
* @param reservation
* current reservation object
* @return
* confirmation number for the reservation
*
*/
public String upsert(Reservation reservation) {
Objects.requireNonNull(reservation);
if (null == reservation.getConfirmationNumber()) {
// Generating a new reservation number if none has been provided
reservation.setConfirmationNumber(UUID.randomUUID().toString());
}
// Insert into 'reservations_by_hotel_date'
BoundStatement bsInsertReservationByHotel =
psInsertReservationByHotelDate.bind(reservation.getHotelId(), reservation.getStartDate(),
reservation.getEndDate(), reservation.getRoomNumber(), reservation.getConfirmationNumber(),
reservation.getGuestId());
// Insert into 'reservations_by_confirmation'
BoundStatement bsInsertReservationByConfirmation =
psInsertReservationByConfirmation.bind(reservation.getConfirmationNumber(), reservation.getHotelId(),
reservation.getStartDate(), reservation.getEndDate(), reservation.getRoomNumber(),
reservation.getGuestId());
BatchStatement batchInsertReservation = BatchStatement
.builder(DefaultBatchType.LOGGED)
.addStatement(bsInsertReservationByHotel)
.addStatement(bsInsertReservationByConfirmation)
.build();
cqlSession.execute(batchInsertReservation);
// TODO: Publish message to Kafka containing reservation
try {
String reservationJson = objectMapper.writeValueAsString(reservation);
// HINT: use the constructor ProducerRecord(String topic, K key, V value)
// with the reservation confirmation number as the key, and the JSON string as the value
ProducerRecord<String, String> record = null;
kafkaProducer.send(record);
} catch (Exception e) {
logger.warn("Error publishing reservation message to Kafka: {}", e);
}
return reservation.getConfirmationNumber();
}
/**
* We pick 'reservations_by_confirmation' table to list reservations
* BUT we could have used 'reservations_by_hotel_date' (as no key provided in request)
*
* @return
* list containing all reservations
*/
public List<Reservation> findAll() {
return cqlSession.execute(selectFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI).all().build())
.all() // no paging we retrieve all objects
.stream() // because we are good people
.map(this::mapRowToReservation) // Mapping row as Reservation
.collect(Collectors.toList()); // Back to list objects
}
/**
* Deleting a reservation.
*
* @param confirmationNumber
* unique identifier for confirmation.
*/
public boolean delete(String confirmationNumber) {
// Retrieving entire reservation in order to obtain the attributes we will need to delete from
// reservations_by_hotel_date table
Optional<Reservation> reservationToDelete = this.findByConfirmationNumber(confirmationNumber);
if (reservationToDelete.isPresent()) {
// Delete from 'reservations_by_hotel_date'
Reservation reservation = reservationToDelete.get();
BoundStatement bsDeleteReservationByHotelDate =
psDeleteReservationByHotelDate.bind(reservation.getHotelId(),
reservation.getStartDate(), reservation.getRoomNumber());
// Delete from 'reservations_by_confirmation'
BoundStatement bsDeleteReservationByConfirmation =
psDeleteReservationByConfirmation.bind(confirmationNumber);
BatchStatement batchDeleteReservation = BatchStatement
.builder(DefaultBatchType.LOGGED)
.addStatement(bsDeleteReservationByHotelDate)
.addStatement(bsDeleteReservationByConfirmation)
.build();
cqlSession.execute(batchDeleteReservation);
// TODO: Publish message to Kafka with empty payload to indicate deletion
// HINT: use the constructor ProducerRecord(String topic, K key, V value)
// with the reservation confirmation number as the key, and an empty string as the value
ProducerRecord<String, String> record = null;
kafkaProducer.send(record);
return true;
}
return false;
}
/**
* Search all reservation for an hotel id and LocalDate.
*
* @param hotelId
* hotel identifier
* @param date
* searched Date
* @return
* list of reservations matching the search criteria
*/
public List<Reservation> findByHotelAndDate(String hotelId, LocalDate date) {
Objects.requireNonNull(hotelId);
Objects.requireNonNull(date);
return cqlSession.execute(psSearchReservation.bind(hotelId, date))
.all() // no paging we retrieve all objects
.stream() // because we are good people
.map(this::mapRowToReservation) // Mapping row as Reservation
.collect(Collectors.toList()); // Back to list objects
}
/**
* Utility method to marshal a row as expected Reservation Bean.
*
* @param row
* current row from ResultSet
* @return
* object
*/
private Reservation mapRowToReservation(Row row) {
Reservation reservation = new Reservation();
reservation.setHotelId(row.getString(HOTEL_ID));
reservation.setConfirmationNumber(row.getString(CONFIRM_NUMBER));
reservation.setGuestId(row.getUuid(GUEST_ID));
reservation.setRoomNumber(row.getShort(ROOM_NUMBER));
reservation.setStartDate(row.getLocalDate(START_DATE));
reservation.setEndDate(row.getLocalDate(END_DATE));
return reservation;
}
/**
* Create Keyspace and relevant tables as per defined in 'reservation.cql'
*/
public void createReservationTables() {
/**
* Create TYPE 'Address' if not exists
*
* CREATE TYPE reservation.address (
* street text,
* city text,
* state_or_province text,
* postal_code text,
* country text
* );
*/
cqlSession.execute(
createType(keyspaceName, TYPE_ADDRESS)
.ifNotExists()
.withField(STREET, DataTypes.TEXT)
.withField(CITY, DataTypes.TEXT)
.withField(STATE_PROVINCE, DataTypes.TEXT)
.withField(POSTAL_CODE, DataTypes.TEXT)
.withField(COUNTRY, DataTypes.TEXT)
.build());
logger.debug("+ Type '{}' has been created (if needed)", TYPE_ADDRESS.asInternal());
/**
* CREATE TABLE reservation.reservations_by_hotel_date (
* hotel_id text,
* start_date date,
* end_date date,
* room_number smallint,
* confirm_number text,
* guest_id uuid,
* PRIMARY KEY ((hotel_id, start_date), room_number)
* );
*/
cqlSession.execute(createTable(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE)
.ifNotExists()
.withPartitionKey(HOTEL_ID, DataTypes.TEXT)
.withPartitionKey(START_DATE, DataTypes.DATE)
.withClusteringColumn(ROOM_NUMBER, DataTypes.SMALLINT)
.withColumn(END_DATE, DataTypes.DATE)
.withColumn(CONFIRM_NUMBER, DataTypes.TEXT)
.withColumn(GUEST_ID, DataTypes.UUID)
.withClusteringOrder(ROOM_NUMBER, ClusteringOrder.ASC)
.withComment("Q7. Find reservations by hotel and date")
.build());
logger.debug("+ Table '{}' has been created (if needed)", TABLE_RESERVATION_BY_HOTEL_DATE.asInternal());
/**
* CREATE TABLE reservation.reservations_by_confirmation (
* confirm_number text PRIMARY KEY,
* hotel_id text,
* start_date date,
* end_date date,
* room_number smallint,
* guest_id uuid
* );
*/
cqlSession.execute(createTable(keyspaceName, TABLE_RESERVATION_BY_CONFI)
.ifNotExists()
.withPartitionKey(CONFIRM_NUMBER, DataTypes.TEXT)
.withColumn(HOTEL_ID, DataTypes.TEXT)
.withColumn(START_DATE, DataTypes.DATE)
.withColumn(END_DATE, DataTypes.DATE)
.withColumn(ROOM_NUMBER, DataTypes.SMALLINT)
.withColumn(GUEST_ID, DataTypes.UUID)
.build());
logger.debug("+ Table '{}' has been created (if needed)", TABLE_RESERVATION_BY_CONFI.asInternal());
/**
* CREATE TABLE reservation.reservations_by_guest (
* guest_last_name text,
* hotel_id text,
* start_date date,
* end_date date,
* room_number smallint,
* confirm_number text,
* guest_id uuid,
* PRIMARY KEY ((guest_last_name), hotel_id)
* );
*/
cqlSession.execute(createTable(keyspaceName, TABLE_RESERVATION_BY_GUEST)
.ifNotExists()
.withPartitionKey(GUEST_LAST_NAME, DataTypes.TEXT)
.withClusteringColumn(HOTEL_ID, DataTypes.TEXT)
.withColumn(START_DATE, DataTypes.DATE)
.withColumn(END_DATE, DataTypes.DATE)
.withColumn(ROOM_NUMBER, DataTypes.SMALLINT)
.withColumn(CONFIRM_NUMBER, DataTypes.TEXT)
.withColumn(GUEST_ID, DataTypes.UUID)
.withComment("Q8. Find reservations by guest name")
.build());
logger.debug("+ Table '{}' has been created (if needed)", TABLE_RESERVATION_BY_GUEST.asInternal());
/**
* CREATE TABLE reservation.guests (
* guest_id uuid PRIMARY KEY,
* first_name text,
* last_name text,
* title text,
* emails set<text>,
* phone_numbers list<text>,
* addresses map<text, frozen<address>>,
* confirm_number text
* );
*/
UserDefinedType udtAddressType =
cqlSession.getMetadata().getKeyspace(keyspaceName).get() // Retrieving KeySpaceMetadata
.getUserDefinedType(TYPE_ADDRESS).get(); // Looking for UDT (extending DataType)
cqlSession.execute(createTable(keyspaceName, TABLE_GUESTS)
.ifNotExists()
.withPartitionKey(GUEST_ID, DataTypes.UUID)
.withColumn(FIRSTNAME, DataTypes.TEXT)
.withColumn(LASTNAME, DataTypes.TEXT)
.withColumn(TITLE, DataTypes.TEXT)
.withColumn(EMAILS, DataTypes.setOf(DataTypes.TEXT))
.withColumn(PHONE_NUMBERS, DataTypes.listOf(DataTypes.TEXT))
.withColumn(ADDRESSES, DataTypes.mapOf(DataTypes.TEXT, udtAddressType, true))
.withColumn(CONFIRM_NUMBER, DataTypes.TEXT)
.withComment("Q9. Find guest by ID")
.build());
logger.debug("+ Table '{}' has been created (if needed)", TABLE_GUESTS.asInternal());
logger.info("Schema has been successfully initialized.");
}
private void prepareStatements() {
if (psExistReservation == null) {
psExistReservation = cqlSession.prepare(selectFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI).column(CONFIRM_NUMBER)
.where(column(CONFIRM_NUMBER).isEqualTo(bindMarker(CONFIRM_NUMBER)))
.build());
psFindReservation = cqlSession.prepare(
selectFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI).all()
.where(column(CONFIRM_NUMBER).isEqualTo(bindMarker(CONFIRM_NUMBER)))
.build());
psSearchReservation = cqlSession.prepare(
selectFrom(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE).all()
.where(column(HOTEL_ID).isEqualTo(bindMarker(HOTEL_ID)))
.where(column(START_DATE).isEqualTo(bindMarker(START_DATE)))
.build());
psDeleteReservationByConfirmation = cqlSession.prepare(
deleteFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI)
.where(column(CONFIRM_NUMBER).isEqualTo(bindMarker(CONFIRM_NUMBER)))
.build());
psDeleteReservationByHotelDate = cqlSession.prepare(
deleteFrom(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE)
.where(column(HOTEL_ID).isEqualTo(bindMarker(HOTEL_ID)))
.where(column(START_DATE).isEqualTo(bindMarker(START_DATE)))
.where(column(ROOM_NUMBER).isEqualTo(bindMarker(ROOM_NUMBER)))
.build());
psInsertReservationByHotelDate = cqlSession.prepare(QueryBuilder.insertInto(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE)
.value(HOTEL_ID, bindMarker(HOTEL_ID))
.value(START_DATE, bindMarker(START_DATE))
.value(END_DATE, bindMarker(END_DATE))
.value(ROOM_NUMBER, bindMarker(ROOM_NUMBER))
.value(CONFIRM_NUMBER, bindMarker(CONFIRM_NUMBER))
.value(GUEST_ID, bindMarker(GUEST_ID))
.build());
psInsertReservationByConfirmation = cqlSession.prepare(QueryBuilder.insertInto(keyspaceName, TABLE_RESERVATION_BY_CONFI)
.value(CONFIRM_NUMBER, bindMarker(CONFIRM_NUMBER))
.value(HOTEL_ID, bindMarker(HOTEL_ID))
.value(START_DATE, bindMarker(START_DATE))
.value(END_DATE, bindMarker(END_DATE))
.value(ROOM_NUMBER, bindMarker(ROOM_NUMBER))
.value(GUEST_ID, bindMarker(GUEST_ID))
.build());
logger.info("Statements have been successfully prepared.");
}
}
}
Now that we are done with cqlsh, let's exit:
exit
Installing and starting Apache Kafka
Now let's focus on extending the Reservation Service so that it publishes events to Apache Kafka.
First, download and unpack Kafka:
wget http://archive.apache.org/dist/kafka/2.5.0/kafka_2.12-2.5.0.tgz
tar xzf kafka_2.12-2.5.0.tgz
You now have a directory on your file system named kafka_x.x-x.x.x (based on the Scala and Kafka versions, respectively), in this case kafka_2.12-2.5.0.
To run a single-node Kafka cluster, you need to start two processes: ZooKeeper and a Kafka broker. This version of Kafka uses ZooKeeper to manage the cluster. ZooKeeper works alongside each broker and makes sure the broker can communicate with the cluster, so it must be running before the broker can receive or deliver messages. Start ZooKeeper with:
kafka_2.12-2.5.0/bin/zookeeper-server-start.sh kafka_2.12-2.5.0/config/zookeeper.properties &> zookeeper_start.log &
Next, start the Kafka broker:
kafka_2.12-2.5.0/bin/kafka-server-start.sh kafka_2.12-2.5.0/config/server.properties &> kafka_start.log &
Creating a Kafka topic and publishing messages
Kafka supports a publish-subscribe messaging style in which messages are published to topics as key-value pairs. Like Cassandra, Kafka partitions data by key and replicates it across multiple nodes, which Kafka calls brokers.
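The idea of partitioning by key can be sketched in a few lines of plain Java. The point is only that the same key always lands on the same partition, which is what keeps all events for one reservation in order. This is a deliberately simplified illustration: Kafka's default partitioner hashes the serialized key with murmur2, whereas the sketch uses Arrays.hashCode as a stand-in, and the class KeyPartitioningSketch, the partition count of 3, and the key AB1C2D are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Simplified key-based partitioning; the hash is a stand-in, not Kafka's murmur2. */
final class KeyPartitioningSketch {

    /** Maps a message key to a partition index in the range [0, numPartitions). */
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = Arrays.hashCode(keyBytes);       // stand-in hash function
        return (hash & 0x7fffffff) % numPartitions; // clear the sign bit, then take the modulus
    }

    public static void main(String[] args) {
        int partitions = 3; // hypothetical; the topic in this walkthrough has a single partition
        String[] keys = {"RS2G0Z", "AB1C2D", "RS2G0Z"};
        for (String key : keys) {
            // The two "RS2G0Z" lines always print the same partition number.
            System.out.println(key + " -> partition " + partitionFor(key, partitions));
        }
    }
}
```

With a single partition, as in the topic created in this walkthrough, every key maps to partition 0, so the choice of key only starts to matter once the topic is scaled out.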
Now let's create a Kafka topic for the reservation data. Setting retention.ms=-1 disables time-based deletion, so messages in the topic are kept indefinitely:
kafka_2.12-2.5.0/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic reservation --config retention.ms=-1
Next, we will update the Reservation Service so that it publishes to the Kafka reservation topic every time a reservation is changed or deleted. We will introduce the API elements a little at a time, first trying them out in jshell, the Java REPL, before updating the actual code in the Reservation Service:
cd reservation-service
mvn -q compile com.github.johnpoth:jshell-maven-plugin:1.3:run
Let's create a KafkaProducer that we can use to publish messages to the Kafka topic:
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "ReservationService");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
Next, let's create a Reservation object, using the class the Reservation Service uses to represent the reservations it processes:
import java.time.LocalDate;
import java.util.UUID;
import dev.cassandraguide.model.Reservation;
String confirmationNumber = "RS2G0Z";
Reservation reservation = new Reservation();
reservation.setConfirmationNumber(confirmationNumber);
reservation.setStartDate(LocalDate.now());
reservation.setEndDate(LocalDate.now().plusDays(2));
reservation.setHotelId("NY456");
reservation.setGuestId(UUID.fromString("1b4d86f4-ccff-4256-a63d-45c905df2677"));
reservation.setRoomNumber((short)111);
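Now serialize this object to a JSON string using the Jackson ObjectMapper, configured the same way as in the ReservationRepository class shown later:
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.registerModule(new JavaTimeModule());
objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
objectMapper.enable(SerializationFeature.INDENT_OUTPUT);
String reservationJson = objectMapper.writeValueAsString(reservation);
The options chosen on objectMapper simplify date formatting and pretty-print the JSON. To see the resulting string, we run:
System.out.println(reservationJson);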
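The date option matters to whoever reads the message. With the JavaTimeModule registered and WRITE_DATES_AS_TIMESTAMPS disabled, Jackson writes LocalDate values as ISO-8601 strings such as 2020-06-01 rather than as numeric arrays. The sketch below uses only the JDK to show that string form and how a consuming service might turn it back into a LocalDate. The class and method names are hypothetical and are not part of the Reservation Service.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

// Hypothetical helper for illustration only; not part of the Reservation Service.
final class IsoDateSketch {

    // ISO_LOCAL_DATE is the yyyy-MM-dd form, for example 2020-06-01.
    static String toIso(LocalDate date) {
        return date.format(DateTimeFormatter.ISO_LOCAL_DATE);
    }

    // Parses a yyyy-MM-dd string back into a LocalDate.
    static LocalDate fromIso(String text) {
        return LocalDate.parse(text, DateTimeFormatter.ISO_LOCAL_DATE);
    }

    // Number of nights between two ISO date strings (end date exclusive).
    static long nights(String startIso, String endIso) {
        return ChronoUnit.DAYS.between(fromIso(startIso), fromIso(endIso));
    }

    public static void main(String[] args) {
        LocalDate start = LocalDate.of(2020, 6, 1);
        String startIso = toIso(start);
        String endIso = toIso(start.plusDays(2));
        System.out.println(startIso + " -> " + endIso + " (" + nights(startIso, endIso) + " nights)");
    }
}
```

Because the string form sorts and parses predictably, a consumer does not need to know anything about Jackson's array encoding to read the reservation dates.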
Now we can publish a message to the Kafka reservation topic, using confirmationNumber as the key and reservationJson as the payload:
ProducerRecord<String, String> record = new ProducerRecord<>("reservation", confirmationNumber, reservationJson);
producer.send(record);
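Using the confirmation number as the key is what keeps all events for one reservation together: Kafka's default partitioner sends records with the same key to the same partition, and ordering is preserved within a partition. The following is only a rough sketch of that idea. It substitutes String.hashCode() for the murmur2 hash that Kafka applies to the serialized key, so the partition numbers will not match a real cluster, and the class name is made up for illustration. With the single-partition topic created earlier, every key maps to partition 0 anyway.

```java
// Hypothetical illustration only. Kafka's real default partitioner hashes the
// serialized key bytes with murmur2, not String.hashCode().
final class KeyPartitionSketch {

    // Maps a record key to a partition index in [0, numPartitions).
    static int partitionFor(String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // floorMod keeps the result non-negative even when the hash is negative.
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        String confirmationNumber = "RS2G0Z";
        // With one partition, every key lands on partition 0.
        System.out.println(partitionFor(confirmationNumber, 1));
        // The same key always maps to the same partition for a fixed partition count.
        System.out.println(partitionFor(confirmationNumber, 6) == partitionFor(confirmationNumber, 6));
    }
}
```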
Let's exit jshell:
/exit
Let's use the simple command-line consumer to read the message we just published. From the directory that contains the Kafka distribution, run:
kafka_2.12-2.5.0/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic reservation --max-messages 10
You can exit the consumer with Ctrl+C.
Updating the Reservation Service to Publish to Kafka
Now that we have some experience publishing to Kafka, let's update the Reservation Service to publish events to Kafka whenever a reservation is changed or deleted. Take a look at the ReservationRepository class:
ReservationRepository.java
/*
* Copyright (C) 2017-2020 Jeff Carpenter
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.cassandraguide.repository;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.deleteFrom;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.selectFrom;
import static com.datastax.oss.driver.api.querybuilder.SchemaBuilder.createTable;
import static com.datastax.oss.driver.api.querybuilder.SchemaBuilder.createType;
import static com.datastax.oss.driver.api.querybuilder.relation.Relation.column;
import java.time.LocalDate;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import javax.annotation.PreDestroy;
import com.fasterxml.jackson.core.JsonProcessingException;
import dev.cassandraguide.model.Reservation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Profile;
import org.springframework.lang.NonNull;
import org.springframework.stereotype.Repository;
import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BatchStatement;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.DefaultBatchType;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.metadata.schema.ClusteringOrder;
import com.datastax.oss.driver.api.core.type.DataTypes;
import com.datastax.oss.driver.api.core.type.UserDefinedType;
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
// TODO: Review imports for publishing to Kafka
import org.apache.kafka.clients.producer.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
/**
* The goal of this project is to provide a minimally functional implementation of a microservice
* that uses Apache Cassandra for its data storage. The reservation service is implemented as a
* RESTful service using Spring Boot.
*
* @author Jeff Carpenter, Cedrick Lunven
*/
@Repository
@Profile("!unit-test") // When I do some 'unit-test' no connectivity to DB
public class ReservationRepository {
/** Logger for the class. */
private static final Logger logger = LoggerFactory.getLogger(ReservationRepository.class);
// Reservation Schema Constants
public static final CqlIdentifier TYPE_ADDRESS = CqlIdentifier.fromCql("address");
public static final CqlIdentifier TABLE_RESERVATION_BY_HOTEL_DATE =
CqlIdentifier.fromCql("reservations_by_hotel_date");
public static final CqlIdentifier TABLE_RESERVATION_BY_CONFI = CqlIdentifier.fromCql("reservations_by_confirmation");
public static final CqlIdentifier TABLE_RESERVATION_BY_GUEST = CqlIdentifier.fromCql("reservations_by_guest");
public static final CqlIdentifier TABLE_GUESTS = CqlIdentifier.fromCql("guests");
public static final CqlIdentifier STREET = CqlIdentifier.fromCql("street");
public static final CqlIdentifier CITY = CqlIdentifier.fromCql("city");
public static final CqlIdentifier STATE_PROVINCE = CqlIdentifier.fromCql("state_or_province");
public static final CqlIdentifier POSTAL_CODE = CqlIdentifier.fromCql("postal_code");
public static final CqlIdentifier COUNTRY = CqlIdentifier.fromCql("country");
public static final CqlIdentifier HOTEL_ID = CqlIdentifier.fromCql("hotel_id");
public static final CqlIdentifier START_DATE = CqlIdentifier.fromCql("start_date");
public static final CqlIdentifier END_DATE = CqlIdentifier.fromCql("end_date");
public static final CqlIdentifier ROOM_NUMBER = CqlIdentifier.fromCql("room_number");
public static final CqlIdentifier CONFIRM_NUMBER = CqlIdentifier.fromCql("confirm_number");
public static final CqlIdentifier GUEST_ID = CqlIdentifier.fromCql("guest_id");
public static final CqlIdentifier GUEST_LAST_NAME = CqlIdentifier.fromCql("guest_last_name");
public static final CqlIdentifier FIRSTNAME = CqlIdentifier.fromCql("first_name");
public static final CqlIdentifier LASTNAME = CqlIdentifier.fromCql("last_name");
public static final CqlIdentifier TITLE = CqlIdentifier.fromCql("title");
public static final CqlIdentifier EMAILS = CqlIdentifier.fromCql("emails");
public static final CqlIdentifier PHONE_NUMBERS = CqlIdentifier.fromCql("phone_numbers");
public static final CqlIdentifier ADDRESSES = CqlIdentifier.fromCql("addresses");
private PreparedStatement psExistReservation;
private PreparedStatement psFindReservation;
private PreparedStatement psInsertReservationByHotelDate;
private PreparedStatement psInsertReservationByConfirmation;
private PreparedStatement psDeleteReservationByHotelDate;
private PreparedStatement psDeleteReservationByConfirmation;
private PreparedStatement psSearchReservation;
/** CqlSession holding metadata to interact with Cassandra. */
private CqlSession cqlSession;
private CqlIdentifier keyspaceName;
// TODO: Review variables used for publishing to Kafka
/** KafkaProducer for publishing messages to Kafka. */
private KafkaProducer<String, String> kafkaProducer;
private String kafkaTopicName;
private ObjectMapper objectMapper;
/** External Initialization. */
public ReservationRepository(
@NonNull CqlSession cqlSession,
@Qualifier("keyspace") @NonNull CqlIdentifier keyspaceName,
@NonNull KafkaProducer<String, String> kafkaProducer,
@NonNull String kafkaTopicName) {
this.cqlSession = cqlSession;
this.keyspaceName = keyspaceName;
// TODO: Review initialization of objects needed for publishing to Kafka
this.kafkaProducer = kafkaProducer;
this.kafkaTopicName = kafkaTopicName;
objectMapper = new ObjectMapper();
objectMapper.registerModule(new JavaTimeModule());
objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
objectMapper.enable(SerializationFeature.INDENT_OUTPUT);
// Will create tables (if they do not exist)
createReservationTables();
// Prepare Statements of reservation
prepareStatements();
logger.info("Application initialized.");
}
/**
* CqlSession is a stateful object handling TCP connections to nodes in the cluster.
* This operation properly closes sockets when the application is stopped
*/
@PreDestroy
public void cleanup() {
if (null != cqlSession) {
cqlSession.close();
logger.info("+ CqlSession has been successfully closed");
}
}
/**
* Testing existence is useful for building correct semantics in the RESTful API. To evaluate existence find the
* table where confirmation number is partition key which is reservations_by_confirmation
*
* @param confirmationNumber
* unique identifier for confirmation
* @return
* true if the reservation exists, false if it does not
*/
public boolean exists(String confirmationNumber) {
return cqlSession.execute(psExistReservation.bind(confirmationNumber))
.getAvailableWithoutFetching() > 0;
}
/**
* Similar to exists() but maps and parses results.
*
* @param confirmationNumber
* unique identifier for confirmation
* @return
* reservation if present or empty
*/
@NonNull
public Optional<Reservation> findByConfirmationNumber(@NonNull String confirmationNumber) {
ResultSet resultSet = cqlSession.execute(psFindReservation.bind(confirmationNumber));
// Hint: an empty result might not be an error as this method is sometimes used to check whether a
// reservation with this confirmation number exists
Row row = resultSet.one();
if (row == null) {
logger.debug("Unable to load reservation with confirmation number: " + confirmationNumber);
return Optional.empty();
}
// Hint: If there is a result, create a new reservation object and set the values
// Bonus: factor the logic to extract a reservation from a row into a separate method
// (you will reuse it again later in getAllReservations())
return Optional.of(mapRowToReservation(row));
}
/**
* Create new entry in multiple tables for this reservation.
*
* @param reservation
* current reservation object
* @return
* confirmation number for the reservation
*
*/
public String upsert(Reservation reservation) {
Objects.requireNonNull(reservation);
if (null == reservation.getConfirmationNumber()) {
// Generating a new reservation number if none has been provided
reservation.setConfirmationNumber(UUID.randomUUID().toString());
}
// Insert into 'reservations_by_hotel_date'
BoundStatement bsInsertReservationByHotel =
psInsertReservationByHotelDate.bind(reservation.getHotelId(), reservation.getStartDate(),
reservation.getEndDate(), reservation.getRoomNumber(), reservation.getConfirmationNumber(),
reservation.getGuestId());
// Insert into 'reservations_by_confirmation'
BoundStatement bsInsertReservationByConfirmation =
psInsertReservationByConfirmation.bind(reservation.getConfirmationNumber(), reservation.getHotelId(),
reservation.getStartDate(), reservation.getEndDate(), reservation.getRoomNumber(),
reservation.getGuestId());
BatchStatement batchInsertReservation = BatchStatement
.builder(DefaultBatchType.LOGGED)
.addStatement(bsInsertReservationByHotel)
.addStatement(bsInsertReservationByConfirmation)
.build();
cqlSession.execute(batchInsertReservation);
// TODO: Publish message to Kafka containing reservation
try {
String reservationJson = objectMapper.writeValueAsString(reservation);
// HINT: use the constructor ProducerRecord(String topic, K key, V value)
// with the reservation confirmation number as the key, and the JSON string as the value
ProducerRecord<String, String> record = null;
kafkaProducer.send(record);
} catch (Exception e) {
// Pass the exception as the last argument without a placeholder so SLF4J logs the stack trace
logger.warn("Error publishing reservation message to Kafka", e);
}
return reservation.getConfirmationNumber();
}
/**
* We pick 'reservations_by_confirmation' table to list reservations
* BUT we could have used 'reservations_by_hotel_date' (as no key provided in request)
*
* @return
* list containing all reservations
*/
public List<Reservation> findAll() {
return cqlSession.execute(selectFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI).all().build())
.all() // no paging we retrieve all objects
.stream() // because we are good people
.map(this::mapRowToReservation) // Mapping row as Reservation
.collect(Collectors.toList()); // Back to list objects
}
/**
* Deleting a reservation.
*
* @param confirmationNumber
* unique identifier for confirmation.
*/
public boolean delete(String confirmationNumber) {
// Retrieving entire reservation in order to obtain the attributes we will need to delete from
// reservations_by_hotel_date table
Optional<Reservation> reservationToDelete = this.findByConfirmationNumber(confirmationNumber);
if (reservationToDelete.isPresent()) {
// Delete from 'reservations_by_hotel_date'
Reservation reservation = reservationToDelete.get();
BoundStatement bsDeleteReservationByHotelDate =
psDeleteReservationByHotelDate.bind(reservation.getHotelId(),
reservation.getStartDate(), reservation.getRoomNumber());
// Delete from 'reservations_by_confirmation'
BoundStatement bsDeleteReservationByConfirmation =
psDeleteReservationByConfirmation.bind(confirmationNumber);
BatchStatement batchDeleteReservation = BatchStatement
.builder(DefaultBatchType.LOGGED)
.addStatement(bsDeleteReservationByHotelDate)
.addStatement(bsDeleteReservationByConfirmation)
.build();
cqlSession.execute(batchDeleteReservation);
// TODO: Publish message to Kafka with empty payload to indicate deletion
// HINT: use the constructor ProducerRecord(String topic, K key, V value)
// with the reservation confirmation number as the key, and an empty string as the value
ProducerRecord<String, String> record = null;
kafkaProducer.send(record);
return true;
}
return false;
}
/**
* Search all reservation for an hotel id and LocalDate.
*
* @param hotelId
* hotel identifier
* @param date
* searched Date
* @return
* list of reservations matching the search criteria
*/
public List<Reservation> findByHotelAndDate(String hotelId, LocalDate date) {
Objects.requireNonNull(hotelId);
Objects.requireNonNull(date);
return cqlSession.execute(psSearchReservation.bind(hotelId, date))
.all() // no paging we retrieve all objects
.stream() // because we are good people
.map(this::mapRowToReservation) // Mapping row as Reservation
.collect(Collectors.toList()); // Back to list objects
}
/**
* Utility method to marshal a row as expected Reservation Bean.
*
* @param row
* current row from ResultSet
* @return
* object
*/
private Reservation mapRowToReservation(Row row) {
Reservation reservation = new Reservation();
reservation.setHotelId(row.getString(HOTEL_ID));
reservation.setConfirmationNumber(row.getString(CONFIRM_NUMBER));
reservation.setGuestId(row.getUuid(GUEST_ID));
reservation.setRoomNumber(row.getShort(ROOM_NUMBER));
reservation.setStartDate(row.getLocalDate(START_DATE));
reservation.setEndDate(row.getLocalDate(END_DATE));
return reservation;
}
/**
* Create Keyspace and relevant tables as per defined in 'reservation.cql'
*/
public void createReservationTables() {
/**
* Create TYPE 'Address' if not exists
*
* CREATE TYPE reservation.address (
* street text,
* city text,
* state_or_province text,
* postal_code text,
* country text
* );
*/
cqlSession.execute(
createType(keyspaceName, TYPE_ADDRESS)
.ifNotExists()
.withField(STREET, DataTypes.TEXT)
.withField(CITY, DataTypes.TEXT)
.withField(STATE_PROVINCE, DataTypes.TEXT)
.withField(POSTAL_CODE, DataTypes.TEXT)
.withField(COUNTRY, DataTypes.TEXT)
.build());
logger.debug("+ Type '{}' has been created (if needed)", TYPE_ADDRESS.asInternal());
/**
* CREATE TABLE reservation.reservations_by_hotel_date (
* hotel_id text,
* start_date date,
* end_date date,
* room_number smallint,
* confirm_number text,
* guest_id uuid,
* PRIMARY KEY ((hotel_id, start_date), room_number)
* );
*/
cqlSession.execute(createTable(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE)
.ifNotExists()
.withPartitionKey(HOTEL_ID, DataTypes.TEXT)
.withPartitionKey(START_DATE, DataTypes.DATE)
.withClusteringColumn(ROOM_NUMBER, DataTypes.SMALLINT)
.withColumn(END_DATE, DataTypes.DATE)
.withColumn(CONFIRM_NUMBER, DataTypes.TEXT)
.withColumn(GUEST_ID, DataTypes.UUID)
.withClusteringOrder(ROOM_NUMBER, ClusteringOrder.ASC)
.withComment("Q7. Find reservations by hotel and date")
.build());
logger.debug("+ Table '{}' has been created (if needed)", TABLE_RESERVATION_BY_HOTEL_DATE.asInternal());
/**
* CREATE TABLE reservation.reservations_by_confirmation (
* confirm_number text PRIMARY KEY,
* hotel_id text,
* start_date date,
* end_date date,
* room_number smallint,
* guest_id uuid
* );
*/
cqlSession.execute(createTable(keyspaceName, TABLE_RESERVATION_BY_CONFI)
.ifNotExists()
.withPartitionKey(CONFIRM_NUMBER, DataTypes.TEXT)
.withColumn(HOTEL_ID, DataTypes.TEXT)
.withColumn(START_DATE, DataTypes.DATE)
.withColumn(END_DATE, DataTypes.DATE)
.withColumn(ROOM_NUMBER, DataTypes.SMALLINT)
.withColumn(GUEST_ID, DataTypes.UUID)
.build());
logger.debug("+ Table '{}' has been created (if needed)", TABLE_RESERVATION_BY_CONFI.asInternal());
/**
* CREATE TABLE reservation.reservations_by_guest (
* guest_last_name text,
* hotel_id text,
* start_date date,
* end_date date,
* room_number smallint,
* confirm_number text,
* guest_id uuid,
* PRIMARY KEY ((guest_last_name), hotel_id)
* );
*/
cqlSession.execute(createTable(keyspaceName, TABLE_RESERVATION_BY_GUEST)
.ifNotExists()
.withPartitionKey(GUEST_LAST_NAME, DataTypes.TEXT)
.withClusteringColumn(HOTEL_ID, DataTypes.TEXT)
.withColumn(START_DATE, DataTypes.DATE)
.withColumn(END_DATE, DataTypes.DATE)
.withColumn(ROOM_NUMBER, DataTypes.SMALLINT)
.withColumn(CONFIRM_NUMBER, DataTypes.TEXT)
.withColumn(GUEST_ID, DataTypes.UUID)
.withComment("Q8. Find reservations by guest name")
.build());
logger.debug("+ Table '{}' has been created (if needed)", TABLE_RESERVATION_BY_GUEST.asInternal());
/**
* CREATE TABLE reservation.guests (
* guest_id uuid PRIMARY KEY,
* first_name text,
* last_name text,
* title text,
* emails set<text>,
* phone_numbers list<text>,
* addresses map<text, frozen<address>>,
* confirm_number text
* );
*/
UserDefinedType udtAddressType =
cqlSession.getMetadata().getKeyspace(keyspaceName).get() // Retrieving KeySpaceMetadata
.getUserDefinedType(TYPE_ADDRESS).get(); // Looking for UDT (extending DataType)
cqlSession.execute(createTable(keyspaceName, TABLE_GUESTS)
.ifNotExists()
.withPartitionKey(GUEST_ID, DataTypes.UUID)
.withColumn(FIRSTNAME, DataTypes.TEXT)
.withColumn(LASTNAME, DataTypes.TEXT)
.withColumn(TITLE, DataTypes.TEXT)
.withColumn(EMAILS, DataTypes.setOf(DataTypes.TEXT))
.withColumn(PHONE_NUMBERS, DataTypes.listOf(DataTypes.TEXT))
.withColumn(ADDRESSES, DataTypes.mapOf(DataTypes.TEXT, udtAddressType, true))
.withColumn(CONFIRM_NUMBER, DataTypes.TEXT)
.withComment("Q9. Find guest by ID")
.build());
logger.debug("+ Table '{}' has been created (if needed)", TABLE_GUESTS.asInternal());
logger.info("Schema has been successfully initialized.");
}
private void prepareStatements() {
if (psExistReservation == null) {
psExistReservation = cqlSession.prepare(selectFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI).column(CONFIRM_NUMBER)
.where(column(CONFIRM_NUMBER).isEqualTo(bindMarker(CONFIRM_NUMBER)))
.build());
psFindReservation = cqlSession.prepare(
selectFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI).all()
.where(column(CONFIRM_NUMBER).isEqualTo(bindMarker(CONFIRM_NUMBER)))
.build());
psSearchReservation = cqlSession.prepare(
selectFrom(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE).all()
.where(column(HOTEL_ID).isEqualTo(bindMarker(HOTEL_ID)))
.where(column(START_DATE).isEqualTo(bindMarker(START_DATE)))
.build());
psDeleteReservationByConfirmation = cqlSession.prepare(
deleteFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI)
.where(column(CONFIRM_NUMBER).isEqualTo(bindMarker(CONFIRM_NUMBER)))
.build());
psDeleteReservationByHotelDate = cqlSession.prepare(
deleteFrom(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE)
.where(column(HOTEL_ID).isEqualTo(bindMarker(HOTEL_ID)))
.where(column(START_DATE).isEqualTo(bindMarker(START_DATE)))
.where(column(ROOM_NUMBER).isEqualTo(bindMarker(ROOM_NUMBER)))
.build());
psInsertReservationByHotelDate = cqlSession.prepare(QueryBuilder.insertInto(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE)
.value(HOTEL_ID, bindMarker(HOTEL_ID))
.value(START_DATE, bindMarker(START_DATE))
.value(END_DATE, bindMarker(END_DATE))
.value(ROOM_NUMBER, bindMarker(ROOM_NUMBER))
.value(CONFIRM_NUMBER, bindMarker(CONFIRM_NUMBER))
.value(GUEST_ID, bindMarker(GUEST_ID))
.build());
psInsertReservationByConfirmation = cqlSession.prepare(QueryBuilder.insertInto(keyspaceName, TABLE_RESERVATION_BY_CONFI)
.value(CONFIRM_NUMBER, bindMarker(CONFIRM_NUMBER))
.value(HOTEL_ID, bindMarker(HOTEL_ID))
.value(START_DATE, bindMarker(START_DATE))
.value(END_DATE, bindMarker(END_DATE))
.value(ROOM_NUMBER, bindMarker(ROOM_NUMBER))
.value(GUEST_ID, bindMarker(GUEST_ID))
.build());
logger.info("Statements have been successfully prepared.");
}
}
}
The TODO items in this class cover several things we should review:
TODO: Review imports for publishing to Kafka. Note that the classes you used earlier in jshell are imported.
TODO: Review variables used for publishing to Kafka. The repository holds an ObjectMapper and a KafkaProducer.
TODO: Review initialization of objects needed for publishing to Kafka. The constructor initializes these variables.
Now for the fun part, implementing the code that publishes messages:
TODO: Publish message to Kafka containing reservation. In the upsert() method, publish a message containing the reservation as a JSON string.
TODO: Publish message to Kafka with empty payload to indicate deletion. In the delete() method, publish a message indicating that the reservation was deleted.
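On the consuming side, the two message shapes produced by upsert() and delete() can be told apart by the payload alone. The sketch below is a hypothetical in-memory view, written against plain strings rather than the Kafka consumer API, that applies the convention described above: a non-empty value replaces the stored JSON for that confirmation number, and an empty value removes it. The class name, method names, and sample payload are assumptions for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical consumer-side view; not part of the Reservation Service.
final class ReservationEventView {

    // Latest JSON payload seen for each confirmation number.
    private final Map<String, String> latestByConfirmation = new LinkedHashMap<>();

    // Applies one message from the reservation topic (key = confirmation number).
    void apply(String confirmationNumber, String payload) {
        if (payload.isEmpty()) {
            // Empty payload signals that the reservation was deleted.
            latestByConfirmation.remove(confirmationNumber);
        } else {
            // Any other payload is the current state of the reservation.
            latestByConfirmation.put(confirmationNumber, payload);
        }
    }

    Optional<String> find(String confirmationNumber) {
        return Optional.ofNullable(latestByConfirmation.get(confirmationNumber));
    }

    int size() {
        return latestByConfirmation.size();
    }

    public static void main(String[] args) {
        ReservationEventView view = new ReservationEventView();
        // Illustrative payload; the real JSON is whatever ObjectMapper produces.
        view.apply("RS2G0Z", "{\"hotelId\":\"NY456\"}");
        System.out.println(view.find("RS2G0Z").isPresent());
        view.apply("RS2G0Z", "");
        System.out.println(view.find("RS2G0Z").isPresent());
    }
}
```

A service such as the inventory service mentioned earlier could keep a view like this up to date by applying messages in the order they arrive for each key.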
Let's fix up our code:
ReservationRepositorySolution.java
/*
* Copyright (C) 2017-2020 Jeff Carpenter
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.cassandraguide.repository;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.deleteFrom;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.selectFrom;
import static com.datastax.oss.driver.api.querybuilder.SchemaBuilder.createTable;
import static com.datastax.oss.driver.api.querybuilder.SchemaBuilder.createType;
import static com.datastax.oss.driver.api.querybuilder.relation.Relation.column;
import java.time.LocalDate;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import javax.annotation.PreDestroy;
import com.fasterxml.jackson.core.JsonProcessingException;
import dev.cassandraguide.model.Reservation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Profile;
import org.springframework.lang.NonNull;
import org.springframework.stereotype.Repository;
import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BatchStatement;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.DefaultBatchType;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.metadata.schema.ClusteringOrder;
import com.datastax.oss.driver.api.core.type.DataTypes;
import com.datastax.oss.driver.api.core.type.UserDefinedType;
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
// TODO: Review imports for publishing to Kafka
import org.apache.kafka.clients.producer.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
/**
* The goal of this project is to provide a minimally functional implementation of a microservice
* that uses Apache Cassandra for its data storage. The reservation service is implemented as a
* RESTful service using Spring Boot.
*
* @author Jeff Carpenter, Cedrick Lunven
*/
@Repository
@Profile("!unit-test") // When I do some 'unit-test' no connectivity to DB
public class ReservationRepository {
/** Logger for the class. */
private static final Logger logger = LoggerFactory.getLogger(ReservationRepository.class);
// Reservation Schema Constants
public static final CqlIdentifier TYPE_ADDRESS = CqlIdentifier.fromCql("address");
public static final CqlIdentifier TABLE_RESERVATION_BY_HOTEL_DATE =
CqlIdentifier.fromCql("reservations_by_hotel_date");
public static final CqlIdentifier TABLE_RESERVATION_BY_CONFI = CqlIdentifier.fromCql("reservations_by_confirmation");
public static final CqlIdentifier TABLE_RESERVATION_BY_GUEST = CqlIdentifier.fromCql("reservations_by_guest");
public static final CqlIdentifier TABLE_GUESTS = CqlIdentifier.fromCql("guests");
public static final CqlIdentifier STREET = CqlIdentifier.fromCql("street");
public static final CqlIdentifier CITY = CqlIdentifier.fromCql("city");
public static final CqlIdentifier STATE_PROVINCE = CqlIdentifier.fromCql("state_or_province");
public static final CqlIdentifier POSTAL_CODE = CqlIdentifier.fromCql("postal_code");
public static final CqlIdentifier COUNTRY = CqlIdentifier.fromCql("country");
public static final CqlIdentifier HOTEL_ID = CqlIdentifier.fromCql("hotel_id");
public static final CqlIdentifier START_DATE = CqlIdentifier.fromCql("start_date");
public static final CqlIdentifier END_DATE = CqlIdentifier.fromCql("end_date");
public static final CqlIdentifier ROOM_NUMBER = CqlIdentifier.fromCql("room_number");
public static final CqlIdentifier CONFIRMATION_NUMBER = CqlIdentifier.fromCql("confirmation_number");
public static final CqlIdentifier GUEST_ID = CqlIdentifier.fromCql("guest_id");
public static final CqlIdentifier GUEST_LAST_NAME = CqlIdentifier.fromCql("guest_last_name");
public static final CqlIdentifier FIRSTNAME = CqlIdentifier.fromCql("first_name");
public static final CqlIdentifier LASTNAME = CqlIdentifier.fromCql("last_name");
public static final CqlIdentifier TITLE = CqlIdentifier.fromCql("title");
public static final CqlIdentifier EMAILS = CqlIdentifier.fromCql("emails");
public static final CqlIdentifier PHONE_NUMBERS = CqlIdentifier.fromCql("phone_numbers");
public static final CqlIdentifier ADDRESSES = CqlIdentifier.fromCql("addresses");
private PreparedStatement psExistReservation;
private PreparedStatement psFindReservation;
private PreparedStatement psInsertReservationByHotelDate;
private PreparedStatement psInsertReservationByConfirmation;
private PreparedStatement psDeleteReservationByHotelDate;
private PreparedStatement psDeleteReservationByConfirmation;
private PreparedStatement psSearchReservation;
/** CqlSession holding metadata to interact with Cassandra. */
private CqlSession cqlSession;
private CqlIdentifier keyspaceName;
// TODO: Review variables used for publishing to Kafka
/** KafkaProducer for publishing messages to Kafka. */
private KafkaProducer<String, String> kafkaProducer;
private String kafkaTopicName;
private ObjectMapper objectMapper;
/** External Initialization. */
public ReservationRepository(
@NonNull CqlSession cqlSession,
@Qualifier("keyspace") @NonNull CqlIdentifier keyspaceName,
@NonNull KafkaProducer<String, String> kafkaProducer,
@NonNull String kafkaTopicName) {
this.cqlSession = cqlSession;
this.keyspaceName = keyspaceName;
// TODO: Review initialization of objects needed for publishing to Kafka
this.kafkaProducer = kafkaProducer;
this.kafkaTopicName = kafkaTopicName;
objectMapper = new ObjectMapper();
objectMapper.registerModule(new JavaTimeModule());
objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
objectMapper.enable(SerializationFeature.INDENT_OUTPUT);
// Will create tables (if they do not exist)
createReservationTables();
// Prepare Statements of reservation
prepareStatements();
logger.info("Application initialized.");
}
/**
* CqlSession is a stateful object handling TCP connections to nodes in the cluster.
* This operation properly closes sockets when the application is stopped
*/
@PreDestroy
public void cleanup() {
if (null != cqlSession) {
cqlSession.close();
logger.info("+ CqlSession has been successfully closed");
}
}
/**
* Testing existence is useful for building correct semantics in the RESTful API. To evaluate existence find the
* table where confirmation number is partition key which is reservations_by_confirmation
*
* @param confirmationNumber
* unique identifier for confirmation
* @return
* true if the reservation exists, false if it does not
*/
public boolean exists(String confirmationNumber) {
return cqlSession.execute(psExistReservation.bind(confirmationNumber))
.getAvailableWithoutFetching() > 0;
}
/**
* Similar to exists() but maps and parses results.
*
* @param confirmationNumber
* unique identifier for confirmation
* @return
* reservation if present or empty
*/
@NonNull
public Optional<Reservation> findByConfirmationNumber(@NonNull String confirmationNumber) {
ResultSet resultSet = cqlSession.execute(psFindReservation.bind(confirmationNumber));
// Hint: an empty result might not be an error as this method is sometimes used to check whether a
// reservation with this confirmation number exists
Row row = resultSet.one();
if (row == null) {
logger.debug("Unable to load reservation with confirmation number: " + confirmationNumber);
return Optional.empty();
}
// Hint: If there is a result, create a new reservation object and set the values
// Bonus: factor the logic to extract a reservation from a row into a separate method
// (you will reuse it again later in getAllReservations())
return Optional.of(mapRowToReservation(row));
}
/**
* Create new entry in multiple tables for this reservation.
*
* @param reservation
* current reservation object
* @return
* confirmation number for the reservation
*
*/
public String upsert(Reservation reservation) {
Objects.requireNonNull(reservation);
if (null == reservation.getConfirmationNumber()) {
// Generating a new reservation number if none has been provided
reservation.setConfirmationNumber(UUID.randomUUID().toString());
}
// Insert into 'reservations_by_hotel_date'
BoundStatement bsInsertReservationByHotel =
psInsertReservationByHotelDate.bind(reservation.getHotelId(), reservation.getStartDate(),
reservation.getEndDate(), reservation.getRoomNumber(), reservation.getConfirmationNumber(),
reservation.getGuestId());
// Insert into 'reservations_by_confirmation'
BoundStatement bsInsertReservationByConfirmation =
psInsertReservationByConfirmation.bind(reservation.getConfirmationNumber(), reservation.getHotelId(),
reservation.getStartDate(), reservation.getEndDate(), reservation.getRoomNumber(),
reservation.getGuestId());
BatchStatement batchInsertReservation = BatchStatement
.builder(DefaultBatchType.LOGGED)
.addStatement(bsInsertReservationByHotel)
.addStatement(bsInsertReservationByConfirmation)
.build();
cqlSession.execute(batchInsertReservation);
// TODO: Publish message to Kafka containing reservation
try {
String reservationJson = objectMapper.writeValueAsString(reservation);
ProducerRecord<String, String> record = new ProducerRecord<>(kafkaTopicName, reservation.getConfirmationNumber(), reservationJson);
kafkaProducer.send(record);
} catch (Exception e) {
// Pass the exception as the last argument without a placeholder so SLF4J logs the stack trace
logger.warn("Error publishing reservation message to Kafka", e);
}
return reservation.getConfirmationNumber();
}
/**
* We pick 'reservations_by_confirmation' table to list reservations
* BUT we could have used 'reservations_by_hotel_date' (as no key provided in request)
*
* @return
* list containing all reservations
*/
public List<Reservation> findAll() {
return cqlSession.execute(selectFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI).all().build())
.all() // no paging we retrieve all objects
.stream() // because we are good people
.map(this::mapRowToReservation) // Mapping row as Reservation
.collect(Collectors.toList()); // Back to list objects
}
/**
* Deleting a reservation.
*
* @param confirmationNumber
* unique identifier for confirmation.
*/
public boolean delete(String confirmationNumber) {
// Retrieving entire reservation in order to obtain the attributes we will need to delete from
// reservations_by_hotel_date table
Optional<Reservation> reservationToDelete = this.findByConfirmationNumber(confirmationNumber);
if (reservationToDelete.isPresent()) {
// Delete from 'reservations_by_hotel_date'
Reservation reservation = reservationToDelete.get();
BoundStatement bsDeleteReservationByHotelDate =
psDeleteReservationByHotelDate.bind(reservation.getHotelId(),
reservation.getStartDate(), reservation.getRoomNumber());
// Delete from 'reservations_by_confirmation'
BoundStatement bsDeleteReservationByConfirmation =
psDeleteReservationByConfirmation.bind(confirmationNumber);
BatchStatement batchDeleteReservation = BatchStatement
.builder(DefaultBatchType.LOGGED)
.addStatement(bsDeleteReservationByHotelDate)
.addStatement(bsDeleteReservationByConfirmation)
.build();
cqlSession.execute(batchDeleteReservation);
// Publish a message to Kafka with an empty payload to indicate deletion
ProducerRecord<String, String> record = new ProducerRecord<>(kafkaTopicName, reservation.getConfirmationNumber(), "");
kafkaProducer.send(record);
return true;
}
return false;
}
/**
* Search all reservations for a hotel id and LocalDate.
*
* @param hotelId
* hotel identifier
* @param date
* searched Date
* @return
* list of reservations matching the search criteria
*/
public List<Reservation> findByHotelAndDate(String hotelId, LocalDate date) {
Objects.requireNonNull(hotelId);
Objects.requireNonNull(date);
return cqlSession.execute(psSearchReservation.bind(hotelId, date))
.all() // no paging we retrieve all objects
.stream() // because we are good people
.map(this::mapRowToReservation) // Mapping row as Reservation
.collect(Collectors.toList()); // Back to list objects
}
/**
* Utility method to marshal a row as expected Reservation Bean.
*
* @param row
* current row from ResultSet
* @return
* object
*/
private Reservation mapRowToReservation(Row row) {
Reservation reservation = new Reservation();
reservation.setHotelId(row.getString(HOTEL_ID));
reservation.setConfirmationNumber(row.getString(CONFIRMATION_NUMBER));
reservation.setGuestId(row.getUuid(GUEST_ID));
reservation.setRoomNumber(row.getShort(ROOM_NUMBER));
reservation.setStartDate(row.getLocalDate(START_DATE));
reservation.setEndDate(row.getLocalDate(END_DATE));
return reservation;
}
/**
* Create the relevant tables as defined in 'reservation.cql'
*/
public void createReservationTables() {
/**
* Create TYPE 'Address' if not exists
*
* CREATE TYPE reservation.address (
* street text,
* city text,
* state_or_province text,
* postal_code text,
* country text
* );
*/
cqlSession.execute(
createType(keyspaceName, TYPE_ADDRESS)
.ifNotExists()
.withField(STREET, DataTypes.TEXT)
.withField(CITY, DataTypes.TEXT)
.withField(STATE_PROVINCE, DataTypes.TEXT)
.withField(POSTAL_CODE, DataTypes.TEXT)
.withField(COUNTRY, DataTypes.TEXT)
.build());
logger.debug("+ Type '{}' has been created (if needed)", TYPE_ADDRESS.asInternal());
/**
* CREATE TABLE reservation.reservations_by_hotel_date (
* hotel_id text,
* start_date date,
* end_date date,
* room_number smallint,
* confirmation_number text,
* guest_id uuid,
* PRIMARY KEY ((hotel_id, start_date), room_number)
* ) WITH comment = 'Q7. Find reservations by hotel and date';
*/
cqlSession.execute(createTable(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE)
.ifNotExists()
.withPartitionKey(HOTEL_ID, DataTypes.TEXT)
.withPartitionKey(START_DATE, DataTypes.DATE)
.withClusteringColumn(ROOM_NUMBER, DataTypes.SMALLINT)
.withColumn(END_DATE, DataTypes.DATE)
.withColumn(CONFIRMATION_NUMBER, DataTypes.TEXT)
.withColumn(GUEST_ID, DataTypes.UUID)
.withClusteringOrder(ROOM_NUMBER, ClusteringOrder.ASC)
.withComment("Q7. Find reservations by hotel and date")
.build());
logger.debug("+ Table '{}' has been created (if needed)", TABLE_RESERVATION_BY_HOTEL_DATE.asInternal());
/**
* CREATE TABLE reservation.reservations_by_confirmation (
* confirmation_number text PRIMARY KEY,
* hotel_id text,
* start_date date,
* end_date date,
* room_number smallint,
* guest_id uuid
* );
*/
cqlSession.execute(createTable(keyspaceName, TABLE_RESERVATION_BY_CONFI)
.ifNotExists()
.withPartitionKey(CONFIRMATION_NUMBER, DataTypes.TEXT)
.withColumn(HOTEL_ID, DataTypes.TEXT)
.withColumn(START_DATE, DataTypes.DATE)
.withColumn(END_DATE, DataTypes.DATE)
.withColumn(ROOM_NUMBER, DataTypes.SMALLINT)
.withColumn(GUEST_ID, DataTypes.UUID)
.build());
logger.debug("+ Table '{}' has been created (if needed)", TABLE_RESERVATION_BY_CONFI.asInternal());
/**
* CREATE TABLE reservation.reservations_by_guest (
* guest_last_name text,
* hotel_id text,
* start_date date,
* end_date date,
* room_number smallint,
* confirmation_number text,
* guest_id uuid,
* PRIMARY KEY ((guest_last_name), hotel_id)
* ) WITH comment = 'Q8. Find reservations by guest name';
*/
cqlSession.execute(createTable(keyspaceName, TABLE_RESERVATION_BY_GUEST)
.ifNotExists()
.withPartitionKey(GUEST_LAST_NAME, DataTypes.TEXT)
.withClusteringColumn(HOTEL_ID, DataTypes.TEXT)
.withColumn(START_DATE, DataTypes.DATE)
.withColumn(END_DATE, DataTypes.DATE)
.withColumn(ROOM_NUMBER, DataTypes.SMALLINT)
.withColumn(CONFIRMATION_NUMBER, DataTypes.TEXT)
.withColumn(GUEST_ID, DataTypes.UUID)
.withComment("Q8. Find reservations by guest name")
.build());
logger.debug("+ Table '{}' has been created (if needed)", TABLE_RESERVATION_BY_GUEST.asInternal());
/**
* CREATE TABLE reservation.guests (
* guest_id uuid PRIMARY KEY,
* first_name text,
* last_name text,
* title text,
* emails set<text>,
* phone_numbers list<text>,
* addresses map<text, frozen<address>>,
* confirmation_number text
* ) WITH comment = 'Q9. Find guest by ID';
*/
UserDefinedType udtAddressType =
cqlSession.getMetadata().getKeyspace(keyspaceName).get() // Retrieving KeySpaceMetadata
.getUserDefinedType(TYPE_ADDRESS).get(); // Looking for UDT (extending DataType)
cqlSession.execute(createTable(keyspaceName, TABLE_GUESTS)
.ifNotExists()
.withPartitionKey(GUEST_ID, DataTypes.UUID)
.withColumn(FIRSTNAME, DataTypes.TEXT)
.withColumn(LASTNAME, DataTypes.TEXT)
.withColumn(TITLE, DataTypes.TEXT)
.withColumn(EMAILS, DataTypes.setOf(DataTypes.TEXT))
.withColumn(PHONE_NUMBERS, DataTypes.listOf(DataTypes.TEXT))
.withColumn(ADDRESSES, DataTypes.mapOf(DataTypes.TEXT, udtAddressType, true))
.withColumn(CONFIRMATION_NUMBER, DataTypes.TEXT)
.withComment("Q9. Find guest by ID")
.build());
logger.debug("+ Table '{}' has been created (if needed)", TABLE_GUESTS.asInternal());
logger.info("Schema has been successfully initialized.");
}
private void prepareStatements() {
if (psExistReservation == null) {
psExistReservation = cqlSession.prepare(selectFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI).column(CONFIRMATION_NUMBER)
.where(column(CONFIRMATION_NUMBER).isEqualTo(bindMarker(CONFIRMATION_NUMBER)))
.build());
psFindReservation = cqlSession.prepare(
selectFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI).all()
.where(column(CONFIRMATION_NUMBER).isEqualTo(bindMarker(CONFIRMATION_NUMBER)))
.build());
psSearchReservation = cqlSession.prepare(
selectFrom(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE).all()
.where(column(HOTEL_ID).isEqualTo(bindMarker(HOTEL_ID)))
.where(column(START_DATE).isEqualTo(bindMarker(START_DATE)))
.build());
psDeleteReservationByConfirmation = cqlSession.prepare(
deleteFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI)
.where(column(CONFIRMATION_NUMBER).isEqualTo(bindMarker(CONFIRMATION_NUMBER)))
.build());
psDeleteReservationByHotelDate = cqlSession.prepare(
deleteFrom(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE)
.where(column(HOTEL_ID).isEqualTo(bindMarker(HOTEL_ID)))
.where(column(START_DATE).isEqualTo(bindMarker(START_DATE)))
.where(column(ROOM_NUMBER).isEqualTo(bindMarker(ROOM_NUMBER)))
.build());
psInsertReservationByHotelDate = cqlSession.prepare(QueryBuilder.insertInto(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE)
.value(HOTEL_ID, bindMarker(HOTEL_ID))
.value(START_DATE, bindMarker(START_DATE))
.value(END_DATE, bindMarker(END_DATE))
.value(ROOM_NUMBER, bindMarker(ROOM_NUMBER))
.value(CONFIRMATION_NUMBER, bindMarker(CONFIRMATION_NUMBER))
.value(GUEST_ID, bindMarker(GUEST_ID))
.build());
psInsertReservationByConfirmation = cqlSession.prepare(QueryBuilder.insertInto(keyspaceName, TABLE_RESERVATION_BY_CONFI)
.value(CONFIRMATION_NUMBER, bindMarker(CONFIRMATION_NUMBER))
.value(HOTEL_ID, bindMarker(HOTEL_ID))
.value(START_DATE, bindMarker(START_DATE))
.value(END_DATE, bindMarker(END_DATE))
.value(ROOM_NUMBER, bindMarker(ROOM_NUMBER))
.value(GUEST_ID, bindMarker(GUEST_ID))
.build());
logger.info("Statements have been successfully prepared.");
}
}
}
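The upsert and delete methods above publish two kinds of messages, both keyed by confirmation number: a JSON payload for a created or updated reservation, and an empty payload for a deletion. A downstream service could interpret them roughly as follows. This is only a minimal sketch under stated assumptions: the `ReservationView` class and its `onMessage` and `find` methods are hypothetical names that do not exist in the Reservation Service, and a real consumer would receive the key and value from a Kafka consumer rather than from direct method calls.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical consumer-side view of reservations; not part of the Reservation Service. */
public class ReservationView {

    // Latest known JSON payload per confirmation number
    private final Map<String, String> latestByConfirmation = new ConcurrentHashMap<>();

    /**
     * Applies one message from the reservation topic.
     * The key is the confirmation number; the value is the reservation JSON,
     * or an empty string when the reservation was deleted.
     */
    public void onMessage(String confirmationNumber, String payload) {
        if (payload == null || payload.isEmpty()) {
            // Empty payload: the reservation was deleted upstream
            latestByConfirmation.remove(confirmationNumber);
        } else {
            // Non-empty payload: create or overwrite (upsert semantics)
            latestByConfirmation.put(confirmationNumber, payload);
        }
    }

    /** Returns the latest JSON for a confirmation number, if the reservation still exists. */
    public Optional<String> find(String confirmationNumber) {
        return Optional.ofNullable(latestByConfirmation.get(confirmationNumber));
    }

    public static void main(String[] args) {
        ReservationView view = new ReservationView();
        view.onMessage("ABC123", "{\"hotelId\":\"NYC123\"}"); // upsert
        view.onMessage("ABC123", "");                          // deletion
        System.out.println(view.find("ABC123").isPresent());   // prints "false"
    }
}
```

Because every message for a reservation carries the same key, Kafka routes them to the same partition and preserves their order, so applying them one by one as in this sketch would converge on the latest state.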
Now let's move on to the tests.
The Reservation Service also includes tests that run against a Cassandra container in Docker, which is started for the test run and then destroyed.
Before running the tests, we want to shut down the Cassandra node:
kill `cat /tmp/cassandra-pid`
Now we can run the tests with Maven:
mvn test
We have learned how to write a microservice that persists data to Apache Cassandra using the DataStax Java driver and generates events for data changes in Apache Kafka.
This article was prepared ahead of the start of the Software Architect course. I would also like to share a recording of a free lesson on the topic "The Architectural Property of Maintainability, Using k8s Services as an Example".