Иванов Максим
Младший Java программист
Рецепт по приготовлению своего «Telegram-Франкенштейна»
Всем привет, данная статья является, своего рода моей первой, но все же постараюсь максимально просто рассказать вам о том, как создать бота, прикрутив к нему все обещанные выше свистелки-тарахтелки.
Статьи будут разделены на 2 части, первая часть - создание основного бота с оправкой логов (Kafka Producer) и записью их в БД, вторая часть - обработка всех логов (Kafka Consumer).
Ингредиенты:
Создание Spring Boot проект, проще всего это сделать через встроенный конфигуратор в IntelliJ IDEA, либо используя Spring Initializr. (в качестве системы сборки будет использоваться Gradle)
PostgreSQL (для комфортной работы я использую DBeaver)
Если возникнут сложности с воссозданием туториала
Прошу пишите в коментариях возникшие проблемы, на всякий случай - вот мой git
Начинаем с нарезки:
Первостепенно нужно настроить build.grable со всеми зависимостями
build.grable
buildscript {
repositories {
mavenCentral()
}
}
plugins {
id 'org.springframework.boot' version '2.4.2'
id 'io.spring.dependency-management' version '1.0.11.RELEASE'
id 'java'
}
apply from: 'build-test.gradle'
group 'com.sercetary.bot'
sourceCompatibility = '14'
configurations {
compileOnly {
extendsFrom annotationProcessor
}
}
repositories {
mavenCentral()
}
configurations.all {
exclude module: 'slf4j-log4j12'
}
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-web:2.5.6'
implementation 'org.springframework.boot:spring-boot-starter-jdbc:2.5.6'
implementation 'org.springframework.data:spring-data-commons:2.6.0'
implementation 'org.springframework.kafka:spring-kafka:2.7.6'
implementation 'org.postgresql:postgresql:42.3.1'
implementation 'com.h2database:h2:1.4.200'
implementation group: 'org.telegram', name: 'telegrambots-abilities', version: '5.3.0'
implementation group: 'org.telegram', name: 'telegrambots', version: '5.3.0'
compile group: 'org.slf4j', name: 'slf4j-log4j12', version: '1.7.29'
compileOnly 'org.projectlombok:lombok:1.18.22'
annotationProcessor 'org.projectlombok:lombok:1.18.22'
}
Далее сразу для работы Kafka опишем application.yml, в котором находятся настройки нашего kafka producer
application.yml
server:
port: 9000
spring:
kafka:
producer:
bootstrap-servers: localhost:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
Теперь настройки application.properties
application.properties
# HTTP port for incoming requests
server.port=8081
app.http.bot=change-me
telegram-bot.name=change-me
telegram-bot.token=change-me
# Bot db
app.db.bot-db.url=jdbc:postgresql://localhost:5432/change-me
app.db.bot-db.driver=org.postgresql.Driver
app.db.bot-db.user=change-me
app.db.bot-db.password=change-me
app.db.bot-db.pool-size=10
# logging
logging.level.root=INFO
logging.level.org.springframework.web=DEBUG
logging.level.ru.centerinform.webhook=TRACE
logging.file.name=change-me
Хорошо, после настроек нашего проекта, давайте обговорим его структуру:
Пакеты:
config - описание бинов и конфигурации проекта
controller - обрабатывает запрос пользователя
dto - хранит данные, а так же описывает модель таблицы БД
exceptions - кастомный пакет обработчика ошибок
repository - логика работа с БД
service - основная бизнес логика проекта
Сейчас мы собираем игредиенты и маринуем:
Настройки бинов:
- Первым делом прописываем конфигурация бинов нашего приложения в пакете config, тут настройки инициализации TelegramBotsApi и ObjectMapper
AppConfig
@Configuration
public class AppConfig {
@Bean
ObjectMapper customObjectMapper() {
return new ObjectMapper();
}
@Bean
TelegramBotsApi telegramBotsApi() throws TelegramApiException{
return new TelegramBotsApi(DefaultBotSession.class);
}
}
- Внутри нашего класса DbConfig, есть класс SpringDataJdbcProperties, который описывает настройки SpringDataJdbc
DbConfig
@Configuration
public class DbConfig extends DefaultDbConfig {
@Bean
@Qualifier("bot-db")
@ConfigurationProperties(prefix = "app.db.bot-db")
SpringDataJdbcProperties gitlabJdbcProperties() {
return new SpringDataJdbcProperties();
}
@Bean
@Qualifier("bot-db")
public DataSource gitlabDataSource(@Qualifier("bot-db") SpringDataJdbcProperties properties) {
return hikariDataSource("db", properties);
}
@Bean
@Qualifier("bot-db")
JdbcTemplate gitlabJdbcTemplate(@Qualifier("bot-db") DataSource dataSource) {
return new JdbcTemplate(dataSource);
}
@Data
@NoArgsConstructor
public static class SpringDataJdbcProperties {
// constants
private static final String H2_DATABASE_DRIVER = "org.h2.Driver";
/**
* JDBC URL property
*/
String url;
/**
* JDBC driver class name property
*/
String driver;
/**
* JDBC username property
*/
String user;
/**
* JDBC password property
*/
String password;
/**
* Hikari / Vertica maxPoolSize property
*/
String poolSize;
/**
* Minimum pool size
*/
int minPoolSize = 4;
/**
* Maximum pool size
*/
int maxPoolSize = 10;
/**
* This property controls the maximum amount of time (in milliseconds) that a connection is allowed to
* sit idle in the pool. A value of 0 means that idle connections are never removed from the pool.
*/
long idleTimeout;
/**
* This property controls the maximum lifetime of a connection in the pool. When a connection
* reaches this timeout, even if recently used, it will be retired from the pool.
* An in-use connection will never be retired, only when it is idle will it be removed
*/
long maxLifetime;
/**
* Bulk insert size
*/
Integer bulkSize;
/**
* All-args constructor for {@link SpringDataJdbcProperties#toString()} (logging)
*
* @param url JDBC driver class name property
* @param driver JDBC driver class name property
* @param user JDBC username property
* @param password JDBC password property
* @param poolSize Hikari / Vertica maxPoolSize property
* @param bulkSize bulk insert size
*/
public SpringDataJdbcProperties(
String url, String driver, String user, String password, String poolSize, Integer bulkSize) {
this.url = url;
this.driver = driver;
this.user = user;
this.password = password;
this.poolSize = poolSize;
this.bulkSize = bulkSize;
}
/**
* Возвращает истину, если экземпляр описывает in-memory H2 database
*
* @return истина, если экземпляр описывает in-memory H2 database
*/
public boolean isH2Database() {
return driver.equals(H2_DATABASE_DRIVER);
}
/**
* Возвращает строковое представление экземпляра объекта в формате JSON
*
* @return строковое представление экземпляра объекта в формате JSON
*/
@Override
public String toString() {
var props = new SpringDataJdbcProperties(
url, driver, user, ((password == null) || password.isEmpty()) ? "" : "*****", poolSize, bulkSize);
return Json.encode(props);
}
}
}
- Создадим базовый класс для уменьшения дублирования кода инициализации бинов
DefaultDbConfig
@Slf4j
class DefaultDbConfig {
protected DataSource hikariDataSource(String tag, DbConfig.SpringDataJdbcProperties properties) {
log.info("[{}] настройки БД: [{}]", tag, properties.toString());
HikariDataSource ds = new HikariDataSource();
ds.setJdbcUrl(properties.getUrl());
ds.setDriverClassName(properties.getDriver());
ds.setUsername(properties.getUser());
ds.setPassword(properties.getPassword());
ds.setMaximumPoolSize(Integer.parseInt(properties.getPoolSize()));
return ds;
}
}
- После напишем утилитный класс для логирования
Json
public class Json {
static final ObjectMapper mapper = new ObjectMapper();
/**
* Encode instance as JSON
*
* @param obj instance
* @return JSON
*/
public static String encode(Object obj) {
try {
return mapper.writeValueAsString(obj);
} catch (JsonProcessingException e) {
return obj.toString();
}
}
public static <T> T decode(String json, Class<T> clazz) throws JsonProcessingException {
return mapper.readValue(json, clazz);
}
Далее мы напишем контроллер, для доступа к сервису из вне
- Создаем простенький контроллер, для получения списка записей из БД
UsersController
@Slf4j
@RestController
@RequestMapping("${app.http.bot")
@RequiredArgsConstructor
@SuppressWarnings("unused")
public class UsersController {
private final UserService userService;
/**
* Возвращает список пользователей и связанных с ними планами
*/
@RequestMapping(path = "/users_idea", method = RequestMethod.GET)
public List<User> getIdeaList() {
log.debug("Method - getIdeaList was called");
return userService.getUserList();
}
}
После переходим к созданию модели
- Создаем модель пользователя User, а так же его маппер UserMapper, который понадобиться для работы с БД и маппинга полей в таблице
User
@Data
@RequiredArgsConstructor
public class User {
/**
* user's id
*/
@JsonProperty("id")
private final int id;
/**
* user's name
*/
@JsonProperty("name")
private final String name;
/**
* description
*/
@JsonProperty("description")
private final String description;
private String startWord = "";
@Override
public String toString() { return startWord + description; }
}
UserMapper
@Slf4j
public class UserMapper implements RowMapper<User> {
@Override
public User mapRow(ResultSet rs, int rowNum) throws SQLException {
var entity = new User(
rs.getInt("id"),
rs.getString("user_name"),
rs.getString("description")
);
log.trace("mapRow(): entity = [{}]", entity);
return entity;
}
}
Переходим к созданию кастомных exception
Для чего они нужны
Их мы используем для обработки ошибок, которые могут произойти в процессе работы приложения, чтобы бот не сломался и продолжил свою работу.
- BaseException - класс, который наследуется от RuntimeException, в конструкторе принимает 2 параметра - сообщение и тело ошибки
BaseException
@Slf4j
public class BaseException extends RuntimeException{
public BaseException(String msg, Throwable t) {
super(msg, t);
log.error(msg, t);
}
public BaseException(String msg) {
super(msg);
log.error(msg);
}
}
- NotFoundException - класс, который вывзывается, когда ответ не найден, наследуется от BaseException
NotFoundException
@ResponseStatus(HttpStatus.NOT_FOUND)
public class NotFoundException extends BaseException {
private final static String MESSAGE = "Not Found";
public NotFoundException(Throwable t) {
super(MESSAGE, t);
}
public NotFoundException() {
super(MESSAGE);
}
}
- DbException - класс, который обрабатыевает ошибки связанные с БД, наследуется от RuntimeException
DbException
@ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR)
public class DbException extends RuntimeException {
private static final String MESSAGE = "Ошибка БД";
public DbException(String message) {
super(message);
}
public DbException(Throwable cause) {
super(MESSAGE, cause);
}
}
Теперь для работы с БД, создаем repository
- Создадим интерфейс, который описывает методы, для работы с записями в БД
IUserRepository
public interface IUserRepository {
/**
* Возвращает список записей по id
*
* @return запрашиваемая запись
* @throws DbException в случае ошибки БД
*/
User getById(int id);
/**
* Возвращает список записей
*
* @return список всех записей
* @throws DbException в случае ошибки БД
*/
List<User> getUserList();
/**
* Вставка новой записи
*
* @param entity новая запись
* @throws DbException в случае ошибки БД
*/
void insert(User entity);
/**
* Удаление записи
*
* @param entity удаляемая запись
* @throws DbException в случае ошибки БД
*/
void delete(User entity);
}
- Теперь напишем класс, который реализует методы интерфейса
UserRepository
@Slf4j
@Repository
public class UserRepository implements IUserRepository {
// constants
private static final String SQL_SELECT_BY_NAME = "" +
"SELECT id, user_name, description FROM user_table WHERE id=?";
private static final String SQL_SELECT_LIST = "" +
"SELECT id, user_name, description FROM user_table";
private static final String SQL_INSERT = "" +
"INSERT INTO user_table (user_name, description) VALUES (?, ?)";
private static final String SQL_DELETE = "" +
"DELETE FROM user_table WHERE id = ?";
protected final static UserMapper USER_MAPPER = new UserMapper();
// beans
protected final JdbcTemplate template;
/**
* Req-args constructor for Spring DI
*/
public UserRepository(@Qualifier("bot-db") JdbcTemplate template) {
this.template = template;
}
/**
* Возвращает список записей по id
*
* @return запрашиваемая запись
* @throws DbException в случае ошибки БД
*/
@Override
public User getById(int id) throws DbException {
try {
return DataAccessUtils.singleResult(
template.query(SQL_SELECT_BY_NAME, USER_MAPPER, id));
} catch (DataAccessException exception) {
throw new DbException(exception);
}
}
/**
* Возвращает список записей
*
* @return запрашиваемая запись
* @throws DbException в случае ошибки БД
*/
@Override
public List<User> getUserList() throws DbException {
try {
return template.query(SQL_SELECT_LIST, USER_MAPPER);
} catch (DataAccessException exception) {
throw new DbException(exception);
}
}
/**
* Вставка новой записи
*
* @param entity новая запись
* @throws DbException в случае ошибки БД
*/
@Override
public void insert(User entity) throws DbException {
try {
// В параметры запроса все поля сущности кроме идентификатора, т.к. он serial и генерируется автоматом
var result = template.update(SQL_INSERT,
entity.getName(),
entity.getDescription());
if (result != 1) log.trace("UserRepository.update() with {} rows inserted", entity);
log.info("insert({}) result={}", entity, result);
} catch (DataAccessException exception) {
throw new DbException(exception);
}
}
/**
* Удаление записи
*
* @param entity удаляемая запись
* @throws DbException в случае ошибки БД
*/
@Override
public void delete(User entity) throws DbException {
try {
var result = template.update(SQL_DELETE, entity.getId());
if (result != 1) log.trace("UserRepository.delete() with {} rows inserted", entity);
log.info("delete({}) result={}", entity, result);
} catch (DataAccessException exception) {
throw new DbException(exception);
}
}
}
- Далее у нас идет логика бота, тут все тривиально, в отнаследованном onUpdateReceived методе от класса родителя TelegramLongPollingBot мы пишем поведение, которое происходит при обновлении чата с пользователем, подробнее об этом здесь, так же в методе обработки сообщений есть вызов нашего producer и запись данных в БД
TelegramBot
@Slf4j
@Getter
@Component
public class TelegramBot extends TelegramLongPollingBot {
private Message requestMessage = new Message();
private final SendMessage response = new SendMessage();
private final Producer producerService;
private final UserService userService;
private final String botUsername;
private final String botToken;
public TelegramBot(
TelegramBotsApi telegramBotsApi,
@Value("${telegram-bot.name}") String botUsername,
@Value("${telegram-bot.token}") String botToken,
Producer producerService, UserService userService) throws TelegramApiException {
this.botUsername = botUsername;
this.botToken = botToken;
this.producerService = producerService;
this.userService = userService;
telegramBotsApi.registerBot(this);
}
/**
* Этот метод вызывается при получении обновлений через метод GetUpdates.
*
* @param request Получено обновление
*/
@SneakyThrows
@Override
public void onUpdateReceived(Update request) {
requestMessage = request.getMessage();
response.setChatId(requestMessage.getChatId().toString());
var entity = new User(
0, requestMessage.getChat().getUserName(),
requestMessage.getText());
if (request.hasMessage() && requestMessage.hasText())
log.info("Working onUpdateReceived, request text[{}]", request.getMessage().getText());
if (requestMessage.getText().equals("/start"))
defaultMsg(response, "Напишите команду для показа списка мыслей: \n " + "/idea - показать мысли");
else if (requestMessage.getText().equals("/idea"))
onIdea(response);
else
defaultMsg(response, "Я записал вашу мысль :) \n ");
log.info("Working, text[{}]", requestMessage.getText());
if (requestMessage.getText().startsWith("/")) {
entity.setStartWord("команда: ");
producerService.sendMessage( entity);
} else {
entity.setStartWord("мысль: ");
producerService.sendMessage( entity);
userService.insert(entity);
}
}
/**
* Метод отправки сообщения со списком мыслей - по команде "/idea"
*
* @param response - метод обработки сообщения
*/
private void onIdea(SendMessage response) throws TelegramApiException {
if (userService.getUserList().isEmpty()) {
defaultMsg(response, "В списке нет мыслей. \n");
} else {
defaultMsg(response, "Вот список ваших мыслей: \n");
for (User txt : userService.getUserList()) {
response.setText(txt.toString());
execute(response);
}
}
}
/**
* Шабонный метод отправки сообщения пользователю
*
* @param response - метод обработки сообщения
* @param msg - сообщение
*/
private void defaultMsg(SendMessage response, String msg) throws TelegramApiException {
response.setText(msg);
execute(response);
}
}
Фрагмент кода с отправкой в Kafka и записью в БД
if (requestMessage.getText().startsWith("/")) {
entity.setStartWord("команда: ");
producerService.sendMessage( entity);
} else {
entity.setStartWord("мысль: ");
producerService.sendMessage( entity);
userService.insert(entity);
}
Переходим к созданию бизнес логики приложения
- BaseService - реализует базовые методы сервисов проекта
BaseService
public class BaseService {
/**
* Обёртка результата
*
* @param result результат
* @return результат
* @throws NotFoundException если результат null
*/
public <T> T wrapResult(T result) {
if(result == null)
throw new NotFoundException();
return result;
}
/**
* Обёртка результата
*
* @param result результат
* @return результат
* @throws NotFoundException если результат null или пустой
*/
public <T> List<T> wrapResults(List<T> result) {
if(result == null || result.size() == 0)
throw new NotFoundException();
return result;
}
}
- Класс UserService работает с нашим репозиторием IUserRepository и содержит в себе бизнес-логику работы с записями о событиях в БД
UserService
@Service
@Slf4j
@RequiredArgsConstructor
public class UserService extends BaseService {
//beans
protected final IUserRepository repo;
/**
* Возвращает список записей
*
* @return список записей
* @throws DbException в случае ошибки БД
*/
public List<User> getUserList() {
log.trace("#### getUserList() - working");
return wrapResults(repo.getUserList());
}
/**
* Возвращает список записей по id
*
* @throws DbException в случае ошибки БД
*/
public User getById(int id) {
log.trace("#### getById() [id={}]", id);
return wrapResult(repo.getById(id));
}
/**
* Вставка новой записи
*
* @param entity новая запись
* @throws DbException в случае ошибки БД
*/
public void insert(User entity) {
log.trace("#### insert() [entity={}]", entity);
repo.insert(entity);
}
/**
* Удаление записи
*
* @param entity удаляемая запись
* @throws DbException в случае ошибки БД
*/
public void delete(User entity) {
log.trace("#### delete() [entity={}]", entity);
repo.delete(entity);
}
}
- Класс Producer, как раз тот класс, который шлет сообщения в топик users, а так же здесь мы можем изменять формат самого сообщения и данные, которые он отправляет
Producer
@Service
@Slf4j
public class Producer {
private static final String TOPIC = "users";
protected final IUserRepository repo;
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public Producer(IUserRepository repo) {
this.repo = repo;
}
public void sendMessage(User user) {
if (user.getName() == null || user.getDescription().isEmpty()) log.info("#### Empty name/description message");
log.info("#### Producing message [user={}]", user);
kafkaTemplate.send(TOPIC, "Writing in log -> " + user);
}
}
В конце класс, который собственно и запускает все наше приложене
WebHookApp
@Slf4j
@SpringBootApplication
public class WebHookApp {
public static void main(String[] args) {
SpringApplication.run(WebHookApp.class, args);
}
}
Теперь мы замариновали все ингридиенты и подготовили блюдо к запеканию:
- Сначала проверим, запущена ли Kafka
- После, запускаем Conductor и видим, что у нас работет брокер сообщений, после запуска нашего приложения, тут появится топик users, в который будут лететь сообщения отправленные нашим producer
- Далее запускаем DBeaver и создаем 2 таблицы (log и user_table), вот схема создания таблиц:
CREATE TABLE public.log (
id serial4 NOT NULL,
message varchar(500) NOT NULL,
date_time date NOT NULL,
topic varchar(100) NOT NULL,
CONSTRAINT log_pkey PRIMARY KEY (id)
);
CREATE TABLE public.user_table (
id serial4 NOT NULL,
user_name varchar(100) NOT NULL,
description varchar(500) NULL,
CONSTRAINT user_table_pkey PRIMARY KEY (id)
);
Отлично, блюдо запеклось и готово к подаче:
- Запускаем проект, проверяем, что все настроено и корректно работает
- Открываем телеграмм и пробуем на вкус нашего "Франкенштейна"
Пишем - /start и начинаем тест ... Я в шоке, оно живое !
- Давайте посмотрим, что же нам написал Spring в логах и записались ли данные в Kafka и БД ?
Логи нашего бота, ошибок не наблюдается
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.4.2)
2022-01-15 16:46:19.248 INFO 412498 --- [ main] com.secretary.bot.WebHookApp : The following profiles are active: bot
2022-01-15 16:46:19.291 WARN 412498 --- [kground-preinit] o.s.h.c.j.Jackson2ObjectMapperBuilder : For Jackson Kotlin classes support please add "com.fasterxml.jackson.module:jackson-module-kotlin" to the classpath
2022-01-15 16:46:19.882 INFO 412498 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 8081 (http)
2022-01-15 16:46:19.887 INFO 412498 --- [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat]
2022-01-15 16:46:19.887 INFO 412498 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.41]
2022-01-15 16:46:19.956 INFO 412498 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
2022-01-15 16:46:19.957 INFO 412498 --- [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 678 ms
2022-01-15 16:46:20.013 INFO 412498 --- [ main] c.secretary.bot.config.DefaultDbConfig : [db] настройки БД: [{"url":"jdbc:postgresql://localhost:5432/postgres","driver":"org.postgresql.Driver","user":"*****","password":"*****","poolSize":"10","minPoolSize":4,"maxPoolSize":10,"idleTimeout":0,"maxLifetime":0,"bulkSize":null,"h2Database":false}]
2022-01-15 16:46:20.565 INFO 412498 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2022-01-15 16:46:20.574 DEBUG 412498 --- [ main] s.w.s.m.m.a.RequestMappingHandlerAdapter : ControllerAdvice beans: 0 @ModelAttribute, 0 @InitBinder, 1 RequestBodyAdvice, 1 ResponseBodyAdvice
2022-01-15 16:46:20.598 DEBUG 412498 --- [ main] s.w.s.m.m.a.RequestMappingHandlerMapping : 3 mappings in 'requestMappingHandlerMapping'
2022-01-15 16:46:20.619 DEBUG 412498 --- [ main] o.s.w.s.handler.SimpleUrlHandlerMapping : Patterns [/webjars/**, /**] in 'resourceHandlerMapping'
2022-01-15 16:46:20.627 DEBUG 412498 --- [ main] .m.m.a.ExceptionHandlerExceptionResolver : ControllerAdvice beans: 0 @ExceptionHandler, 1 ResponseBodyAdvice
2022-01-15 16:46:20.702 INFO 412498 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8081 (http) with context path ''
2022-01-15 16:46:20.709 INFO 412498 --- [ main] com.secretary.bot.WebHookApp : Started WebHookApp in 1.65 seconds (JVM running for 1.962)
SSS2022-01-15 16:52:33.916 INFO 412498 --- [legram Executor] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [localhost:9092]
buffer.memory = 33554432
client.dns.lookup = use_all_dns_ips
client.id = producer-1
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
internal.auto.downgrade.txn.commit = true
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
2022-01-15 16:52:33.947 INFO 412498 --- [legram Executor] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.6.0
2022-01-15 16:52:33.948 INFO 412498 --- [legram Executor] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 62abe01bee039651
2022-01-15 16:52:33.948 INFO 412498 --- [legram Executor] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1642254753947
2022-01-15 16:52:34.056 INFO 412498 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Cluster ID: faKjxP6CTvGFeeVKJw
2022-01-15 16:54:01.115 INFO 412498 --- [legram Executor] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Starting...
2022-01-15 16:54:01.188 INFO 412498 --- [legram Executor] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Start completed.
- Как мы видим, сообщения отправленные Боту появились в БД
- Открыв кондуктор, перейдите во вкладку topics, после нажимаем на наш топик users
- Далее во вкладке нашего топика нажимаем на кнопку CONSUME DATA
- В открывшемся окне, ставим такие же настройки (самая важная из них это Start From - указывает, с какого момента показывать сообщения в Kafka, наша настройка - показывает все сообщения, включая отправленые ранее)
- Вот и все, теперь мы убедились, что сообщения благополучно прилетели в Kafka, записались в БД и не вызвали ошибок в приложении
Ну что же, большое всем спасибо за время, потраченное на прочтение данной статьи, жду вас во второй части этого туториала, где мы используем Consumer Kafka, с помощью которого будем обрабатывать прилетающие сообщения.
Комментарии (11)
MWGuy
12.03.2022 19:56+4В чём причина использовать application.yml и application.properties вместе? В этом нет никакого смысла, так как это один и тот-же файл, но в разных форматах.
Также хочу ещё обратить внимание автора на "build.grable" :^)MWGuy
12.03.2022 20:02+5Также советую автору прочитать хоть базовый туториал про Spring Data. Тогда бы данная статья сократилась минимум в 2 раза.
Ещё хочу отметить, что мы используем Spring Boot, а не голый Spring. Это значит что проблему с конфигурацией уже решили за нас. К примеру: ObjectMapper уже инициализирован и лежит в контексте, создавать его не нужно
MWGuy
12.03.2022 20:09+3Пожалуй напишу ещё один совет для автора - знай свой инструмент. Intellij IDEA включает в себя SQL клиент. Никакой дополнительный больше не нужен.
Почитать про него можно тут: https://www.jetbrains.com/help/idea/relational-databases.htmlUnicornFreedom
14.03.2022 08:49Справедливости ради - Database tools это часть пакета Ultimate.
Помимо собственно "знания своего инструмента", потребуется взнос в размере $50 в месяц (либо $500 в год).
LeshaRB
12.03.2022 22:58+1Внедрение рекомендуется делать через конструктор, а не через сеттер
@RequestMapping(path = "/users_idea", method = RequestMethod.GET), почему не использовать Get mapping?
Зачем использовать Slf4j если в коде нет логирования? Абы было?
Где конструктор самописный, где-то через ломобок
Ну и прочее
isden
13.03.2022 10:53+2К написанному выше еще хочется добавить:
Далее запускаем DBeaver и создаем 2 таблицы (log и user_table), вот схема создания таблиц:
Почему не используете вещи вроде liquibase? Более того, в Spring Boot есть и другие варианты.
Да, и где же тесты? :) Это же самое интересное.
martuz0
14.03.2022 08:44Для первой статьи, это весьма круто! Так держать!
P.S обратил внимание, что вы используете Кондуктор, круто, но платно)), есть интересное open source решение, которое позволяет менеджить кластеры кафка
aszhitarev
Простите, накипело
Ещё раз простите.
А статья отличная. Жду продолжения.