![](https://habrastorage.org/webt/8h/e_/jp/8he_jpuc-mgf_sarxdvkm3n6xsg.png)
На первый взгляд, в этой истории есть всё, чтобы заслужить статус романтичного поста накануне 8 марта: самолёты, любовь, чуточка шпионажа и, наконец, котик (точнее, кошка). Трудно представить, что всё это имеет самое непосредственное отношение к Kafka, KSQL и эксперименту «как в домашних условиях с помощью информационных технологий найти самый шумный самолёт». Трудно, но придётся: именно такой эксперимент провёл Саймон Обьюри, а мы перевели статью его авторства с описанием всех подробностей процесса.
Наша новая кошка по имени Снежинка просыпается рано. Её будят звуки самолётов, пролетающих над нашим домом. А что если бы я, используя Apache Kafka, KSQL и Raspberry Pi, смог определить, какой именно самолёт не даёт моей кошке спать? Хорошо бы еще создать занятную панель слежения, на которую кошка могла бы переключить свое внимание — и дать мне ещё немножко поспать.
В общих чертах
![](https://habrastorage.org/webt/f8/df/kg/f8dfkgmmkghrnfhtjecrthwx8j8.png)
Переносим самолёты с неба в графики с помощью Kafka и KSQL
Самолёты определяют свое местоположение с помощью GPS приёмников. Бортовой передатчик периодически сообщает локацию, идентификационный номер, высоту и скорость корабля, используя короткие радиопередачи. Эти передачи вещательного автоматического зависимого наблюдения (АЗН-В) являются по сути пакетами данных, открытыми для доступа с наземных станций.
Один микрокомпьютер, такой как Raspberry Pi, и несколько вспомогательных компонентов — это всё, что требуется для получения сообщений от бортовых передатчиков самолётов, снующих над моим домом.
Бортовые сигналы самолётов выглядят, как запутанный клубок сообщений и требуют систематизации. Распознать эти хаотичные потоки данных — это всё равно, что подслушать беседу на шумной вечеринке. Поэтому, чтобы найти самолёт, который тревожит мою кошку, я решил использовать сочетание Kafka и KSQL.
![](https://habrastorage.org/webt/om/2c/o0/om2co03tws9dxaml4hsmwrgqzsg.png)
Разбуженная кошка и Raspberry Pi
Сбор показаний АЗН-В с помощью Raspberry Pi
Для сбора бортовых передач я использовал Raspberry Pi и RTL2832U — USB-модем, который изначально продавался как устройство для просмотра цифрового ТВ на компьютере. На Raspberry Pi я установил dump1090 — программу, которая получает данные с АЗН-В через RTL2832U с помощью небольшой антенны.
![](https://habrastorage.org/webt/_u/dx/rd/_udxrdhg_e8cvqomnkbfkiuvvgm.png)
Мой программный радиоприёмник из Raspberry Pi и RTL2832U
Преобразуем сигналы АЗН-В в темы Kafka
Теперь, когда я получил поток необработанных сигналов АЗН-В, нам следует обратить внимание на трафик. Raspberry Pi не имеет достаточной мощности для серьезных вычислений, поэтому мне пришлось передать обработку данных моему локальному кластеру на Kafka.
![](https://habrastorage.org/webt/ar/27/xb/ar27xbolmidgiscrmspvto0vgnk.png)
Получаемые сообщения делятся либо на сообщения о локации, либо на сообщения об идентификации борта. Локация выглядит как сообщение вида: «борт 7c6db8 летит на высоте 6250 футов в координате -33.8,151.0». Информация об идентификации борта будет выглядит как: «борт 7c451c совершает полет по маршруту QJE1726».
Небольшой Python-скрипт для моей Raspberry Pi разделяет все входящие сообщения АЗН-В. Я использовал прокси-сервер Confluent Rest Proxy для распределения данных с Raspberry Pi в темы location-topic и ident-topic на Kafka. Прокси-сервер предоставляет RESTful интерфейс для кластера Kafka, что позволяет легко создавать сообщения путём простого REST-вызова на Pi.
![](https://habrastorage.org/webt/6d/ox/35/6dox35w1d-qrzyaiowi80kxtglk.png)
Я хотел понять, какие самолёты летают над моей крышей и по каким маршрутам. База данных OpenFlights позволяет сопоставить код авиаборта, например 7C6DB8, присвоенный Международной организацией гражданской авиации (ИКАО), с типом самолёта — в нашем случае «Боинг-737». Я загрузил данные моего картографирования в тему icao-to-aircraft.
KSQL предоставляет «SQL-движок», который даёт возможность обрабатывать данные в режиме реального времени по темам Apache Kafka. Например, чтобы найти бортовой код 7C6DB8, мы можем написать следующий запрос:
CREATE TABLE icao_to_aircraft WITH (KAFKA_TOPIC='ICAO_TO_AIRCRAFT_REKEY', VALUE_FORMAT='AVRO', KEY='ICAO');
ksql> SELECT manufacturer, aircraft, registration \
FROM icao_to_aircraft \
WHERE icao = '7C6DB8';
Boeing | B738 | VH-VYI
Аналогично, в тему callsign-details я загрузил позывные (т. е. QFA563, это рейс авиакомпании Qantas из Брисбена в Сидней).
CREATE TABLE callsign_details WITH (KAFKA_TOPIC='CALLSIGN_DETAILS_REKEY', VALUE_FORMAT='AVRO', KEY='CALLSIGN');
ksql> SELECT operatorname, fromairport, toairport \
FROM callsign_details \
WHERE callsign = 'QFA563';
Qantas | Brisbane | Sydney
Теперь давайте взглянем на поток данных location-topic. Тут мы можем наблюдать постоянный поток входящих сообщений о местоположении пролетающего самолёта.
kafka-avro-console-consumer --bootstrap-server localhost:9092 --property --topic location-topic
{"ico":"7C6DB8","height":"6250","location":"-33.807724,151.091495"}
Запрос на KSQL будет выглядеть так:
ksql> SELECT TIMESTAMPTOSTRING(rowtime, 'yyyy-MM-dd HH:mm:ss'), \
ico, height, location \
FROM location_stream \
WHERE ico = '7C6DB8';
2018-09-19 07:13:33 | 7C6DB8 | 6250.0 | -33.807724,151.091495
KSQL: гармонизация потоков...
Настоящая ценность KSQL заключается в возможности объединения входящих потоков данных о местоположении с исходными данными тем (см. 03_ksql.sql) — то есть в добавлении полезных сведений к необработанному потоку данных. Это очень похоже на «left join» в традиционной БД. Результатом является еще одна тема Kafka, созданная без единой строчки кода на Java!
source>CREATE STREAM location_and_details_stream AS \
SELECT l.ico, l.height, l.location, t.aircraft \
FROM location_stream l \
LEFT JOIN icao_to_aircraft t ON l.ico = t.icao;
К тому же вы получаете запрос KSQL. Поток данных будет выглядеть так:
ksql> SELECT TIMESTAMPTOSTRING(rowtime, 'yy-MM-dd HH:mm:ss') \
, manufacturer \
, aircraft \
, registration \
, height \
, location \
FROM location_and_details_stream;
18-09-27 09:53:28 | Boeing | B738 | VH-YIA | 7225 | -33.821,151.052
18-09-27 09:53:31 | Boeing | B738 | VH-YIA | 7375 | -33.819,151.049
18-09-27 09:53:32 | Boeing | B738 | VH-YIA | 7425 | -33.818,151.048
Помимо этого, мы можем объединить входящий поток callsign с фиксированной темой callsign_details:
CREATE STREAM ident_callsign_stream AS \
SELECT i.ico \
, c.operatorname \
, c.callsign \
, c.fromairport \
, c.toairport \
FROM ident_stream i \
LEFT JOIN callsign_details c ON i.indentification = c.callsign;
ksql> SELECT TIMESTAMPTOSTRING(rowtime, 'yy-MM-dd HH:mm:ss') \
, operatorname \
, callsign \
, fromairport \
, toairport \
FROM ident_callsign_stream ;
18-09-27 13:33:19 | Qantas | QFA926 | Sydney | Cairns
18-09-27 13:44:11 | China Eastern | CES777 | Kunming | Sydney
18-09-27 14:00:54 | Air New Zealand | ANZ110 | Sydney | Auckland
Теперь у нас есть две информативные темы:
- location_and_details_stream, которая обеспечивает поток обновленной информации о местоположении и скорости самолёта;
- ident_callsign_stream, которая описывает подробности рейса, в том числе авиакомпанию и пункт назначения.
С этими постоянно обновляемыми темами мы можем создать несколько отличных обзорных панелей. Я использовал Kafka Connect, чтобы выгрузить темы Kafka, заполняемые KSQL, в Elasticsearch (полные скрипты здесь).
Обзорная панель Kibana
Вот пример обзорной панели, демонстрирующей местоположение самолёта на карте. Кроме того, вы можете увидеть диаграмму по авиакомпаниям, график высоты полета и облака слов по основным пунктам назначения. Тепловая карта показывает районы сосредоточения самолётов, то есть области с наивысшим уровнем шума.
![](https://habrastorage.org/webt/mh/fz/sy/mhfzsyskws6uoe5s1-b6m9sudzc.gif)
Назад, к кошке
Сегодня кошка разбудила меня в районе 6 часов утра. Может ли KSQL помочь мне найти тот самолёт, который пролетал в это время над моим домом на высоте меньше 3500 футов?
select timestamptostring(rowtime, 'yyyy-MM-dd HH:mm:ss')
, manufacturer
, aircraft
, registration
, height
from location_and_details_stream
where height < 3500 and rowtime > stringtotimestamp('18-09-27 06:10', 'yy-MM-dd HH:mm') and rowtime < stringtotimestamp('18-09-27 06:20', 'yy-MM-dd HH:mm');
2018-09-27 06:15:39 | Airbus | A388 | A6-EOD | 2100.0
2018-09-27 06:15:58 | Airbus | A388 | A6-EOD | 3050.0
Потрясающе! Я могу определить самолёт, оказавшийся над моей крышей в 6:15 утра. Оказывается, Снежинку разбудил Airbus А380 (огромный лайнер, кстати), который летел в Дубай.
Всего пара выходных дней, и у вас есть система потоковой обработки с KSQL. Которая, к тому же, позволяет быстро найти интересные события данных. Хотя Снежинка может отнестись к ним скептически.
![](https://habrastorage.org/webt/vs/vf/yp/vsvfypatv-grib-_dstwrco_r2y.png)
Комментарии (8)
dMac
07.03.2019 11:29/start humor/ В Австралии всё наоборот. И зима у них в июле, и Снежок чёрный /end humor/
Да и статья про акустический шум, оказывается. А я уж увидел в заголовке упоминание о радиоприемнике, да и обрадовался. Думал, что хоть кто-то наконец решил потрудиться над картой электромагнитного загрязнения, пусть хоть и в Австралии. А тут облом, радио только для того, чтобы самолёты определять…
Впрочем, самолёты определять — тоже интересно, спасибо за статью
kashey
Как же повезло Сиднею, что ночью над ним не летают.
4umak
Это, кстати, очень подлый момент, когда приезжаешь впервые к ним в порт ночью, а он закрыт. Очень неожиданно было для человека, много поездившего по азии, где всё круглосуточно работает:)
Но местным наверняка приятно, да