[Apache, Big Data, Data Engineering] Умные погодные приложения с Flink SQL (перевод)

Автор Сообщение
news_bot ®

Стаж: 6 лет 9 месяцев
Сообщений: 27286

Создавать темы news_bot ® написал(а)
25-Мар-2021 13:33


Иногда требуется получать, маршрутизировать, преобразовывать, запрашивать и анализировать все данные о погоде в Соединенных Штатах - по мере появления этих данных. С FLaNK это довольно тривиальный процесс.
От Kafka до Kudu, для любой схемы и любого типа данных, без написания кода – потребуется всего два шага.
Вот эти схемы: их можно создавать, рекдактировать и сравнивать. Schema Registry поддерживает Swagger API, поэтому не будет проблем с настройкой CI/CD & DevOps пайплайнов:
Проверка данных по схеме с вашим уровнем допуска. Если вы хотите, чтобы были разрешены дополнительные поля, то вы это получите.
Далее эти данные готовы к анализу, например в BI инструменте Cloudera DataViz.
Быстро анализируйте свои данные в Apache Kudu с помощью Apache Hue и Apache Impala.
Давайте возьмем данные всех метеостанций США, даже если они представляют собой заархивированный каталог с множеством файлов XML.
NiFi флоу состоит из следующих процессоров:
  • GenerateFlowFile - иморт переодических обновлений о погоде от NOAA
  • InvokeHTTP - загрузка данных о погоде в ZIP архив
  • CompressContent - дкомпрессия ZIP
  • UnpackContent - экстракт файлов из ZIP архива
  • *RouteOnAttribute - фильтрация только по аэропортам (${filename:startsWith('K')}). опционально.
  • *QueryRecord - XMLReader в JsonRecordSetWriter.   Query:  SELECT * FROM FLOWFILE WHERE NOT location LIKE '%Unknown%'.  Нужно для удаления неидентифицированных локаций.  опционально.
  • Экспорт в хранилище. Можно использовать PutKudu, PutORC, PutHDFS, PutHiveStreaming, PutHbaseRecord, PutDatabaseRecord, PublishKafkaRecord2* или другие процессоры.
Все процессы можно легко автоматизировать в NiFi.
С помощью реестра схем доступны все данные вашей темы - даже в формате Avro.
Пример сообщения в Json:[ {  "credit" : "NOAA's National Weather Service",  "credit_URL" : "http://weather.gov/",  "image" : {    "url" : "http://weather.gov/images/xml_logo.gif",    "title" : "NOAA's National Weather Service",    "link" : "http://weather.gov"  },  "suggested_pickup" : "15 minutes after the hour",  "suggested_pickup_period" : 60,  "location" : "Stanley Municipal Airport, ND",  "station_id" : "K08D",  "latitude" : 48.3008,  "longitude" : -102.4064,  "observation_time" : "Last Updated on Jul 10 2020, 9:55 am CDT",  "observation_time_rfc822" : "Fri, 10 Jul 2020 09:55:00 -0500",  "weather" : "Fair",  "temperature_string" : "66.0 F (19.0 C)",  "temp_f" : 66.0,  "temp_c" : 19.0,  "relative_humidity" : 83,  "wind_string" : "South at 6.9 MPH (6 KT)",  "wind_dir" : "South",  "wind_degrees" : 180,  "wind_mph" : 6.9,  "wind_kt" : 6,  "pressure_in" : 30.03,  "dewpoint_string" : "60.8 F (16.0 C)",  "dewpoint_f" : 60.8,  "dewpoint_c" : 16.0,  "visibility_mi" : 10.0,  "icon_url_base" : "http://forecast.weather.gov/images/wtf/small/",  "two_day_history_url" : "http://www.weather.gov/data/obhistory/K08D.html",  "icon_url_name" : "skc.png",  "ob_url" : "http://www.weather.gov/data/METAR/K08D.1.txt",  "disclaimer_url" : "http://weather.gov/disclaimer.html",  "copyright_url" : "http://weather.gov/disclaimer.html",  "privacy_policy_url" : "http://weather.gov/notice.html"} ]КодГотовый скрипт: https://github.com/tspannhw/ApacheConAtHome2020/blo...scripts/setup.shNiFi флоу:
https://github.com/tspannhw/ClouderaFlowManagementW.../tree/main/flowsSQLINSERT INTO weathernjSELECT `location`, station_id,latitude,longitude,observation_time,weather,temperature_string, temp_f,temp_c,relative_humidity,wind_string,wind_dir,wind_degrees,wind_mph,wind_kt, pressure_in,dewpoint_string,dewpoint_f,dewpoint_cFROM weatherWHERE    `location` is not null and `location` <> 'null' and trim(`location`) <> '' and `location` like '%NJ'; Kafka Inserthttps://github.com/tspannhw/ApacheConAtHome2020/blo...ql/weathernj.sqlСхемыhttps://github.com/tspannhw/ApacheConAtHome2020/blo...s/weathernj.avschttps://github.com/tspannhw/ApacheConAtHome2020/blo...mas/weather.avsc Пример вывода результата в SlackLocation Cincinnati/Northern Kentucky International Airport, KY Station KCVG
Temperature: 49.0 F (9.4 C)
Humdity: 83
Wind East at 3.5 MPH (3 KT)
Overcast
Dewpoint 44.1 F (6.7 C)Observed at Tue, 27 Oct 2020 11:52:00 -0400---- tracking info ----          UUID: 2cb6bd67-148c-497d-badf-dfffb4906b89
  Kafka offset: 0
Kafka Timestamp: 1603818351260
=========================================================
===========
Источник:
habr.com
===========

===========
Автор оригинала: Timothy Spann
===========
Похожие новости: Теги для поиска: #_apache, #_big_data, #_data_engineering, #_prilozhenija (приложения), #_dannye (данные), #_potokovye_dannye (потоковые данные), #_blog_kompanii_cloudera (
Блог компании Cloudera
)
, #_apache, #_big_data, #_data_engineering
Профиль  ЛС 
Показать сообщения:     

Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы

Текущее время: 22-Ноя 14:20
Часовой пояс: UTC + 5