Эта статья является живым очерком о буднях современного тестировщика в agile-команде и рассматривает подход в тестировании, при котором описывается вариант решения проблемы настройки и разворачивания тестируемого приложения. В качестве примера возьмем сервис для поиска данных во временных рядах — Time Series Pattern, разрабатываемый нами на аутсорс. 

Меня зовут Роман Мостафин, и я работаю QA-инженером в компании Factory5. 
Начинаем. 

На данный момент многие команды сталкиваются с тем, что сложно обслуживать среду для тестирования в виде стендов, а также с многопользовательским доступом к одним и тем же данным и версионирование этих стендов. Порою эта проблема стоит довольно остро. Решением может быть поднятие сервисов и части систем на локальной машине. К его плюсам относится: легкость версионирования, быстрота разворачивания и настройки. Данное решение применяется в нашей компании для тестирования отдельных сервисов. И сейчас я расскажу, как мы его используем на примере нашего сервиса разрабатываемого на аутсорсе.  

Установка сервисов 

Для начала нам понадобится утилита Docker — система контейнеризации и виртуализации. 

Команда сделала сборку в виде докер-файла, в которую входят следующие сервисы: 

  • Kafka 

  • Kafdrop 

  • Zookeeper 

  • Kafka UI 

  • Clickhouse 

  • Flink-jobmanager 

  • Flink-taskmanager 

  • TSP 

Создадим директорию на локальной машине и перенесем туда docker-compose файл.

Затем мы воспользуемся терминалом и перейдем в данную директорию. 

Пока поднимается данная сборка, расскажу про возможности самого сервиса. 

TSP (Time Series Patterns) — аналитический сервис для поиска шаблонов в данных временных рядов большого объема. Имеет гибкий DSL для описания паттернов (правил) и представляет собой оптимизированный движок потоковой обработки поверх Apache Flink. Этот сервис может применяться для потоковой обработки данных, а также для частичной обработки данных. 

Принцип работы такой: есть исходные данные, на них пишутся правила (определенные условия поиска данных). Далее эти правила применяются на данных с помощью запроса, в котором мы указываем адреса источников данных, диапазон данных и условие проверки. Как итог, мы получаем запись о том, что найдено в соответствии правилу на определенном отрезке данных. Для запуска потоковой обработки (stream) используется kafka в качестве источника данных и хранилища инцидентов. Для запуска обработки данных по частям (batch) в качестве хранилища данных используется база данных clickhouse. В этой статье мы рассмотрим оба варианта запуска сервиса. 

После того как будет доступен адрес http://localhost:8081/ на локальной машине, можно приступать к настройке. 

Настройка запуска в режиме batch 

Начнем с настройки режима с запуском правил на части данных. Для этого нам понадобится клиент базы данных. Я покажу на примере клиента DataGrip https://www.jetbrains.com/ru-ru/datagrip/. 

Создаем соединение со стандартными настройками clickhouse (username = default, host = localhost, database = default).

Открываем консоль и создаем две таблицы. 

Таблицу для исходных данных (телеметрии): 

create table if not exists test_telemetry 

( 

dt DateTime default toDateTime(ts), 

ts Float64, 

sensor_id String, 

n UInt32, 

value_float Float32, 

equipment_id UInt32, 

engine_no String 

) 

engine = ReplacingMergeTree() 

PARTITION BY toYYYYMM(dt) 

ORDER BY (equipment_id, sensor_id, n, ts) 

SETTINGS index_granularity = 8192; 

И таблицу для хранения инцидентов: 

create table if not exists events_v2_event 

( 

uuid UUID default generateUUIDv4(), 

date_from DateTime64(6) default '0000000000.000000', 

date_to DateTime64(6) default '0000000000.000000', 

processing_date DateTime64(6) default now(), 

type_id UInt16, 

equipment_id UInt32, 

ph_node_id UInt32, 

algorithm_id UInt32, 

value Nullable(Float32), 

context Nullable(String), 

status UInt8 default 1, 

duration Float64 materialized (toUnixTimestamp64Milli(date_to) - toUnixTimestamp64Milli(date_from)) / 1000 

) 

engine = MergeTree() 

PARTITION BY (toYYYYMM(date_from), intDiv(equipment_id, 1000)) 

ORDER BY date_from 

SETTINGS index_granularity = 8192; 

Далее заполним тестовыми данными таблицу телеметрии с помощью следующих запросов: 

INSERT INTO default.test_telemetry (dt, ts, sensor_id, n, value_float, equipment_id, engine_no) VALUES ('2021-10-25 13:23:21', 1635157169, 'test_sensor_2', 1, 10, 1, '1'); 

default.test_telemetry (dt, ts, sensor_id, n, value_float, equipment_id, engine_no) VALUES ('2021-10-25 13:23:21', 1635157170, 'test_sensor_2', 1, 10, 1, '1'); 

INSERT INTO default.test_telemetry (dt, ts, sensor_id, n, value_float, equipment_id, engine_no) VALUES ('2021-10-25 13:23:23', 1635157171, 'test_sensor_2', 1, 10, 1, '1'); 

INSERT INTO default.test_telemetry (dt, ts, sensor_id, n, value_float, equipment_id, engine_no) VALUES ('2021-10-25 13:23:21', 1635157169, 'test_sensor_1', 1, 10, 1, '1'); 

INSERT INTO default.test_telemetry (dt, ts, sensor_id, n, value_float, equipment_id, engine_no) VALUES ('2021-10-25 13:23:21', 1635157170, 'test_sensor_1', 1, 10, 1, '1'); 

INSERT INTO default.test_telemetry (dt, ts, sensor_id, n, value_float, equipment_id, engine_no) VALUES ('2021-10-25 13:23:23', 1635157171, 'test_sensor_1', 1, 10, 1, '1'); 

INSERT INTO default.test_telemetry (dt, ts, sensor_id, n, value_float, equipment_id, engine_no) VALUES ('2021-10-25 13:23:21', 1635157169, 'test_sensor', 1, 10, 1, '1'); 

INSERT INTO default.test_telemetry (dt, ts, sensor_id, n, value_float, equipment_id, engine_no) VALUES ('2021-10-25 13:23:21', 1635157170, 'test_sensor', 1, 10, 1, '1'); 

INSERT INTO default.test_telemetry (dt, ts, sensor_id, n, value_float, equipment_id, engine_no) VALUES ('2021-10-25 13:23:23', 1635157171, 'test_sensor', 1, 10, 1, '1'); 

Теперь мы проверим работоспособность нашей системы выполнив REST запрос в сам TSP. 

Его можно выполнить, используя клиент для Rest. Я использую POSTMAN 

Url: http://localhost:8080/streamJob/from-jdbc/to-jdbc/?run_async=1  

Body: 

{ "sink": { 

"jdbcUrl": "jdbc:clickhouse://default:@clickhouse:8123/default", 

"rowSchema": { 

"toTsField": "date_to", 

"fromTsField": "date_from", 

"unitIdField": "equipment_id", 

"appIdFieldVal": [ 

"type_id", 

1 

], 

"patternIdField": "algorithm_id", 

"subunitIdField": "ph_node_id" 

}, 

"tableName": "events_v2_event", 

"driverName": "ru.yandex.clickhouse.ClickHouseDriver" 

}, 

"uuid": "59338fc6-087a-4f49-a6f8-f0f96cc31bcd_1", 

"source": { 

"query": "SELECT ts, equipment_id AS equipment_id, sensor_id, value_float \nFROM test_telemetry \nWHERE equipment_id IN (1) AND ts >= 1635157169.0 AND ts < 1635157171.0 ORDER BY ts ASC", 

"jdbcUrl": "jdbc:clickhouse://default:@clickhouse:8123/default", 

"sourceId": 1, 

"driverName": "ru.yandex.clickhouse.ClickHouseDriver", 

"unitIdField": "equipment_id", 

"datetimeField": "ts", 

"partitionFields": [ 

"equipment_id" 

], 

"dataTransformation": { 

"type": "NarrowDataUnfolding", 

"config": { 

"keyColumn": "sensor_id", 

"defaultTimeout": 60000, 

"fieldsTimeoutsMs": {}, 

"defaultValueColumn": "value_float" 

} 

} 

}, 

"patterns": [ 

{ 

"id": "1", 

"payload": { 

"subunit": "4" 

}, 

"sourceCode": "\"test_sensor\" = 10 andThen (\"test_sensor_1\" = 20 or \"test_sensor_2\" = 10)", 

"forwardedFields": [ 

"equipment_id" 

] 

} 

] 

} 

После получения успешного ответа мы переходим в ui-флинк и ждем смены статуса job с running на finished.

Далее мы переходим в базу данных, делаем запрос к таблице: select * from events_v2_event; и получаем строчку данных, которая соответствует условию:

Настройка запуска в режиме stream 

Рассмотрим пример запуска в потоке. Различие в том, что запущенная задача не завершается, а ждет новой порции данных в топике, указанном в запросе. 

Для начала мы идем в kafka ui и создаем два топика: 

топик для исходных данных, в данном примере название “test” и топик для инцидентов — events_v2_event. 

Пример: 

Далее с помощью python скрипта и csv файла с тестовыми данными мы заполняем данными топик test. 

Для этого мы создаем файл test_telemetry.csv со следующими данными: 

"dt","ts","sensor_id","n","value_float","equipment_id","engine_no" 
2021-10-25 13:23:21,1635157169,"test_sensor_2",1,10.0,1,"1" 
2021-10-25 13:23:21,1635157170,"test_sensor_2",1,10.0, 1,"1" 
2021-10-25 13:23:23,1635157171,"test_sensor_2",1,10.0,1,"1" 
2021-10-25 13:23:21,1635157169,"test_sensor_1",1,10.0,1,"1" 
2021-10-25 13:23:21,1635157170,"test_sensor_1",1,10.0,1,"1" 
2021-10-25 13:23:23,1635157171,"test_sensor_1",1,10.0,1,"1" 
2021-10-25 13:23:21,1635157169,"test_sensor",1,10.0,1,"1" 
2021-10-25 13:23:21,1635157170,"test_sensor",1,10.0,1,"1" 
2021-10-25 13:23:23,1635157171,"test_sensor",1,10.0,1,"1" 

Далее мы создаем python-file со следующим содержимым: 

import pandas as pd 
from kafka import KafkaProducer 
producer = KafkaProducer(bootstrap_servers='10.83.0.3:9092') 
 
df = pd.read_csv('test_telemetry.csv') 
 
df.apply(lambda x: producer.send(topic='test',value=x.to_json().encode('utf-8')), axis=1) 
 

Устанавливаем python >= 3.8, виртуальное окружение и библиотеку kafka-python. 

И запускаем сам скрипт для того, чтобы перенести эти данные в топик.  

 И теперь мы можем отправить запрос на запуск task со следующим body и url.

Url: http://localhost:8080/streamJob/from-kafka/to-kafka/?run_async=1 

Body:

{ 

                        "sink": { 

                            "topic": "events_v2_event", 

                            "broker": "10.83.0.3:9092", 

                            "rowSchema": { 

                                "toTsField": "date_to", 

                                "fromTsField": "date_from", 

                                "unitIdField": "equipment_id", 

                                "appIdFieldVal": [ 

                                    "type_id", 

                                    1 

                                ], 

                                "patternIdField": "algorithm_id", 

                                "subunitIdField": "ph_node_id", 

                                "incidentIdField": "uuid" 

                            }, 

                            "parallelism": 1, 

                            "datetimeField": "ts", 

                            "eventsMaxGapMs": 60000, 

                            "defaultEventsGapMs": 2000, 

                            "numParallelSources": 1, 

                            "patternsParallelism": 1 

                        }, 

                        "uuid": "test_status", 

                        "source": { 

                            "group": "local_1", 

                            "topic": "test", 

                            "brokers": "10.83.0.3:9092", 

                            "sourceId": 1, 

                            "fieldsTypes": { 

                                "ts": "float64", 

                                "sensor_id": "String", 

                                "n": "int32", 

                                "value_float": "float32", 

                                "equipment_id": "int32", 

                                "engine_no": "String" 

                            }, 

                            "unitIdField": "equipment_id", 

                            "datetimeField": "ts", 

                            "partitionFields": [ 

                                "equipment_id" 

                            ], 

                            "dataTransformation": { 

                                "type": "NarrowDataUnfolding", 

                                "config": { 

                                    "keyColumn": "sensor_id", 

                                    "defaultTimeout": 60000, 

                                    "fieldsTimeoutsMs": {}, 

                                    "defaultValueColumn": "value_float", 

                                    "valueColumnMapping": {} 

                                } 

                            } 

                        }, 

                        "patterns": [ 

                            { 

                                "id": "1", 

                                "subunit": 2, 

                                "sourceCode": "\"test_sensor\" = 10 andThen (\"test_sensor_1\" = 20 or \"test_sensor_2\" = 10)", 

                                "forwardedFields": [ 

                                    "equipment_id" 

                               ] 

                            } 

                        ] 

                    } 

Подождав некоторое время, мы можем перейти в топик с инцидентами и обнаружим там запись об инциденте:

Итоги: 

На примере настройки и установки сервиса через docker мы рассмотрели решение проблемы тестирования приложений на локальной машине и убедились, что это возможно при наличии определенных знаний по системам виртуализации. Конечно, тестирование в реальной среде (в кластере) имеет свои особенности, но большую часть функционала можно проверить именно на локальной машине. Я намеренно упустил шаги по созданию самой конфигурации, так как на просторах интернета есть множество статей как это сделать.  

Всем спасибо за внимание! 

Комментарии (0)