Иванов Максим

Младший Java программист

Рецепт легкого перекуса для «Telegram - монстра Франкенштейна»

Ничто не вызывает у нас столь мучительных страданий, как резкая и внезапная перемена.- «Франкенштейн» Мэри Шелли
Ничто не вызывает у нас столь мучительных страданий, как резкая и внезапная перемена.- «Франкенштейн» Мэри Шелли

Всем привет, это вторая часть создания телеграм-бота (ссылка на первую часть), в ней мы реализуем Kafka Consumer, который будет ловить любые колебания в силе и выдавать нам всю информацию о действиях пользователя.

Ингредиенты:

  1. Регистрация бота

  2. Создание Spring Boot проект, проще всего это сделать через Spring Initializr. (в качестве системы сборки будет использоваться Gradle)

  3. Kafka (для отслеживания топиков я использую Conductor)

  4. PostgreSQL (для комфортной работы я использую DBeaver)

Если возникнут сложности с воссозданием туториала

Прошу пишите в комментариях возникшие проблемы, на всякий случай - вот мой git и ТГ

Начинаем с яичных желтков:

Первостепенно нужно настроить build.gradle со всеми зависимостями

build.gradle
plugins {
	id 'org.springframework.boot' version '2.5.6'
	id 'io.spring.dependency-management' version '1.0.11.RELEASE'
	id 'java'
}

group = 'com.demo.kafka'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '14'

repositories {
	mavenCentral()
}

configurations.all {
	exclude module: 'slf4j-log4j12'
}

dependencies {
	implementation 'org.springframework.boot:spring-boot-starter-web:2.5.6'
	implementation 'org.springframework.kafka:spring-kafka:2.7.6'
  implementation 'org.projectlombok:lombok:1.18.22'
	implementation 'org.springframework.boot:spring-boot-starter-jdbc:2.5.6'
	implementation 'org.springframework.data:spring-data-commons:2.6.0'
	implementation 'org.postgresql:postgresql:42.3.1'
	implementation 'com.h2database:h2:1.4.200'

	testImplementation 'org.springframework.boot:spring-boot-starter-test:2.5.6'
	testImplementation 'org.springframework.kafka:spring-kafka-test:2.7.6'

	compileOnly 'org.projectlombok:lombok:1.18.22'
	annotationProcessor 'org.projectlombok:lombok:1.18.22'
}

test {
	useJUnitPlatform()
}

Далее для работы Kafka опишем application.yml, в котором находятся настройки нашего kafka consumer.

application.yml
server:
  port: 9002
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: group_id
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

Ну и в конце - настройки application.properties

application.properties
# HTTP port for incoming requests
server.port=8080

# Log db
app.db.demo.url=jdbc:postgresql://localhost:5432/change-me
app.db.demo.driver=org.postgresql.Driver
app.db.demo.user=change-me
app.db.demo.password=change-me
app.db.demo.pool-size=10

# kafka-metadata-consumer
app.metadata.tag=logs
app.metadata.bootstrapServers=athena:9092
app.metadata.groupId=group_id
app.metadata.topic=users
app.metadata.autoOffsetReset=earliest
app.metadata.enableAutoCommit=false
app.metadata.maxPollRecords=10
app.metadata.concurrency=4
app.metadata.path=files


# logging
logging.level.root=INFO
logging.level.org.springframework.web=DEBUG
logging.level.ru.centerinform.webhook=TRACE
logging.file.name=change-me

Хорошо, говоря о структуре проекта, то советую придерживаться такого вида:

Структура проекта
Структура проекта

Пакеты:

  • config - описание бинов и конфигурации проекта

  • controller - обрабатывает запрос пользователя

  • model - хранит модель данных, а так же описывает маппер для этой модели

  • repository - логика работа с БД

  • service - основная бизнес логика проекта

Намазываем на тост, посыпаем сыром и кидаем в духовку:

Так как настройка бинов, можно сказать - шаблонный код, за основу она идентична с первой частью, так что, не удивляйтесь сходству.

Настройки бинов:

- Первым делом в пакете config прописываем конфигурацию бинов нашего приложения, тут настройки инициализации JdbcTemplate, так же, обратите внимание, что внутри нашего класса DbConfig, есть класс SpringDataJdbcProperties, который описывает настройки SpringDataJdbc

DbConfig
@Configuration
public class DbConfig extends DefaultDbConfig {

    @Bean
    @Qualifier("demo")
    @ConfigurationProperties(prefix = "app.db.demo")
    SpringDataJdbcProperties demoJdbcProperties() {
        return new SpringDataJdbcProperties();
    }

    @Bean
    @Qualifier("demo")
    public DataSource demoDataSource(@Qualifier("demo") SpringDataJdbcProperties properties) {
        return hikariDataSource("db", properties);
    }

    @Bean
    @Qualifier("demo")
    JdbcTemplate demoJdbcTemplate(@Qualifier("demo") DataSource dataSource) {
        return new JdbcTemplate(dataSource);
    }

    @Data
    @NoArgsConstructor
    public static class SpringDataJdbcProperties {

        // constants
        private static final String H2_DATABASE_DRIVER = "org.h2.Driver";

        /**
         * JDBC URL property
         */
        String url;
        /**
         * JDBC driver class name property
         */
        String driver;
        /**
         * JDBC username property
         */
        String user;
        /**
         * JDBC password property
         */
        String password;
        /**
         * Hikari / Vertica maxPoolSize property
         */
        String poolSize;
        /**
         * Minimum pool size
         */
        int minPoolSize = 4;
        /**
         * Maximum pool size
         */
        int maxPoolSize = 10;
        /**
         * This property controls the maximum amount of time (in milliseconds) that a connection is allowed to
         * sit idle in the pool. A value of 0 means that idle connections are never removed from the pool.
         */
        long idleTimeout;
        /**
         * This property controls the maximum lifetime of a connection in the pool. When a connection
         * reaches this timeout, even if recently used, it will be retired from the pool.
         * An in-use connection will never be retired, only when it is idle will it be removed
         */
        long maxLifetime;
        /**
         * Bulk insert size
         */
        Integer bulkSize;


        /**
         * All-args constructor for {@link SpringDataJdbcProperties#toString()} (logging)
         *
         * @param url JDBC driver class name property
         * @param driver JDBC driver class name property
         * @param user JDBC username property
         * @param password JDBC password property
         * @param poolSize Hikari / Vertica maxPoolSize property
         * @param bulkSize bulk insert size
         */
        public SpringDataJdbcProperties(
                String url, String driver, String user, String password, String poolSize, Integer bulkSize) {
            this.url = url;
            this.driver = driver;
            this.user = user;
            this.password = password;
            this.poolSize = poolSize;
            this.bulkSize = bulkSize;
        }


        /**
         * Возвращает истину, если экземпляр описывает in-memory H2 database
         *
         * @return истина, если экземпляр описывает in-memory H2 database
         */
        public boolean isH2Database() {
            return driver.equals(H2_DATABASE_DRIVER);
        }

        /**
         * Возвращает строковое представление экземпляра объекта в формате JSON
         *
         * @return строковое представление экземпляра объекта в формате JSON
         */
        @Override
        public String toString() {
            var props = new SpringDataJdbcProperties(
                    url, driver, user, ((password == null) || password.isEmpty()) ? "" : "*****", poolSize, bulkSize);
            return Json.encode(props);
        }

    }

}

- Создадим базовый класс для уменьшения дублирования кода инициализации бинов

DefaultDbConfig
@Slf4j
class DefaultDbConfig {

    protected DataSource hikariDataSource(String tag, DbConfig.SpringDataJdbcProperties properties) {
        log.info("[{}] настройки БД: [{}]", tag, properties.toString());

        HikariDataSource ds = new HikariDataSource();
        ds.setJdbcUrl(properties.getUrl());
        ds.setDriverClassName(properties.getDriver());
        ds.setUsername(properties.getUser());
        ds.setPassword(properties.getPassword());
        ds.setMaximumPoolSize(Integer.parseInt(properties.getPoolSize()));
        return ds;
    }
}

- После, напишем утилитный класс для логирования.

Json
public class Json {
    static final ObjectMapper mapper = new ObjectMapper();

    /**
     * Encode instance as JSON
     *
     * @param obj instance
     * @return JSON
     */
    public static String encode(Object obj) {
        try {
            return mapper.writeValueAsString(obj);
        } catch (JsonProcessingException e) {
            return obj.toString();
        }
    }

    public static <T> T decode(String json, Class<T> clazz) throws JsonProcessingException {
        return mapper.readValue(json, clazz);
    }

}

Создание модели данных

- Создаем модель для обработки логов ConsumerLog, а так же маппер ConsumerMapper, который понадобиться для работы с БД и описывания полей в таблице.

ConsumerLog
@Data
@RequiredArgsConstructor
public class ConsumerLog {

    @JsonProperty("id")
    @JsonIgnoreProperties(ignoreUnknown = true)
    private final int id;

    @JsonProperty("message")
    @JsonIgnoreProperties(ignoreUnknown = true)
    private final String msg;

    @JsonProperty("topic")
    @JsonIgnoreProperties(ignoreUnknown = true)
    private final String topic;

    @JsonProperty("logDate")
    @JsonIgnoreProperties(ignoreUnknown = true)
    private final LocalDate logDate;

    @Override
    public String toString() {
        return "Was added log [id=" + id + ", topic=" + topic + "log=" + msg + ", date=" +  logDate.toString() + "]";
    }
}
ConsumerMapper
@Slf4j
public class ConsumerMapper implements RowMapper<ConsumerLog> {

    @Override
    public ConsumerLog mapRow(ResultSet rs, int rowNum) throws SQLException {
        var date = rs.getDate("date_time");
        var entity = new ConsumerLog(
                rs.getInt("id"),
                rs.getString("message"),
                rs.getString("topic"),
                date == null ? null : date.toLocalDate()
        );
        log.trace("ConsumerMapper(): entity = [{}]", entity);
        return entity;
    }
}

После создания модели данных и ее маппера, приступаем к репозиториям

- Создадим интерфейс, который описывает методы, для работы с записями в БД.

IConsumerLogRepository
public interface IConsumerLogRepository {

     /**
      * Возвращает список записей
      *
      * @return список всех записей
      * @throws DbException в случае ошибки БД
      */
     List<ConsumerLog> getLogsList();

     /**
      * Вставка новой записи
      *
      * @param entity новая запись
      * @throws DbException в случае ошибки БД
      */
     void insert(ConsumerLog entity);
}

- Теперь напишем класс, который реализует методы интерфейса.

ConsumerLogRepository
@Repository
@Slf4j
public class ConsumerLogRepository implements IConsumerLogRepository {

    private static final String SQL_SELECT_LIST = "SELECT id, message, date_time, topic FROM log";
    private static final String SQL_INSERT = "INSERT INTO log (message, date_time, topic) VALUES (?, ?, ?)";

    protected final static ConsumerMapper CONSUMER_LOG_MAPPER = new ConsumerMapper();

    protected final JdbcTemplate template;

    public ConsumerLogRepository(@Qualifier("demo") JdbcTemplate template) {
        this.template = template;
    }

    /**
     * Возвращает записи элемента из таблицы логов подписчика
     */
    @Override
    public List<ConsumerLog> getLogsList() throws DbException {
        return template.query(SQL_SELECT_LIST, CONSUMER_LOG_MAPPER);
    }

    /**
     * Заполняет записи элементами из приходящего топика логов
     */
    @Override
    public void insert(ConsumerLog entity) throws DbException {
        var result = template.update(SQL_INSERT, entity.getMsg(), entity.getLogDate(), entity.getTopic());
        if (result != 1) log.trace("ConsumerLogRepository.insert() with {} rows inserted", entity);
        log.trace("insert({}) result={}", entity, result);
    }
}

Ну и главный элемент бизнес логики приложения - kafka consumer

- Это класс подписчик, он получает сообщения из Kafka и обрабатывает их.

Consumer
@Slf4j
@Service
@AllArgsConstructor
public class Consumer {
    private static final String TOPIC_NAME = "users";
    protected final IConsumerLogRepository consumerRepo;

    /**
     * Метод обработки сообщений от producer,
     * который "отлавливает" эти самые сообщения с помощью аннотации KafkaListener и принимает их в виде параметра.
     *
     * @param message сообщение от producer, которое прилетает в кафка
     */
    @KafkaListener(topics = TOPIC_NAME, groupId = "group_id")
    public void consumeWriting(String message) {
        var consumerLog = new ConsumerLog(0, message, TOPIC_NAME, LocalDate.now());
        consumerRepo.insert(consumerLog);
        log.info("#### Consumed received message [{}]", message);
    }

    /**
     * Получение списка логов из БД
     */
    public List<ConsumerLog> consumeLog() {
        var list = consumerRepo.getLogsList();
        list.forEach(msg -> log.info("#### Consumer list log [{}]", msg.toString()));
        return list;
    }
}

Далее, как и в прошлой статье, мы напишем контроллер, для доступа к сервису из вне

- Создаем простенький контроллер, для получения списка логов из БД.

TestController
@Slf4j
@RestController
@AllArgsConstructor
@RequestMapping(value = "/kafka")
public class TestController {

    private final Consumer consumerService;

    /**
     * Возвращает записи элемента из таблицы логов подписчика
     *
     */
    @GetMapping(value = "/log_list")
    public String getLogList() {
        log.trace("[GET] getLogList()");
        return consumerService.consumeLog().toString();
    }
}

В заключении, класс, который собственно и запускает все наше приложение

BotLogsApplication
@SpringBootApplication
public class BotLogsApplication {
	public static void main(String[] args) {
		SpringApplication.run(BotLogsApplication.class, args);
	}
}

Перед тем, как вытаскивать из духовки наше блюдо, нужно подготовиться:

- Сначала проверим, запущена ли Kafka.

запуск по команде - sudo su systemctl start kafka
запуск по команде - sudo su systemctl start kafka

- После, запускаем Conductor и видим, что у нас работает топик users.

Вкладка topics
Вкладка topics

- Далее запускаем DBeaver и благодаря первой статье, у нас уже заранее создано 2 таблицы (log и user_table), схема создания таблиц из первой части

Отлично, вынимаем наши тосты из духовки:

- Запускаем проект, проверяем, что все настроено и корректно работает.

Логи запущенного приложения
  .   ____          _            __ _ _
/\ / ' __ _ () __  __ _ \ \ \ 
( ( )__ | '_ | '| | ' / ` | \ \ \ 
\/  )| |)| | | | | || (| |  ) ) ) )
'  || .__|| ||| |_, | / / / /
=========||==============|/=////
:: Spring Boot ::                (v2.5.6)
2022-01-19 22:14:49.283  INFO 41808 --- [           main] c.l.kafka.consumer.BotLogsApplication    : No active profile set, falling back to default profiles: default
2022-01-19 22:14:49.903  INFO 41808 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat initialized with port(s): 9002 (http)
2022-01-19 22:14:49.910  INFO 41808 --- [           main] o.apache.catalina.core.StandardService   : Starting service [Tomcat]
2022-01-19 22:14:49.910  INFO 41808 --- [           main] org.apache.catalina.core.StandardEngine  : Starting Servlet engine: [Apache Tomcat/9.0.54]
2022-01-19 22:14:49.974  INFO 41808 --- [           main] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring embedded WebApplicationContext
2022-01-19 22:14:49.974  INFO 41808 --- [           main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 667 ms
2022-01-19 22:14:50.048  INFO 41808 --- [           main] c.l.k.consumer.config.DefaultDbConfig    : [db] настройки БД: [{"url":"jdbc:postgresql://localhost:5432/change-me","driver":"org.postgresql.Driver","user":"change-me","password":"*","poolSize":"10","minPoolSize":4,"maxPoolSize":10,"idleTimeout":0,"maxLifetime":0,"bulkSize":null,"h2Database":false}]
2022-01-19 22:14:50.242 DEBUG 41808 --- [           main] .m.m.a.ExceptionHandlerExceptionResolver : ControllerAdvice beans: 0 @ExceptionHandler, 1 ResponseBodyAdvice2022-01-19 22:14:50.175 DEBUG 41808 --- [           main] s.w.s.m.m.a.RequestMappingHandlerAdapter : ControllerAdvice beans: 0 @ModelAttribute, 0 @InitBinder, 1 RequestBodyAdvice, 1 ResponseBodyAdvice
2022-01-19 22:14:50.214 DEBUG 41808 --- [           main] s.w.s.m.m.a.RequestMappingHandlerMapping : 3 mappings in 'requestMappingHandlerMapping'
2022-01-19 22:14:50.236 DEBUG 41808 --- [           main] o.s.w.s.handler.SimpleUrlHandlerMapping  : Patterns [/webjars/, /] in 'resourceHandlerMapping'
2022-01-19 22:14:50.242 DEBUG 41808 --- [           main] .m.m.a.ExceptionHandlerExceptionResolver : ControllerAdvice beans: 0 @ExceptionHandler, 1 ResponseBodyAdvice
2022-01-19 22:14:50.367  INFO 41808 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = consumer-group_id-1
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = group_id
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
socket.connection.setup.timeout.max.ms = 127000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
2022-01-19 22:14:50.406  INFO 41808 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.7.1
2022-01-19 22:14:50.407  INFO 41808 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 61dbce85d0d41457
2022-01-19 22:14:50.407  INFO 41808 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1642619690406
2022-01-19 22:14:50.408  INFO 41808 --- [           main] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-group_id-1, groupId=group_id] Subscribed to topic(s): users
2022-01-19 22:14:50.422  INFO 41808 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 9002 (http) with context path ''
2022-01-19 22:14:50.429  INFO 41808 --- [           main] c.l.kafka.consumer.BotLogsApplication    : Started BotLogsApplication in 1.413 seconds (JVM running for 1.876)
2022-01-19 22:14:50.551  INFO 41808 --- [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-group_id-1, groupId=group_id] Cluster ID: O9iXkXIMQpKE3DgrEQtJ5w
2022-01-19 22:14:50.552  INFO 41808 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-group_id-1, groupId=group_id] Discovered group coordinator omen:9092 (id: 2147483647 rack: null)
2022-01-19 22:14:50.553  INFO 41808 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-group_id-1, groupId=group_id] (Re-)joining group
2022-01-19 22:14:50.560  INFO 41808 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-group_id-1, groupId=group_id] (Re-)joining group
2022-01-19 22:14:50.562  INFO 41808 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-group_id-1, groupId=group_id] Successfully joined group with generation Generation{generationId=17, memberId='consumer-group_id-1-cb88956f-09ef-4c33-a241-2941be87ff1b', protocol='range'}
2022-01-19 22:14:50.563  INFO 41808 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group_id-1, groupId=group_id] Finished assignment for group at generation 17: {consumer-group_id-1-cb88956f-09ef-4c33-a241-2941be87ff1b=Assignment(partitions=[users-0])}
2022-01-19 22:14:50.632  INFO 41808 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-group_id-1, groupId=group_id] Successfully synced group in generation Generation{generationId=17, memberId='consumer-group_id-1-cb88956f-09ef-4c33-a241-2941be87ff1b', protocol='range'}
2022-01-19 22:14:50.633  INFO 41808 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group_id-1, groupId=group_id] Notifying assignor about the new Assignment(partitions=[users-0])
2022-01-19 22:14:50.637  INFO 41808 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group_id-1, groupId=group_id] Adding newly assigned partitions: users-0
2022-01-19 22:14:50.651  INFO 41808 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group_id-1, groupId=group_id] Setting offset for partition users-0 to the committed offset FetchPosition{offset=4, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[omen:9092 (id: 0 rack: null)], epoch=0}}
2022-01-19 22:14:50.652  INFO 41808 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : group_id: partitions assigned: [users-0]

- Открываем телеграм и пробуем на вкус наши закуски.

  • Пишем - /start, начинаем тест и видим, что бот работает!

Общение с ботом в Телеграм
Общение с ботом в Телеграм

- Давайте посмотрим, что же нам написал Spring в логах, отловил ли наш consumer данные из Kafka и сделал ли записи в БД?

Логи нашего consumer, ошибок не наблюдается
  .   ____          _            __ _ _
/\ / ' __ _ () __  __ _ \ \ \ 
( ( )__ | '_ | '| | ' / ` | \ \ \ 
\/  )| |)| | | | | || (| |  ) ) ) )
'  || .__|| ||| |_, | / / / /
=========||==============|/=////
:: Spring Boot ::                (v2.5.6)
2022-01-19 22:21:26.142  INFO 42281 --- [           main] c.l.kafka.consumer.BotLogsApplication    : No active profile set, falling back to default profiles: default
2022-01-19 22:21:27.195  INFO 42281 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat initialized with port(s): 9002 (http)
2022-01-19 22:21:27.201  INFO 42281 --- [           main] o.apache.catalina.core.StandardService   : Starting service [Tomcat]
2022-01-19 22:21:27.201  INFO 42281 --- [           main] org.apache.catalina.core.StandardEngine  : Starting Servlet engine: [Apache Tomcat/9.0.54]
2022-01-19 22:21:27.245  INFO 42281 --- [           main] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring embedded WebApplicationContext
2022-01-19 22:21:27.246  INFO 42281 --- [           main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 1030 ms
2022-01-19 22:21:27.329  INFO 42281 --- [           main] c.l.k.consumer.config.DefaultDbConfig    : [db] настройки БД: [{"url":"jdbc:postgresql://localhost:5432/postgres","driver":"org.postgresql.Driver","user":"postgres","password":"*","poolSize":"10","minPoolSize":4,"maxPoolSize":10,"idleTimeout":0,"maxLifetime":0,"bulkSize":null,"h2Database":false}]
2022-01-19 22:21:27.561 DEBUG 42281 --- [           main] .m.m.a.ExceptionHandlerExceptionResolver : ControllerAdvice beans: 0 @ExceptionHandler, 1 ResponseBodyAdvice2022-01-19 22:21:27.490 DEBUG 42281 --- [           main] s.w.s.m.m.a.RequestMappingHandlerAdapter : ControllerAdvice beans: 0 @ModelAttribute, 0 @InitBinder, 1 RequestBodyAdvice, 1 ResponseBodyAdvice
2022-01-19 22:21:27.524 DEBUG 42281 --- [           main] s.w.s.m.m.a.RequestMappingHandlerMapping : 3 mappings in 'requestMappingHandlerMapping'
2022-01-19 22:21:27.551 DEBUG 42281 --- [           main] o.s.w.s.handler.SimpleUrlHandlerMapping  : Patterns [/webjars/, /] in 'resourceHandlerMapping'
2022-01-19 22:21:27.561 DEBUG 42281 --- [           main] .m.m.a.ExceptionHandlerExceptionResolver : ControllerAdvice beans: 0 @ExceptionHandler, 1 ResponseBodyAdvice
2022-01-19 22:21:27.726  INFO 42281 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = consumer-group_id-1
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = group_id
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
socket.connection.setup.timeout.max.ms = 127000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
2022-01-19 22:21:27.772  INFO 42281 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.7.1
2022-01-19 22:21:27.772  INFO 42281 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 61dbce85d0d41457
2022-01-19 22:21:27.772  INFO 42281 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1642620087771
2022-01-19 22:21:27.774  INFO 42281 --- [           main] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-group_id-1, groupId=group_id] Subscribed to topic(s): users
2022-01-19 22:21:27.787  INFO 42281 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 9002 (http) with context path ''
2022-01-19 22:21:27.794  INFO 42281 --- [           main] c.l.kafka.consumer.BotLogsApplication    : Started BotLogsApplication in 2.184 seconds (JVM running for 2.825)
2022-01-19 22:21:27.964  INFO 42281 --- [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-group_id-1, groupId=group_id] Cluster ID: O9iXkXIMQpKE3DgrEQtJ5w
2022-01-19 22:21:27.965  INFO 42281 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-group_id-1, groupId=group_id] Discovered group coordinator omen:9092 (id: 2147483647 rack: null)
2022-01-19 22:21:27.974  INFO 42281 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-group_id-1, groupId=group_id] (Re-)joining group
2022-01-19 22:21:27.988  INFO 42281 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-group_id-1, groupId=group_id] (Re-)joining group
2022-01-19 22:21:27.993  INFO 42281 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-group_id-1, groupId=group_id] Successfully joined group with generation Generation{generationId=19, memberId='consumer-group_id-1-74db2249-45e9-493e-9180-d349f0688066', protocol='range'}
2022-01-19 22:21:27.994  INFO 42281 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group_id-1, groupId=group_id] Finished assignment for group at generation 19: {consumer-group_id-1-74db2249-45e9-493e-9180-d349f0688066=Assignment(partitions=[users-0])}
2022-01-19 22:21:28.000  INFO 42281 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-group_id-1, groupId=group_id] Successfully synced group in generation Generation{generationId=19, memberId='consumer-group_id-1-74db2249-45e9-493e-9180-d349f0688066', protocol='range'}
2022-01-19 22:21:28.002  INFO 42281 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group_id-1, groupId=group_id] Notifying assignor about the new Assignment(partitions=[users-0])
2022-01-19 22:21:28.003  INFO 42281 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group_id-1, groupId=group_id] Adding newly assigned partitions: users-0
2022-01-19 22:21:28.011  INFO 42281 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group_id-1, groupId=group_id] Setting offset for partition users-0 to the committed offset FetchPosition{offset=4, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[omen:9092 (id: 0 rack: null)], epoch=0}}
2022-01-19 22:21:28.012  INFO 42281 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : group_id: partitions assigned: [users-0]
2022-01-19 22:21:29.407  INFO 42281 --- [ntainer#0-0-C-1] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Starting...
2022-01-19 22:21:29.566  INFO 42281 --- [ntainer#0-0-C-1] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Start completed.
2022-01-19 22:22:17.027  INFO 42281 --- [ntainer#0-0-C-1] c.logs.kafka.consumer.service.Consumer   : #### Consumed received message [Writing in log -> команда: /start]
2022-01-19 22:22:20.652  INFO 42281 --- [ntainer#0-0-C-1] c.logs.kafka.consumer.service.Consumer   : #### Consumed received message [Writing in log -> мысль: Как же хочется написать статью на Хабр !!!]
2022-01-19 22:22:25.344  INFO 42281 --- [ntainer#0-0-C-1] c.logs.kafka.consumer.service.Consumer   : #### Consumed received message [Writing in log -> мысль: Может написать статью о боте в Телеграмм ?]
2022-01-19 22:22:30.394  INFO 42281 --- [ntainer#0-0-C-1] c.logs.kafka.consumer.service.Consumer   : #### Consumed received message [Writing in log -> мысль: Написать статью!]
2022-01-19 22:22:35.652  INFO 42281 --- [ntainer#0-0-C-1] c.logs.kafka.consumer.service.Consumer   : #### Consumed received message [Writing in log -> команда: /idea]

- Сообщения, отправленные producer в Kafka, были обработаны нашим consumer и записаны в БД.

Записи в БД
Записи в БД

- Далее, по инструкции из первой статьи, откройте окно Сonsume from Topic, здесь показаны прилетевшие в Kafka сообщения

Как и в первой статье, мы убедились, что приложение корректно работает, сообщения благополучно прилетели в Kafka, были отловлены, обработаны и записаны в БД

Прилетевшие в Kafka сообщения
Прилетевшие в Kafka сообщения

Вот и все, надеюсь, что у всех получилось повторить туториал в первого раза, в будущем будет еще много интересного, всем спасибо.

Комментарии (0)