![](https://habrastorage.org/getpro/habr/upload_files/1a2/168/bc5/1a2168bc521f06a268459076e14b07f6.png)
Иногда требуется получать, маршрутизировать, преобразовывать, запрашивать и анализировать все данные о погоде в Соединенных Штатах - по мере появления этих данных. С FLaNK это довольно тривиальный процесс.
![](https://habrastorage.org/getpro/habr/upload_files/436/40a/7a6/43640a7a66b5ccb06536ab054c325a97.png)
![](https://habrastorage.org/getpro/habr/upload_files/b0f/b70/da3/b0fb70da3706c0afa54745c408b97a7e.png)
![](https://habrastorage.org/getpro/habr/upload_files/f0c/18b/715/f0c18b71567cb03d294c496851854aee.png)
От Kafka до Kudu, для любой схемы и любого типа данных, без написания кода – потребуется всего два шага.
![](https://habrastorage.org/getpro/habr/upload_files/766/127/0fb/7661270fb0930701575fbc8a74cd5843.png)
Вот эти схемы: их можно создавать, рекдактировать и сравнивать. Schema Registry поддерживает Swagger API, поэтому не будет проблем с настройкой CI/CD & DevOps пайплайнов:
![](https://habrastorage.org/getpro/habr/upload_files/fbf/2d8/a3c/fbf2d8a3c829084183a2689a6bf9f516.png)
![](https://habrastorage.org/getpro/habr/upload_files/6d2/fb2/c4a/6d2fb2c4a36999f036d80544af534b06.png)
Проверка данных по схеме с вашим уровнем допуска. Если вы хотите, чтобы были разрешены дополнительные поля, то вы это получите.
![](https://habrastorage.org/getpro/habr/upload_files/4a1/c2a/a03/4a1c2aa036c7519b6d738dc6298d19c8.png)
Далее эти данные готовы к анализу, например в BI инструменте Cloudera DataViz.
![](https://habrastorage.org/getpro/habr/upload_files/1ed/f0a/0f7/1edf0a0f739428b97f9c3b7f80f2c9d7.png)
Быстро анализируйте свои данные в Apache Kudu с помощью Apache Hue и Apache Impala.
![](https://habrastorage.org/getpro/habr/upload_files/85c/1ee/c09/85c1eec09ed9e712f16b6bdaafe1e7e3.png)
Давайте возьмем данные всех метеостанций США, даже если они представляют собой заархивированный каталог с множеством файлов XML.
![](https://habrastorage.org/getpro/habr/upload_files/730/2d3/331/7302d333127cc416402707d7bd676296.png)
NiFi флоу состоит из следующих процессоров:
GenerateFlowFile - иморт переодических обновлений о погоде от NOAA
InvokeHTTP - загрузка данных о погоде в ZIP архив
CompressContent - дкомпрессия ZIP
UnpackContent - экстракт файлов из ZIP архива
*RouteOnAttribute - фильтрация только по аэропортам (${filename:startsWith('K')}). опционально.
*QueryRecord - XMLReader в JsonRecordSetWriter. Query: SELECT * FROM FLOWFILE WHERE NOT location LIKE '%Unknown%'. Нужно для удаления неидентифицированных локаций. опционально.
Экспорт в хранилище. Можно использовать PutKudu, PutORC, PutHDFS, PutHiveStreaming, PutHbaseRecord, PutDatabaseRecord, PublishKafkaRecord2* или другие процессоры.
Все процессы можно легко автоматизировать в NiFi.
![](https://habrastorage.org/getpro/habr/upload_files/fec/390/d94/fec390d944ee10a20760e4d393614ff6.png)
С помощью реестра схем доступны все данные вашей темы - даже в формате Avro.
![](https://habrastorage.org/getpro/habr/upload_files/262/c20/daf/262c20daf47cadd9c97955365119ff2c.png)
Пример сообщения в Json:
[ {
"credit" : "NOAA's National Weather Service",
"credit_URL" : "http://weather.gov/",
"image" : {
"url" : "http://weather.gov/images/xml_logo.gif",
"title" : "NOAA's National Weather Service",
"link" : "http://weather.gov"
},
"suggested_pickup" : "15 minutes after the hour",
"suggested_pickup_period" : 60,
"location" : "Stanley Municipal Airport, ND",
"station_id" : "K08D",
"latitude" : 48.3008,
"longitude" : -102.4064,
"observation_time" : "Last Updated on Jul 10 2020, 9:55 am CDT",
"observation_time_rfc822" : "Fri, 10 Jul 2020 09:55:00 -0500",
"weather" : "Fair",
"temperature_string" : "66.0 F (19.0 C)",
"temp_f" : 66.0,
"temp_c" : 19.0,
"relative_humidity" : 83,
"wind_string" : "South at 6.9 MPH (6 KT)",
"wind_dir" : "South",
"wind_degrees" : 180,
"wind_mph" : 6.9,
"wind_kt" : 6,
"pressure_in" : 30.03,
"dewpoint_string" : "60.8 F (16.0 C)",
"dewpoint_f" : 60.8,
"dewpoint_c" : 16.0,
"visibility_mi" : 10.0,
"icon_url_base" : "http://forecast.weather.gov/images/wtf/small/",
"two_day_history_url" : "http://www.weather.gov/data/obhistory/K08D.html",
"icon_url_name" : "skc.png",
"ob_url" : "http://www.weather.gov/data/METAR/K08D.1.txt",
"disclaimer_url" : "http://weather.gov/disclaimer.html",
"copyright_url" : "http://weather.gov/disclaimer.html",
"privacy_policy_url" : "http://weather.gov/notice.html"
} ]
Код
Готовый скрипт: https://github.com/tspannhw/ApacheConAtHome2020/blob/main/scripts/setup.sh
NiFi флоу:
https://github.com/tspannhw/ClouderaFlowManagementWorkshop/tree/main/flows
SQL
INSERT INTO weathernj | |
SELECT `location`, station_id,latitude,longitude,observation_time,weather, | |
temperature_string, temp_f,temp_c,relative_humidity,wind_string,wind_dir,wind_degrees,wind_mph, | |
wind_kt, pressure_in,dewpoint_string,dewpoint_f,dewpoint_c | |
FROM weather | |
WHERE | |
`location` is not null and `location` <> 'null' and trim(`location`) <> '' and `location` like '%NJ'; |
Kafka Insert
https://github.com/tspannhw/ApacheConAtHome2020/blob/main/flinksql/weathernj.sql
Схемы
https://github.com/tspannhw/ApacheConAtHome2020/blob/main/schemas/weathernj.avsc
https://github.com/tspannhw/ApacheConAtHome2020/blob/main/schemas/weather.avsc
Пример вывода результата в Slack
Location Cincinnati/Northern Kentucky International Airport, KY Station KCVG
Temperature: 49.0 F (9.4 C)
Humdity: 83
Wind East at 3.5 MPH (3 KT)
Overcast
Dewpoint 44.1 F (6.7 C)Observed at Tue, 27 Oct 2020 11:52:00 -0400---- tracking info ---- UUID: 2cb6bd67-148c-497d-badf-dfffb4906b89
Kafka offset: 0
Kafka Timestamp: 1603818351260
=========================================================