Иванов Максим

Младший Java программист

Рецепт по приготовлению своего «Telegram-Франкенштейна»

Даже человек средних способностей, упорно занимаясь одним предметом, непременно достигнет в нем глубоких познаний. - «Франкенштейн» Мэри Шелли
Даже человек средних способностей, упорно занимаясь одним предметом, непременно достигнет в нем глубоких познаний. - «Франкенштейн» Мэри Шелли

Всем привет, данная статья является, своего рода моей первой, но все же постараюсь максимально просто рассказать вам о том, как создать бота, прикрутив к нему все обещанные выше свистелки-тарахтелки.

Статьи будут разделены на 2 части, первая часть - создание основного бота с оправкой логов (Kafka Producer) и записью их в БД, вторая часть - обработка всех логов (Kafka Consumer).

Ингредиенты:

  1. Регистрация бота

  2. Создание Spring Boot проект, проще всего это сделать через встроенный конфигуратор в IntelliJ IDEA, либо используя Spring Initializr. (в качестве системы сборки будет использоваться Gradle)

  3. Kafka (для отслеживания топиков я использую Conductor)

  4. PostgreSQL (для комфортной работы я использую DBeaver)

Если возникнут сложности с воссозданием туториала

Прошу пишите в коментариях возникшие проблемы, на всякий случай - вот мой git

Начинаем с нарезки:

Первостепенно нужно настроить build.grable со всеми зависимостями

build.grable
buildscript {
    repositories {
        mavenCentral()
    }
}

plugins {
    id 'org.springframework.boot' version '2.4.2'
    id 'io.spring.dependency-management' version '1.0.11.RELEASE'
    id 'java'
}

apply from: 'build-test.gradle'

group 'com.sercetary.bot'
sourceCompatibility = '14'

configurations {
    compileOnly {
        extendsFrom annotationProcessor
    }
}

repositories {
    mavenCentral()
}

configurations.all {
    exclude module: 'slf4j-log4j12'
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-web:2.5.6'
    implementation 'org.springframework.boot:spring-boot-starter-jdbc:2.5.6'
    implementation 'org.springframework.data:spring-data-commons:2.6.0'
    implementation 'org.springframework.kafka:spring-kafka:2.7.6'
    implementation 'org.postgresql:postgresql:42.3.1'
    implementation 'com.h2database:h2:1.4.200'

    implementation group: 'org.telegram', name: 'telegrambots-abilities', version: '5.3.0'
    implementation group: 'org.telegram', name: 'telegrambots', version: '5.3.0'

    compile group: 'org.slf4j', name: 'slf4j-log4j12', version: '1.7.29'
    compileOnly 'org.projectlombok:lombok:1.18.22'
    annotationProcessor 'org.projectlombok:lombok:1.18.22'
}

Далее сразу для работы Kafka опишем application.yml, в котором находятся настройки нашего kafka producer

application.yml
server:
  port: 9000
spring:
  kafka:
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

Теперь настройки application.properties

application.properties
# HTTP port for incoming requests
server.port=8081

app.http.bot=change-me
telegram-bot.name=change-me
telegram-bot.token=change-me

# Bot db
app.db.bot-db.url=jdbc:postgresql://localhost:5432/change-me
app.db.bot-db.driver=org.postgresql.Driver
app.db.bot-db.user=change-me
app.db.bot-db.password=change-me
app.db.bot-db.pool-size=10

# logging
logging.level.root=INFO
logging.level.org.springframework.web=DEBUG
logging.level.ru.centerinform.webhook=TRACE
logging.file.name=change-me

Хорошо, после настроек нашего проекта, давайте обговорим его структуру:

Структура проекта
Структура проекта

Пакеты:

  • config - описание бинов и конфигурации проекта

  • controller - обрабатывает запрос пользователя

  • dto - хранит данные, а так же описывает модель таблицы БД

  • exceptions - кастомный пакет обработчика ошибок

  • repository - логика работа с БД

  • service - основная бизнес логика проекта

Сейчас мы собираем игредиенты и маринуем:

Настройки бинов:

- Первым делом прописываем конфигурация бинов нашего приложения в пакете config, тут настройки инициализации TelegramBotsApi и ObjectMapper

AppConfig
@Configuration
public class AppConfig {

    @Bean
    ObjectMapper customObjectMapper() {
        return new ObjectMapper();
    }

    @Bean
    TelegramBotsApi telegramBotsApi() throws TelegramApiException{
        return new TelegramBotsApi(DefaultBotSession.class);
    }
}

- Внутри нашего класса DbConfig, есть класс SpringDataJdbcProperties, который описывает настройки SpringDataJdbc

DbConfig
@Configuration
public class DbConfig extends DefaultDbConfig {

    @Bean
    @Qualifier("bot-db")
    @ConfigurationProperties(prefix = "app.db.bot-db")
    SpringDataJdbcProperties gitlabJdbcProperties() {
        return new SpringDataJdbcProperties();
    }

    @Bean
    @Qualifier("bot-db")
    public DataSource gitlabDataSource(@Qualifier("bot-db") SpringDataJdbcProperties properties) {
        return hikariDataSource("db", properties);
    }

    @Bean
    @Qualifier("bot-db")
    JdbcTemplate gitlabJdbcTemplate(@Qualifier("bot-db") DataSource dataSource) {
        return new JdbcTemplate(dataSource);
    }

    @Data
    @NoArgsConstructor
    public static class SpringDataJdbcProperties {

        // constants
        private static final String H2_DATABASE_DRIVER = "org.h2.Driver";

        /**
         * JDBC URL property
         */
        String url;
        /**
         * JDBC driver class name property
         */
        String driver;
        /**
         * JDBC username property
         */
        String user;
        /**
         * JDBC password property
         */
        String password;
        /**
         * Hikari / Vertica maxPoolSize property
         */
        String poolSize;
        /**
         * Minimum pool size
         */
        int minPoolSize = 4;
        /**
         * Maximum pool size
         */
        int maxPoolSize = 10;
        /**
         * This property controls the maximum amount of time (in milliseconds) that a connection is allowed to
         * sit idle in the pool. A value of 0 means that idle connections are never removed from the pool.
         */
        long idleTimeout;
        /**
         * This property controls the maximum lifetime of a connection in the pool. When a connection
         * reaches this timeout, even if recently used, it will be retired from the pool.
         * An in-use connection will never be retired, only when it is idle will it be removed
         */
        long maxLifetime;
        /**
         * Bulk insert size
         */
        Integer bulkSize;


        /**
         * All-args constructor for {@link SpringDataJdbcProperties#toString()} (logging)
         *
         * @param url JDBC driver class name property
         * @param driver JDBC driver class name property
         * @param user JDBC username property
         * @param password JDBC password property
         * @param poolSize Hikari / Vertica maxPoolSize property
         * @param bulkSize bulk insert size
         */
        public SpringDataJdbcProperties(
                String url, String driver, String user, String password, String poolSize, Integer bulkSize) {
            this.url = url;
            this.driver = driver;
            this.user = user;
            this.password = password;
            this.poolSize = poolSize;
            this.bulkSize = bulkSize;
        }


        /**
         * Возвращает истину, если экземпляр описывает in-memory H2 database
         *
         * @return истина, если экземпляр описывает in-memory H2 database
         */
        public boolean isH2Database() {
            return driver.equals(H2_DATABASE_DRIVER);
        }

        /**
         * Возвращает строковое представление экземпляра объекта в формате JSON
         *
         * @return строковое представление экземпляра объекта в формате JSON
         */
        @Override
        public String toString() {
            var props = new SpringDataJdbcProperties(
                    url, driver, user, ((password == null) || password.isEmpty()) ? "" : "*****", poolSize, bulkSize);
            return Json.encode(props);
        }

    }

}

- Создадим базовый класс для уменьшения дублирования кода инициализации бинов

DefaultDbConfig
@Slf4j
class DefaultDbConfig {

    protected DataSource hikariDataSource(String tag, DbConfig.SpringDataJdbcProperties properties) {
        log.info("[{}] настройки БД: [{}]", tag, properties.toString());

        HikariDataSource ds = new HikariDataSource();
        ds.setJdbcUrl(properties.getUrl());
        ds.setDriverClassName(properties.getDriver());
        ds.setUsername(properties.getUser());
        ds.setPassword(properties.getPassword());
        ds.setMaximumPoolSize(Integer.parseInt(properties.getPoolSize()));
        return ds;
    }
}

- После напишем утилитный класс для логирования

Json
public class Json {
    static final ObjectMapper mapper = new ObjectMapper();

    /**
     * Encode instance as JSON
     *
     * @param obj instance
     * @return JSON
     */
    public static String encode(Object obj) {
        try {
            return mapper.writeValueAsString(obj);
        } catch (JsonProcessingException e) {
            return obj.toString();
        }
    }

    public static <T> T decode(String json, Class<T> clazz) throws JsonProcessingException {
        return mapper.readValue(json, clazz);
    }

Далее мы напишем контроллер, для доступа к сервису из вне

- Создаем простенький контроллер, для получения списка записей из БД

UsersController
@Slf4j
@RestController
@RequestMapping("${app.http.bot")
@RequiredArgsConstructor
@SuppressWarnings("unused")
public class UsersController {

    private final UserService userService;

    /**
     * Возвращает список пользователей и связанных с ними планами
     */
    @RequestMapping(path = "/users_idea", method = RequestMethod.GET)
    public List<User> getIdeaList() {
        log.debug("Method - getIdeaList was called");
        return userService.getUserList();
    }
}

После переходим к созданию модели

- Создаем модель пользователя User, а так же его маппер UserMapper, который понадобиться для работы с БД и маппинга полей в таблице

User
@Data
@RequiredArgsConstructor
public class User {
    /**
     * user's id
     */
    @JsonProperty("id")
    private final int id;
    /**
     * user's name
     */
    @JsonProperty("name")
    private final String name;
    /**
     * description
     */
    @JsonProperty("description")
    private final String description;

    private String startWord = "";

    @Override
    public String toString() { return startWord + description; }
}
UserMapper
@Slf4j
public class UserMapper implements RowMapper<User> {

    @Override
    public User mapRow(ResultSet rs, int rowNum) throws SQLException {
        var entity = new User(
                rs.getInt("id"),
                rs.getString("user_name"),
                rs.getString("description")
                );
        log.trace("mapRow(): entity = [{}]", entity);
        return entity;
    }
}

Переходим к созданию кастомных exception

Для чего они нужны

Их мы используем для обработки ошибок, которые могут произойти в процессе работы приложения, чтобы бот не сломался и продолжил свою работу.

- BaseException - класс, который наследуется от RuntimeException, в конструкторе принимает 2 параметра - сообщение и тело ошибки

BaseException
@Slf4j
public class BaseException extends RuntimeException{

    public BaseException(String msg, Throwable t) {
        super(msg, t);
        log.error(msg, t);
    }

    public BaseException(String msg) {
        super(msg);
        log.error(msg);
    }

}

- NotFoundException - класс, который вывзывается, когда ответ не найден, наследуется от BaseException

NotFoundException
@ResponseStatus(HttpStatus.NOT_FOUND)
public class NotFoundException extends BaseException {

    private final static String MESSAGE = "Not Found";

    public NotFoundException(Throwable t) {
        super(MESSAGE, t);
    }

    public NotFoundException() {
        super(MESSAGE);
    }
}

- DbException - класс, который обрабатыевает ошибки связанные с БД, наследуется от RuntimeException

DbException
@ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR)
public class DbException extends RuntimeException {

    private static final String MESSAGE = "Ошибка БД";

    public DbException(String message) {
        super(message);
    }

    public DbException(Throwable cause) {
        super(MESSAGE, cause);
    }
}

Теперь для работы с БД, создаем repository

- Создадим интерфейс, который описывает методы, для работы с записями в БД

IUserRepository
public interface IUserRepository {

    /**
     * Возвращает список записей по id
     *
     * @return запрашиваемая запись
     * @throws DbException в случае ошибки БД
     */
    User getById(int id);

    /**
     * Возвращает список записей
     *
     * @return список всех записей
     * @throws DbException в случае ошибки БД
     */
    List<User> getUserList();

    /**
     * Вставка новой записи
     *
     * @param entity новая запись
     * @throws DbException в случае ошибки БД
     */
    void insert(User entity);

    /**
     * Удаление записи
     *
     * @param entity удаляемая запись
     * @throws DbException в случае ошибки БД
     */
    void delete(User entity);
}

- Теперь напишем класс, который реализует методы интерфейса

UserRepository
@Slf4j
@Repository
public class UserRepository implements IUserRepository {

    // constants
    private static final String SQL_SELECT_BY_NAME = "" +
            "SELECT id, user_name, description FROM user_table WHERE id=?";
    private static final String SQL_SELECT_LIST = "" +
            "SELECT id, user_name, description FROM user_table";
    private static final String SQL_INSERT = "" +
            "INSERT INTO user_table (user_name, description) VALUES (?, ?)";
    private static final String SQL_DELETE = "" +
            "DELETE FROM user_table WHERE id = ?";

    protected final static UserMapper USER_MAPPER = new UserMapper();

    // beans
    protected final JdbcTemplate template;


    /**
     * Req-args constructor for Spring DI
     */
    public UserRepository(@Qualifier("bot-db") JdbcTemplate template) {
        this.template = template;
    }

    /**
     * Возвращает список записей по id
     *
     * @return запрашиваемая запись
     * @throws DbException в случае ошибки БД
     */
    @Override
    public User getById(int id) throws DbException {
        try {
            return DataAccessUtils.singleResult(
                    template.query(SQL_SELECT_BY_NAME, USER_MAPPER, id));
        } catch (DataAccessException exception) {
            throw new DbException(exception);
        }
    }

    /**
     * Возвращает список записей
     *
     * @return запрашиваемая запись
     * @throws DbException в случае ошибки БД
     */
    @Override
    public List<User> getUserList() throws DbException {
        try {
            return template.query(SQL_SELECT_LIST, USER_MAPPER);
        } catch (DataAccessException exception) {
            throw new DbException(exception);
        }
    }

    /**
     * Вставка новой записи
     *
     * @param entity новая запись
     * @throws DbException в случае ошибки БД
     */
    @Override
    public void insert(User entity) throws DbException {
        try {
            // В параметры запроса все поля сущности кроме идентификатора, т.к. он serial и генерируется автоматом
            var result = template.update(SQL_INSERT,
                    entity.getName(),
                    entity.getDescription());
            if (result != 1) log.trace("UserRepository.update() with {} rows inserted", entity);
            log.info("insert({}) result={}", entity, result);
        } catch (DataAccessException exception) {
            throw new DbException(exception);
        }
    }

    /**
     * Удаление записи
     *
     * @param entity удаляемая запись
     * @throws DbException в случае ошибки БД
     */
    @Override
    public void delete(User entity) throws DbException {
        try {
            var result = template.update(SQL_DELETE, entity.getId());
            if (result != 1) log.trace("UserRepository.delete() with {} rows inserted", entity);
            log.info("delete({}) result={}", entity, result);
        } catch (DataAccessException exception) {
            throw new DbException(exception);
        }
    }
}

- Далее у нас идет логика бота, тут все тривиально, в отнаследованном onUpdateReceived методе от класса родителя TelegramLongPollingBot мы пишем поведение, которое происходит при обновлении чата с пользователем, подробнее об этом здесь, так же в методе обработки сообщений есть вызов нашего producer и запись данных в БД

TelegramBot
@Slf4j
@Getter
@Component
public class TelegramBot extends TelegramLongPollingBot {

    private Message requestMessage = new Message();
    private final SendMessage response = new SendMessage();
    private final Producer producerService;
    private final UserService userService;

    private final String botUsername;
    private final String botToken;

    public TelegramBot(
            TelegramBotsApi telegramBotsApi,
            @Value("${telegram-bot.name}") String botUsername,
            @Value("${telegram-bot.token}") String botToken,
            Producer producerService, UserService userService) throws TelegramApiException {
        this.botUsername = botUsername;
        this.botToken = botToken;
        this.producerService = producerService;
        this.userService = userService;

        telegramBotsApi.registerBot(this);
    }

    /**
     * Этот метод вызывается при получении обновлений через метод GetUpdates.
     *
     * @param request Получено обновление
     */
    @SneakyThrows
    @Override
    public void onUpdateReceived(Update request) {
        requestMessage = request.getMessage();
        response.setChatId(requestMessage.getChatId().toString());

        var entity = new User(
                0, requestMessage.getChat().getUserName(),
                requestMessage.getText());

        if (request.hasMessage() && requestMessage.hasText())
            log.info("Working onUpdateReceived, request text[{}]", request.getMessage().getText());

        if (requestMessage.getText().equals("/start"))
            defaultMsg(response, "Напишите команду для показа списка мыслей: \n " + "/idea - показать мысли");
        else if (requestMessage.getText().equals("/idea"))
            onIdea(response);
        else
            defaultMsg(response, "Я записал вашу мысль :) \n ");

        log.info("Working, text[{}]", requestMessage.getText());

        if (requestMessage.getText().startsWith("/")) {
            entity.setStartWord("команда: ");
            producerService.sendMessage( entity);
        } else {
            entity.setStartWord("мысль: ");
            producerService.sendMessage( entity);
            userService.insert(entity);
        }
    }

    /**
     * Метод отправки сообщения со списком мыслей - по команде "/idea"
     *
     * @param response - метод обработки сообщения
     */
    private void onIdea(SendMessage response) throws TelegramApiException {
        if (userService.getUserList().isEmpty()) {
            defaultMsg(response, "В списке нет мыслей. \n");
        } else {
            defaultMsg(response, "Вот список ваших мыслей: \n");
            for (User txt : userService.getUserList()) {
                response.setText(txt.toString());
                execute(response);
            }
        }
    }

    /**
     * Шабонный метод отправки сообщения пользователю
     *
     * @param response - метод обработки сообщения
     * @param msg - сообщение
     */
    private void defaultMsg(SendMessage response, String msg) throws TelegramApiException {
        response.setText(msg);
        execute(response);
    }
}
Фрагмент кода с отправкой в Kafka и записью в БД
        if (requestMessage.getText().startsWith("/")) {
            entity.setStartWord("команда: ");
            producerService.sendMessage( entity);
        } else {
            entity.setStartWord("мысль: ");
            producerService.sendMessage( entity);
            userService.insert(entity);
        }

Переходим к созданию бизнес логики приложения

- BaseService - реализует базовые методы сервисов проекта

BaseService
public class BaseService {

    /**
     * Обёртка результата
     *
     * @param result результат
     * @return результат
     * @throws NotFoundException если результат null
     */
    public <T> T wrapResult(T result) {
        if(result == null)
            throw new NotFoundException();
        return result;
    }

    /**
     * Обёртка результата
     *
     * @param result результат
     * @return результат
     * @throws NotFoundException если результат null или пустой
     */
    public <T> List<T> wrapResults(List<T> result) {
        if(result == null || result.size() == 0)
            throw new NotFoundException();
        return result;
    }

}

- Класс UserService работает с нашим репозиторием IUserRepository и содержит в себе бизнес-логику работы с записями о событиях в БД

UserService
@Service
@Slf4j
@RequiredArgsConstructor
public class UserService extends BaseService {

    //beans
    protected final IUserRepository repo;

    /**
     * Возвращает список записей
     *
     * @return список записей
     * @throws DbException в случае ошибки БД
     */
    public List<User> getUserList() {
        log.trace("#### getUserList() - working");
        return wrapResults(repo.getUserList());
    }

    /**
     * Возвращает список записей по id
     *
     * @throws DbException в случае ошибки БД
     */
    public User getById(int id) {
        log.trace("#### getById() [id={}]", id);
        return wrapResult(repo.getById(id));
    }

    /**
     * Вставка новой записи
     *
     * @param entity новая запись
     * @throws DbException в случае ошибки БД
     */
    public void insert(User entity) {
        log.trace("#### insert() [entity={}]", entity);
        repo.insert(entity);
    }

    /**
     * Удаление записи
     *
     * @param entity удаляемая запись
     * @throws DbException в случае ошибки БД
     */
    public void delete(User entity) {
        log.trace("#### delete() [entity={}]", entity);
        repo.delete(entity);
    }

}

- Класс Producer, как раз тот класс, который шлет сообщения в топик users, а так же здесь мы можем изменять формат самого сообщения и данные, которые он отправляет

Producer
@Service
@Slf4j
public class Producer {

    private static final String TOPIC = "users";
    protected final IUserRepository repo;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public Producer(IUserRepository repo) {
        this.repo = repo;
    }

    public void sendMessage(User user) {
        if (user.getName() == null || user.getDescription().isEmpty()) log.info("#### Empty name/description message");
        log.info("#### Producing message [user={}]", user);
        kafkaTemplate.send(TOPIC, "Writing in log -> " + user);
    }
}

В конце класс, который собственно и запускает все наше приложене

WebHookApp
@Slf4j
@SpringBootApplication
public class WebHookApp {
    public static void main(String[] args) {
        SpringApplication.run(WebHookApp.class, args);
    }
}

Теперь мы замариновали все ингридиенты и подготовили блюдо к запеканию:

- Сначала проверим, запущена ли Kafka

запуск по команде - sudo su systemctl start kafka
запуск по команде - sudo su systemctl start kafka

- После, запускаем Conductor и видим, что у нас работет брокер сообщений, после запуска нашего приложения, тут появится топик users, в который будут лететь сообщения отправленные нашим producer

Запущенный брокер
Запущенный брокер

- Далее запускаем DBeaver и создаем 2 таблицы (log и user_table), вот схема создания таблиц:

CREATE TABLE public.log (
	id serial4 NOT NULL,
	message varchar(500) NOT NULL,
	date_time date NOT NULL,
	topic varchar(100) NOT NULL,
	CONSTRAINT log_pkey PRIMARY KEY (id)
);
CREATE TABLE public.user_table (
	id serial4 NOT NULL,
	user_name varchar(100) NOT NULL,
	description varchar(500) NULL,
	CONSTRAINT user_table_pkey PRIMARY KEY (id)
);
Схема БД public
Схема БД public
Вот как выглядят таблица log
Вот как выглядят таблица log
Вот как выглядит таблица user_table
Вот как выглядит таблица user_table

Отлично, блюдо запеклось и готово к подаче:

- Запускаем проект, проверяем, что все настроено и корректно работает

Spring logs
Spring logs

- Открываем телеграмм и пробуем на вкус нашего "Франкенштейна"

  • Пишем - /start и начинаем тест ... Я в шоке, оно живое !

Общение с ботом в Телеграмм
Общение с ботом в Телеграмм

- Давайте посмотрим, что же нам написал Spring в логах и записались ли данные в Kafka и БД ?

Логи нашего бота, ошибок не наблюдается
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.4.2)
2022-01-15 16:46:19.248  INFO 412498 --- [           main] com.secretary.bot.WebHookApp             : The following profiles are active: bot
2022-01-15 16:46:19.291  WARN 412498 --- [kground-preinit] o.s.h.c.j.Jackson2ObjectMapperBuilder    : For Jackson Kotlin classes support please add "com.fasterxml.jackson.module:jackson-module-kotlin" to the classpath
2022-01-15 16:46:19.882  INFO 412498 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat initialized with port(s): 8081 (http)
2022-01-15 16:46:19.887  INFO 412498 --- [           main] o.apache.catalina.core.StandardService   : Starting service [Tomcat]
2022-01-15 16:46:19.887  INFO 412498 --- [           main] org.apache.catalina.core.StandardEngine  : Starting Servlet engine: [Apache Tomcat/9.0.41]
2022-01-15 16:46:19.956  INFO 412498 --- [           main] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring embedded WebApplicationContext
2022-01-15 16:46:19.957  INFO 412498 --- [           main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 678 ms
2022-01-15 16:46:20.013  INFO 412498 --- [           main] c.secretary.bot.config.DefaultDbConfig   : [db] настройки БД: [{"url":"jdbc:postgresql://localhost:5432/postgres","driver":"org.postgresql.Driver","user":"*****","password":"*****","poolSize":"10","minPoolSize":4,"maxPoolSize":10,"idleTimeout":0,"maxLifetime":0,"bulkSize":null,"h2Database":false}]
2022-01-15 16:46:20.565  INFO 412498 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'
2022-01-15 16:46:20.574 DEBUG 412498 --- [           main] s.w.s.m.m.a.RequestMappingHandlerAdapter : ControllerAdvice beans: 0 @ModelAttribute, 0 @InitBinder, 1 RequestBodyAdvice, 1 ResponseBodyAdvice
2022-01-15 16:46:20.598 DEBUG 412498 --- [           main] s.w.s.m.m.a.RequestMappingHandlerMapping : 3 mappings in 'requestMappingHandlerMapping'
2022-01-15 16:46:20.619 DEBUG 412498 --- [           main] o.s.w.s.handler.SimpleUrlHandlerMapping  : Patterns [/webjars/**, /**] in 'resourceHandlerMapping'
2022-01-15 16:46:20.627 DEBUG 412498 --- [           main] .m.m.a.ExceptionHandlerExceptionResolver : ControllerAdvice beans: 0 @ExceptionHandler, 1 ResponseBodyAdvice
2022-01-15 16:46:20.702  INFO 412498 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8081 (http) with context path ''
2022-01-15 16:46:20.709  INFO 412498 --- [           main] com.secretary.bot.WebHookApp             : Started WebHookApp in 1.65 seconds (JVM running for 1.962)
SSS2022-01-15 16:52:33.916  INFO 412498 --- [legram Executor] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
	acks = 1
	batch.size = 16384
	bootstrap.servers = [localhost:9092]
	buffer.memory = 33554432
	client.dns.lookup = use_all_dns_ips
	client.id = producer-1
	compression.type = none
	connections.max.idle.ms = 540000
	delivery.timeout.ms = 120000
	enable.idempotence = false
	interceptor.classes = []
	internal.auto.downgrade.txn.commit = true
	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
	linger.ms = 0
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metadata.max.idle.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 2147483647
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
	ssl.endpoint.identification.algorithm = https
	ssl.engine.factory.class = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.3
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class org.apache.kafka.common.serialization.StringSerializer

2022-01-15 16:52:33.947  INFO 412498 --- [legram Executor] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.6.0
2022-01-15 16:52:33.948  INFO 412498 --- [legram Executor] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 62abe01bee039651
2022-01-15 16:52:33.948  INFO 412498 --- [legram Executor] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1642254753947
2022-01-15 16:52:34.056  INFO 412498 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Cluster ID: faKjxP6CTvGFeeVKJw
2022-01-15 16:54:01.115  INFO 412498 --- [legram Executor] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Starting...
2022-01-15 16:54:01.188  INFO 412498 --- [legram Executor] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Start completed.

- Как мы видим, сообщения отправленные Боту появились в БД

Записи в БД
Записи в БД

- Открыв кондуктор, перейдите во вкладку topics, после нажимаем на наш топик users

Вкладка topics
Вкладка topics

- Далее во вкладке нашего топика нажимаем на кнопку CONSUME DATA

Информация о топике users
Информация о топике users

- В открывшемся окне, ставим такие же настройки (самая важная из них это Start From - указывает, с какого момента показывать сообщения в Kafka, наша настройка - показывает все сообщения, включая отправленые ранее)

Настройки просмотра сообщений
Настройки просмотра сообщений

- Вот и все, теперь мы убедились, что сообщения благополучно прилетели в Kafka, записались в БД и не вызвали ошибок в приложении

Прилетевшие в Kafka сообщения
Прилетевшие в Kafka сообщения

Ну что же, большое всем спасибо за время, потраченное на прочтение данной статьи, жду вас во второй части этого туториала, где мы используем Consumer Kafka, с помощью которого будем обрабатывать прилетающие сообщения.

Комментарии (11)


  1. aszhitarev
    12.03.2022 15:57
    +3

    Простите, накипело

    1. Телеграм пишется с одной М
    2. Франкенштейн — создатель и его звали Виктор. А создание, которое он создал, так и назывался — монстр Франкенштейна.

    Ещё раз простите.
    А статья отличная. Жду продолжения.


  1. MWGuy
    12.03.2022 19:56
    +4

    В чём причина использовать application.yml и application.properties вместе? В этом нет никакого смысла, так как это один и тот-же файл, но в разных форматах.

    Также хочу ещё обратить внимание автора на "build.grable" :^)


    1. MWGuy
      12.03.2022 20:02
      +5

      Также советую автору прочитать хоть базовый туториал про Spring Data. Тогда бы данная статья сократилась минимум в 2 раза.

      Ещё хочу отметить, что мы используем Spring Boot, а не голый Spring. Это значит что проблему с конфигурацией уже решили за нас. К примеру: ObjectMapper уже инициализирован и лежит в контексте, создавать его не нужно


      1. JuniorProg Автор
        14.03.2022 19:35

        Спасибо большое, я учту это. :)


    1. MWGuy
      12.03.2022 20:09
      +3

      Пожалуй напишу ещё один совет для автора - знай свой инструмент. Intellij IDEA включает в себя SQL клиент. Никакой дополнительный больше не нужен.
      Почитать про него можно тут: https://www.jetbrains.com/help/idea/relational-databases.html


      1. UnicornFreedom
        14.03.2022 08:49

        Справедливости ради - Database tools это часть пакета Ultimate.
        Помимо собственно "знания своего инструмента", потребуется взнос в размере $50 в месяц (либо $500 в год).


        1. Borz
          14.03.2022 21:39

          но, когда автор статьи предлагаетесоздавать Spring проект через встроенный в IDEA конфигуратор, разве не предполагается, что это Ultimate редакция? Ведь в Community версии нет поддержки Spring


  1. LeshaRB
    12.03.2022 22:58
    +1

    1. Внедрение рекомендуется делать через конструктор, а не через сеттер

    2. @RequestMapping(path = "/users_idea", method = RequestMethod.GET), почему не использовать Get mapping?

    3. Зачем использовать Slf4j если в коде нет логирования? Абы было?

    4. Где конструктор самописный, где-то через ломобок

    Ну и прочее


  1. isden
    13.03.2022 10:53
    +2

    К написанному выше еще хочется добавить:


    Далее запускаем DBeaver и создаем 2 таблицы (log и user_table), вот схема создания таблиц:

    Почему не используете вещи вроде liquibase? Более того, в Spring Boot есть и другие варианты.
    Да, и где же тесты? :) Это же самое интересное.


  1. martuz0
    14.03.2022 08:44

    Для первой статьи, это весьма круто! Так держать!

    P.S обратил внимание, что вы используете Кондуктор, круто, но платно)), есть интересное open source решение, которое позволяет менеджить кластеры кафка


  1. slayeeer
    14.03.2022 09:04

    статья понравилась, но бд можно создавать через java используя ликвибейс