Иногда требуется получать, маршрутизировать, преобразовывать, запрашивать и анализировать все данные о погоде в Соединенных Штатах - по мере появления этих данных. С FLaNK это довольно тривиальный процесс.

От Kafka до Kudu, для любой схемы и любого типа данных, без написания кода – потребуется всего два шага.

Вот эти схемы: их можно создавать, рекдактировать и сравнивать. Schema Registry поддерживает Swagger API, поэтому не будет проблем с настройкой CI/CD & DevOps пайплайнов:

Проверка данных по схеме с вашим уровнем допуска. Если вы хотите, чтобы были разрешены дополнительные поля, то вы это получите.

Далее эти данные готовы к анализу, например в BI инструменте Cloudera DataViz.

Быстро анализируйте свои данные в Apache Kudu с помощью Apache Hue и Apache Impala.

Давайте возьмем данные всех метеостанций США, даже если они представляют собой заархивированный каталог с множеством файлов XML.

NiFi флоу состоит из следующих процессоров:

  1. GenerateFlowFile - иморт переодических обновлений о погоде от NOAA

  2. InvokeHTTP - загрузка данных о погоде в ZIP архив

  3. CompressContent - дкомпрессия ZIP

  4. UnpackContent - экстракт файлов из ZIP архива

  5. *RouteOnAttribute - фильтрация только по аэропортам (${filename:startsWith('K')}). опционально.

  6. *QueryRecord - XMLReader в JsonRecordSetWriter.   Query:  SELECT * FROM FLOWFILE WHERE NOT location LIKE '%Unknown%'.  Нужно для удаления неидентифицированных локаций.  опционально.

  7. Экспорт в хранилище. Можно использовать PutKudu, PutORC, PutHDFS, PutHiveStreaming, PutHbaseRecord, PutDatabaseRecord, PublishKafkaRecord2* или другие процессоры.

Все процессы можно легко автоматизировать в NiFi.

С помощью реестра схем доступны все данные вашей темы - даже в формате Avro.

Пример сообщения в Json:

[ {

  "credit" : "NOAA's National Weather Service",

  "credit_URL" : "http://weather.gov/",

  "image" : {

    "url" : "http://weather.gov/images/xml_logo.gif",

    "title" : "NOAA's National Weather Service",

    "link" : "http://weather.gov"

  },

  "suggested_pickup" : "15 minutes after the hour",

  "suggested_pickup_period" : 60,

  "location" : "Stanley Municipal Airport, ND",

  "station_id" : "K08D",

  "latitude" : 48.3008,

  "longitude" : -102.4064,

  "observation_time" : "Last Updated on Jul 10 2020, 9:55 am CDT",

  "observation_time_rfc822" : "Fri, 10 Jul 2020 09:55:00 -0500",

  "weather" : "Fair",

  "temperature_string" : "66.0 F (19.0 C)",

  "temp_f" : 66.0,

  "temp_c" : 19.0,

  "relative_humidity" : 83,

  "wind_string" : "South at 6.9 MPH (6 KT)",

  "wind_dir" : "South",

  "wind_degrees" : 180,

  "wind_mph" : 6.9,

  "wind_kt" : 6,

  "pressure_in" : 30.03,

  "dewpoint_string" : "60.8 F (16.0 C)",

  "dewpoint_f" : 60.8,

  "dewpoint_c" : 16.0,

  "visibility_mi" : 10.0,

  "icon_url_base" : "http://forecast.weather.gov/images/wtf/small/",

  "two_day_history_url" : "http://www.weather.gov/data/obhistory/K08D.html",

  "icon_url_name" : "skc.png",

  "ob_url" : "http://www.weather.gov/data/METAR/K08D.1.txt",

  "disclaimer_url" : "http://weather.gov/disclaimer.html",

  "copyright_url" : "http://weather.gov/disclaimer.html",

  "privacy_policy_url" : "http://weather.gov/notice.html"

} ]


Код

Готовый скрипт: https://github.com/tspannhw/ApacheConAtHome2020/blob/main/scripts/setup.sh

NiFi флоу:
https://github.com/tspannhw/ClouderaFlowManagementWorkshop/tree/main/flows

SQL

INSERT INTO weathernj

SELECT `location`, station_id,latitude,longitude,observation_time,weather,

temperature_string, temp_f,temp_c,relative_humidity,wind_string,wind_dir,wind_degrees,wind_mph,

wind_kt, pressure_in,dewpoint_string,dewpoint_f,dewpoint_c

FROM weather

WHERE

    `location` is not null and `location` <> 'null' and trim(`location`) <> '' and `location` like '%NJ';

 

Kafka Insert

https://github.com/tspannhw/ApacheConAtHome2020/blob/main/flinksql/weathernj.sql

Схемы

https://github.com/tspannhw/ApacheConAtHome2020/blob/main/schemas/weathernj.avsc

https://github.com/tspannhw/ApacheConAtHome2020/blob/main/schemas/weather.avsc

Пример вывода результата в Slack

Location Cincinnati/Northern Kentucky International Airport, KY Station KCVG
Temperature: 49.0 F (9.4 C)
Humdity: 83
Wind East at 3.5 MPH (3 KT)
Overcast
Dewpoint 44.1 F (6.7 C)Observed at Tue, 27 Oct 2020 11:52:00 -0400---- tracking info ----          UUID: 2cb6bd67-148c-497d-badf-dfffb4906b89
  Kafka offset: 0
Kafka Timestamp: 1603818351260
=========================================================