Иногда требуется получать, маршрутизировать, преобразовывать, запрашивать и анализировать все данные о погоде в Соединенных Штатах - по мере появления этих данных. С FLaNK это довольно тривиальный процесс.
От Kafka до Kudu, для любой схемы и любого типа данных, без написания кода – потребуется всего два шага.
Вот эти схемы: их можно создавать, рекдактировать и сравнивать. Schema Registry поддерживает Swagger API, поэтому не будет проблем с настройкой CI/CD & DevOps пайплайнов:
Проверка данных по схеме с вашим уровнем допуска. Если вы хотите, чтобы были разрешены дополнительные поля, то вы это получите.
Далее эти данные готовы к анализу, например в BI инструменте Cloudera DataViz.
Быстро анализируйте свои данные в Apache Kudu с помощью Apache Hue и Apache Impala.
Давайте возьмем данные всех метеостанций США, даже если они представляют собой заархивированный каталог с множеством файлов XML.
NiFi флоу состоит из следующих процессоров:
GenerateFlowFile - иморт переодических обновлений о погоде от NOAA
InvokeHTTP - загрузка данных о погоде в ZIP архив
CompressContent - дкомпрессия ZIP
UnpackContent - экстракт файлов из ZIP архива
*RouteOnAttribute - фильтрация только по аэропортам (${filename:startsWith('K')}). опционально.
*QueryRecord - XMLReader в JsonRecordSetWriter. Query: SELECT * FROM FLOWFILE WHERE NOT location LIKE '%Unknown%'. Нужно для удаления неидентифицированных локаций. опционально.
Экспорт в хранилище. Можно использовать PutKudu, PutORC, PutHDFS, PutHiveStreaming, PutHbaseRecord, PutDatabaseRecord, PublishKafkaRecord2* или другие процессоры.
Все процессы можно легко автоматизировать в NiFi.
С помощью реестра схем доступны все данные вашей темы - даже в формате Avro.
Пример сообщения в Json:
[ {
"credit" : "NOAA's National Weather Service",
"credit_URL" : "http://weather.gov/",
"image" : {
"url" : "http://weather.gov/images/xml_logo.gif",
"title" : "NOAA's National Weather Service",
"link" : "http://weather.gov"
},
"suggested_pickup" : "15 minutes after the hour",
"suggested_pickup_period" : 60,
"location" : "Stanley Municipal Airport, ND",
"station_id" : "K08D",
"latitude" : 48.3008,
"longitude" : -102.4064,
"observation_time" : "Last Updated on Jul 10 2020, 9:55 am CDT",
"observation_time_rfc822" : "Fri, 10 Jul 2020 09:55:00 -0500",
"weather" : "Fair",
"temperature_string" : "66.0 F (19.0 C)",
"temp_f" : 66.0,
"temp_c" : 19.0,
"relative_humidity" : 83,
"wind_string" : "South at 6.9 MPH (6 KT)",
"wind_dir" : "South",
"wind_degrees" : 180,
"wind_mph" : 6.9,
"wind_kt" : 6,
"pressure_in" : 30.03,
"dewpoint_string" : "60.8 F (16.0 C)",
"dewpoint_f" : 60.8,
"dewpoint_c" : 16.0,
"visibility_mi" : 10.0,
"icon_url_base" : "http://forecast.weather.gov/images/wtf/small/",
"two_day_history_url" : "http://www.weather.gov/data/obhistory/K08D.html",
"icon_url_name" : "skc.png",
"ob_url" : "http://www.weather.gov/data/METAR/K08D.1.txt",
"disclaimer_url" : "http://weather.gov/disclaimer.html",
"copyright_url" : "http://weather.gov/disclaimer.html",
"privacy_policy_url" : "http://weather.gov/notice.html"
} ]
Код
Готовый скрипт: https://github.com/tspannhw/ApacheConAtHome2020/blob/main/scripts/setup.sh
NiFi флоу:
https://github.com/tspannhw/ClouderaFlowManagementWorkshop/tree/main/flows
SQL
INSERT INTO weathernj | |
SELECT `location`, station_id,latitude,longitude,observation_time,weather, | |
temperature_string, temp_f,temp_c,relative_humidity,wind_string,wind_dir,wind_degrees,wind_mph, | |
wind_kt, pressure_in,dewpoint_string,dewpoint_f,dewpoint_c | |
FROM weather | |
WHERE | |
`location` is not null and `location` <> 'null' and trim(`location`) <> '' and `location` like '%NJ'; |
Kafka Insert
https://github.com/tspannhw/ApacheConAtHome2020/blob/main/flinksql/weathernj.sql
Схемы
https://github.com/tspannhw/ApacheConAtHome2020/blob/main/schemas/weathernj.avsc
https://github.com/tspannhw/ApacheConAtHome2020/blob/main/schemas/weather.avsc
Пример вывода результата в Slack
Location Cincinnati/Northern Kentucky International Airport, KY Station KCVG
Temperature: 49.0 F (9.4 C)
Humdity: 83
Wind East at 3.5 MPH (3 KT)
Overcast
Dewpoint 44.1 F (6.7 C)Observed at Tue, 27 Oct 2020 11:52:00 -0400---- tracking info ---- UUID: 2cb6bd67-148c-497d-badf-dfffb4906b89
Kafka offset: 0
Kafka Timestamp: 1603818351260
=========================================================