Эта статья является живым очерком о буднях современного тестировщика в agile-команде и рассматривает подход в тестировании, при котором описывается вариант решения проблемы настройки и разворачивания тестируемого приложения. В качестве примера возьмем сервис для поиска данных во временных рядах — Time Series Pattern, разрабатываемый нами на аутсорс.
Меня зовут Роман Мостафин, и я работаю QA-инженером в компании Factory5.
Начинаем.
На данный момент многие команды сталкиваются с тем, что сложно обслуживать среду для тестирования в виде стендов, а также с многопользовательским доступом к одним и тем же данным и версионирование этих стендов. Порою эта проблема стоит довольно остро. Решением может быть поднятие сервисов и части систем на локальной машине. К его плюсам относится: легкость версионирования, быстрота разворачивания и настройки. Данное решение применяется в нашей компании для тестирования отдельных сервисов. И сейчас я расскажу, как мы его используем на примере нашего сервиса разрабатываемого на аутсорсе.
Установка сервисов
Для начала нам понадобится утилита Docker — система контейнеризации и виртуализации.
Команда сделала сборку в виде докер-файла, в которую входят следующие сервисы:
Kafka
Kafdrop
Zookeeper
Kafka UI
Clickhouse
Flink-jobmanager
Flink-taskmanager
TSP
Создадим директорию на локальной машине и перенесем туда docker-compose файл.
Затем мы воспользуемся терминалом и перейдем в данную директорию.
Пока поднимается данная сборка, расскажу про возможности самого сервиса.
TSP (Time Series Patterns) — аналитический сервис для поиска шаблонов в данных временных рядов большого объема. Имеет гибкий DSL для описания паттернов (правил) и представляет собой оптимизированный движок потоковой обработки поверх Apache Flink. Этот сервис может применяться для потоковой обработки данных, а также для частичной обработки данных.
Принцип работы такой: есть исходные данные, на них пишутся правила (определенные условия поиска данных). Далее эти правила применяются на данных с помощью запроса, в котором мы указываем адреса источников данных, диапазон данных и условие проверки. Как итог, мы получаем запись о том, что найдено в соответствии правилу на определенном отрезке данных. Для запуска потоковой обработки (stream) используется kafka в качестве источника данных и хранилища инцидентов. Для запуска обработки данных по частям (batch) в качестве хранилища данных используется база данных clickhouse. В этой статье мы рассмотрим оба варианта запуска сервиса.
После того как будет доступен адрес http://localhost:8081/ на локальной машине, можно приступать к настройке.
Настройка запуска в режиме batch
Начнем с настройки режима с запуском правил на части данных. Для этого нам понадобится клиент базы данных. Я покажу на примере клиента DataGrip https://www.jetbrains.com/ru-ru/datagrip/.
Создаем соединение со стандартными настройками clickhouse (username = default, host = localhost, database = default).
Открываем консоль и создаем две таблицы.
Таблицу для исходных данных (телеметрии):
create table if not exists test_telemetry
(
dt DateTime default toDateTime(ts),
ts Float64,
sensor_id String,
n UInt32,
value_float Float32,
equipment_id UInt32,
engine_no String
)
engine = ReplacingMergeTree()
PARTITION BY toYYYYMM(dt)
ORDER BY (equipment_id, sensor_id, n, ts)
SETTINGS index_granularity = 8192;
И таблицу для хранения инцидентов:
create table if not exists events_v2_event
(
uuid UUID default generateUUIDv4(),
date_from DateTime64(6) default '0000000000.000000',
date_to DateTime64(6) default '0000000000.000000',
processing_date DateTime64(6) default now(),
type_id UInt16,
equipment_id UInt32,
ph_node_id UInt32,
algorithm_id UInt32,
value Nullable(Float32),
context Nullable(String),
status UInt8 default 1,
duration Float64 materialized (toUnixTimestamp64Milli(date_to) - toUnixTimestamp64Milli(date_from)) / 1000
)
engine = MergeTree()
PARTITION BY (toYYYYMM(date_from), intDiv(equipment_id, 1000))
ORDER BY date_from
SETTINGS index_granularity = 8192;
Далее заполним тестовыми данными таблицу телеметрии с помощью следующих запросов:
INSERT INTO default.test_telemetry (dt, ts, sensor_id, n, value_float, equipment_id, engine_no) VALUES ('2021-10-25 13:23:21', 1635157169, 'test_sensor_2', 1, 10, 1, '1');
default.test_telemetry (dt, ts, sensor_id, n, value_float, equipment_id, engine_no) VALUES ('2021-10-25 13:23:21', 1635157170, 'test_sensor_2', 1, 10, 1, '1');
INSERT INTO default.test_telemetry (dt, ts, sensor_id, n, value_float, equipment_id, engine_no) VALUES ('2021-10-25 13:23:23', 1635157171, 'test_sensor_2', 1, 10, 1, '1');
INSERT INTO default.test_telemetry (dt, ts, sensor_id, n, value_float, equipment_id, engine_no) VALUES ('2021-10-25 13:23:21', 1635157169, 'test_sensor_1', 1, 10, 1, '1');
INSERT INTO default.test_telemetry (dt, ts, sensor_id, n, value_float, equipment_id, engine_no) VALUES ('2021-10-25 13:23:21', 1635157170, 'test_sensor_1', 1, 10, 1, '1');
INSERT INTO default.test_telemetry (dt, ts, sensor_id, n, value_float, equipment_id, engine_no) VALUES ('2021-10-25 13:23:23', 1635157171, 'test_sensor_1', 1, 10, 1, '1');
INSERT INTO default.test_telemetry (dt, ts, sensor_id, n, value_float, equipment_id, engine_no) VALUES ('2021-10-25 13:23:21', 1635157169, 'test_sensor', 1, 10, 1, '1');
INSERT INTO default.test_telemetry (dt, ts, sensor_id, n, value_float, equipment_id, engine_no) VALUES ('2021-10-25 13:23:21', 1635157170, 'test_sensor', 1, 10, 1, '1');
INSERT INTO default.test_telemetry (dt, ts, sensor_id, n, value_float, equipment_id, engine_no) VALUES ('2021-10-25 13:23:23', 1635157171, 'test_sensor', 1, 10, 1, '1');
Теперь мы проверим работоспособность нашей системы выполнив REST запрос в сам TSP.
Его можно выполнить, используя клиент для Rest. Я использую POSTMAN
Url: http://localhost:8080/streamJob/from-jdbc/to-jdbc/?run_async=1
Body:
{ "sink": {
"jdbcUrl": "jdbc:clickhouse://default:@clickhouse:8123/default",
"rowSchema": {
"toTsField": "date_to",
"fromTsField": "date_from",
"unitIdField": "equipment_id",
"appIdFieldVal": [
"type_id",
1
],
"patternIdField": "algorithm_id",
"subunitIdField": "ph_node_id"
},
"tableName": "events_v2_event",
"driverName": "ru.yandex.clickhouse.ClickHouseDriver"
},
"uuid": "59338fc6-087a-4f49-a6f8-f0f96cc31bcd_1",
"source": {
"query": "SELECT ts, equipment_id AS equipment_id, sensor_id, value_float \nFROM test_telemetry \nWHERE equipment_id IN (1) AND ts >= 1635157169.0 AND ts < 1635157171.0 ORDER BY ts ASC",
"jdbcUrl": "jdbc:clickhouse://default:@clickhouse:8123/default",
"sourceId": 1,
"driverName": "ru.yandex.clickhouse.ClickHouseDriver",
"unitIdField": "equipment_id",
"datetimeField": "ts",
"partitionFields": [
"equipment_id"
],
"dataTransformation": {
"type": "NarrowDataUnfolding",
"config": {
"keyColumn": "sensor_id",
"defaultTimeout": 60000,
"fieldsTimeoutsMs": {},
"defaultValueColumn": "value_float"
}
}
},
"patterns": [
{
"id": "1",
"payload": {
"subunit": "4"
},
"sourceCode": "\"test_sensor\" = 10 andThen (\"test_sensor_1\" = 20 or \"test_sensor_2\" = 10)",
"forwardedFields": [
"equipment_id"
]
}
]
}
После получения успешного ответа мы переходим в ui-флинк и ждем смены статуса job с running на finished.
Далее мы переходим в базу данных, делаем запрос к таблице: select * from events_v2_event; и получаем строчку данных, которая соответствует условию:
Настройка запуска в режиме stream
Рассмотрим пример запуска в потоке. Различие в том, что запущенная задача не завершается, а ждет новой порции данных в топике, указанном в запросе.
Для начала мы идем в kafka ui и создаем два топика:
топик для исходных данных, в данном примере название “test” и топик для инцидентов — events_v2_event.
Пример:
Далее с помощью python скрипта и csv файла с тестовыми данными мы заполняем данными топик test.
Для этого мы создаем файл test_telemetry.csv со следующими данными:
"dt","ts","sensor_id","n","value_float","equipment_id","engine_no"
2021-10-25 13:23:21,1635157169,"test_sensor_2",1,10.0,1,"1"
2021-10-25 13:23:21,1635157170,"test_sensor_2",1,10.0, 1,"1"
2021-10-25 13:23:23,1635157171,"test_sensor_2",1,10.0,1,"1"
2021-10-25 13:23:21,1635157169,"test_sensor_1",1,10.0,1,"1"
2021-10-25 13:23:21,1635157170,"test_sensor_1",1,10.0,1,"1"
2021-10-25 13:23:23,1635157171,"test_sensor_1",1,10.0,1,"1"
2021-10-25 13:23:21,1635157169,"test_sensor",1,10.0,1,"1"
2021-10-25 13:23:21,1635157170,"test_sensor",1,10.0,1,"1"
2021-10-25 13:23:23,1635157171,"test_sensor",1,10.0,1,"1"
Далее мы создаем python-file со следующим содержимым:
import pandas as pd
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='10.83.0.3:9092')
df = pd.read_csv('test_telemetry.csv')
df.apply(lambda x: producer.send(topic='test',value=x.to_json().encode('utf-8')), axis=1)
Устанавливаем python >= 3.8, виртуальное окружение и библиотеку kafka-python.
И запускаем сам скрипт для того, чтобы перенести эти данные в топик.
И теперь мы можем отправить запрос на запуск task со следующим body и url.
Url: http://localhost:8080/streamJob/from-kafka/to-kafka/?run_async=1
Body:
{
"sink": {
"topic": "events_v2_event",
"broker": "10.83.0.3:9092",
"rowSchema": {
"toTsField": "date_to",
"fromTsField": "date_from",
"unitIdField": "equipment_id",
"appIdFieldVal": [
"type_id",
1
],
"patternIdField": "algorithm_id",
"subunitIdField": "ph_node_id",
"incidentIdField": "uuid"
},
"parallelism": 1,
"datetimeField": "ts",
"eventsMaxGapMs": 60000,
"defaultEventsGapMs": 2000,
"numParallelSources": 1,
"patternsParallelism": 1
},
"uuid": "test_status",
"source": {
"group": "local_1",
"topic": "test",
"brokers": "10.83.0.3:9092",
"sourceId": 1,
"fieldsTypes": {
"ts": "float64",
"sensor_id": "String",
"n": "int32",
"value_float": "float32",
"equipment_id": "int32",
"engine_no": "String"
},
"unitIdField": "equipment_id",
"datetimeField": "ts",
"partitionFields": [
"equipment_id"
],
"dataTransformation": {
"type": "NarrowDataUnfolding",
"config": {
"keyColumn": "sensor_id",
"defaultTimeout": 60000,
"fieldsTimeoutsMs": {},
"defaultValueColumn": "value_float",
"valueColumnMapping": {}
}
}
},
"patterns": [
{
"id": "1",
"subunit": 2,
"sourceCode": "\"test_sensor\" = 10 andThen (\"test_sensor_1\" = 20 or \"test_sensor_2\" = 10)",
"forwardedFields": [
"equipment_id"
]
}
]
}
Подождав некоторое время, мы можем перейти в топик с инцидентами и обнаружим там запись об инциденте:
Итоги:
На примере настройки и установки сервиса через docker мы рассмотрели решение проблемы тестирования приложений на локальной машине и убедились, что это возможно при наличии определенных знаний по системам виртуализации. Конечно, тестирование в реальной среде (в кластере) имеет свои особенности, но большую часть функционала можно проверить именно на локальной машине. Я намеренно упустил шаги по созданию самой конфигурации, так как на просторах интернета есть множество статей как это сделать.
Всем спасибо за внимание!