Apache NiFiセカンド・ステップ
をテンプレートにして作成
[
トップ
] [
新規
|
一覧
|
単語検索
|
最終更新
|
ヘルプ
]
開始行:
「[[.NET 開発基盤部会 Wiki>http://dotnetdevelopmentinfras...
-[[戻る>Apache NiFi]]
--[[Apache NiFiファースト・ステップ]]
--Apache NiFiセカンド・ステップ
*目次 [#r0303ee6]
#contents
*概要 [#r7ffacea]
-[[ファースト・ステップ>Apache NiFiファースト・ステップ]]...
--インストール
--初歩的な利用方法
--基本的な操作方法
-セカンド・ステップでは、
--[[チュートリアル>#j06a5c12]]
---[[チュートリアル 1: シンプルなNiFiデータフロー構築>#f2...
---[[チュートリアル 2: 地理情報でデータフローをエンリッチ...
---[[チュートリアル 3: NextBusライブストリームの取込>#o24...
--[[追加(自習)コンテンツ>#e59fd742]]
---[[WebAPIを作成してみる。>#f80dad4f]]~
HandleHttpRequest/ResponseのProcessorを使用して、WebAPIを...
---[[Webスクレイピング処理を実装してみる。>#n7f497dd]]~
本WikiのFront Pageのタイトルを抽出する。
---[[RDBMS処理を実装してみる。>#vbe20c60]]~
ExecuteSQLをパラメタライズして実行する。
>などをやってみる。
*チュートリアル [#j06a5c12]
以下を参考に、チュートリアルを遂行。
-Learning the Ropes of Apache NiFi · ijokarumawak/hdf-tut...
https://github.com/ijokarumawak/hdf-tutorials-ja/wiki/Lea...
このチュートリアルは、ある都市計画委員会が新規の高速道路...
リアルタイムデータを活用し、交通パターンのより深い理解を...
**チュートリアル 1 [#f20c0bee]
-「シンプルなNiFiデータフロー構築」をやってみる。~
https://github.com/ijokarumawak/hdf-tutorials-ja/wiki/Rop...
-概要
++車両位置情報XMLのシミュレーションデータを取り込む。
++フローファイルから車両位置詳細属性を抽出する。
++それらの詳細属性が空でない場合、JSONファイルに変換。
-リンク切れ
--Lab1-NiFi-Learn-Ropes.xmlテンプレート~
リンク切れだったので演習後のものを以下に添付した。
>&ref(Lab1-NiFi-Learn-Ropes.xml);
--trafficLocs_data_for_simulator.zip~
出だしのZIPがリンク切れなので、[[ココ>https://community.h...
>&ref(16302-trafficlocs-data-for-simulator.zip);
***Processorを追加する。 [#k488bdb8]
以下のProcessorを追加する。
-GetFile~
処理するファイルを入力。
-UnpackContent~
交通シミュレータZipファイルからフローファイルのコンテンツ...
-ControlRate~
プロセッサに流すフローファイルのフローのレートを制御
-EvaluateXPath~
車両位置情報XMLから最終更新タイムスタンプを属性として抽出
-SplitXML~
親の子要素を複数のフローファイルに分割
-UpdateAttribute~
各フローファイルのファイル名称属性を更新(ファイル名)
-EvaluateXPath~
車両位置情報XMLから車両ID、方向、緯度、軽度、速度を属性と...
-RouteOnAttribute~
各属性がフィルタ条件に一致する場合のみ、フローファイルを...
-AttributesToJSON~
フローファイルから属性を抽出し、JSON形式に変換
-MergeContent~
JSON形式のフローファイルのグループを配列としてマージ
-PutFile~
処理したファイルを出力。
***Connectionで接続する。 [#ib8c3d39]
図がリンク切れだけど、コンテキストから推測するに多分、こ...
#ref(1-1.png,left,nowrap,手順1-1,60%)
※ ConnectionのNameには適当な名称を付与。~
全てのRelationshipsを選択し、ProcessorのTerminateは設定...
また、Selected Prioritizersには、FirstInFirstOutPriorit...
***Processorを設定する。 [#s638be10]
「[[1.1 学習の目的: データフロー作成プロセスの概要>https:...
(詳細は「[[Step 2>https://github.com/ijokarumawak/hdf-tu...
-GetFile
--Input Directory : ./data-in
-UnpackContent
--Packaging Format : zip
-ControlRate
--Rate Control Criteria : flowfile count
--Maximum Rate : 3
--setTime Duration : 10 second
-EvaluateXPath
--Destination : flowfile-attribute ※ XPath式の結果をフロ...
--XPath式
---Last_Time : //body/lastTime/@time ※ [+]ボタンでプロパ...
-SplitXML : 既定値
-UpdateAttribute
--filename : ${UUID()} ※ [+]ボタンでプロパティ追加する。
-EvaluateXPath~
--Destination : flowfile-attribute ※ XPath式の結果をフロ...
--XPath式
---Direction_of_Travel : //vehicle/@dirTag ※ [+]ボタンで...
---Latitude : //vehicle/@lat ※ [+]ボタンでプロパティ追加...
---Longitude : //vehicle/@lon ※ [+]ボタンでプロパティ追加...
---Vehicle_ID : //vehicle/@id ※ [+]ボタンでプロパティ追加...
---Vehicle_Speed : //vehicle/@speedKmHr ※ [+]ボタンでプロ...
-RouteOnAttribute
--Filter_Attributes : 以下の式を記入 ※ [+]ボタンでプロパ...
${Direction_of_Travel:isEmpty():not():and(${Last_Time:is...
--Terminate Relationships : unmatched をチェック~
※ ココだけは必要そうなので。ConnectionのRelationshipsも...
-AttributesToJSON
--Destination : flowfile-content
--Attributes List : Vehicle_ID, Direction_of_Travel, Lati...
-MergeContent
--Minimum Number of Entries : 10
--Maximum Number of Entries : 15
--Delimiter Strategy : Text
--Header : [
--Footer : ]
--Demarcator : , {press-shift+enter}
>※ 波括弧内は要するに改行コードを意味している。~
NiFiのGUIで改行コードを入力するのにshiftが必要ということ。
-PutFile
--Directory : ./data-out/filtered_transitLoc_data
--Terminate Relationships : failure, success をチェック
ココまでの設定が適切であれば、Processorのwarningマークは...
#ref(1-2.png,left,nowrap,手順1-2,60%)
※ RouteOnAttributeだけ、Terminate Relationships : unmatch...
***データフローを実行する。 [#z60ae427]
-Processorを選択していない状態でOperatorの再生ボタンを押...
-入力フォルダにZIPファイルを放り込むと、上手いこと、出力...
-しかし、ファイル名がUUIDのものが出力される。これはマージ...
-MergeContentのTerminate Relationships : original をチェ...
**チュートリアル 2 [#g5d5e40d]
-「地理情報でデータフローをエンリッチメント」をやってみる...
https://github.com/ijokarumawak/hdf-tutorials-ja/wiki/Rop...
-概要
--Google Places APIをNiFiから利用し、
--車両の移動(車両位置情報XMLのシミュレーションデータ)に...
-リンク切れ
--Lab2-NiFi-Learn-Ropes.xmlテンプレート~
リンク切れだったので演習後のものを以下に添付した。
>&ref(Lab2-NiFi-Learn-Ropes.xml);~
※ Google Places APIのAPIキーは削除してある。
***準備 [#e67606b3]
-Google Places APIのAPIキーを取得しておく。~
https://developers.google.com/places/web-service/get-api-...
***Processorを追加する。 [#h3fc279e]
以下のProcessorを追加する。
-InvokeHTTP~
Google Places APIから車両ロケーション付近のJSON場所データ...
-EvaluateJsonPath~
JSON場所データからneighborhoods_nearbyとcityデータ要素を...
-RouteOnAttribute~
neighborhoods_nearbyとcity属性が空でないフローファイルを...
-AttributesToJSON~
フローファイルから属性を抽出し、JSON形式に変換
-MergeContent~
JSON形式のフローファイルのグループを配列としてマージ
-PutFile~
処理したファイルを出力。
***Connectionで接続する。 [#u87f8225]
[[チュートリアル 1>#f20c0bee]]のRouteOnAttributeから分岐...
[[上記のProcessor>#h3fc279e]]をConnectionで順に繋ぎ、PutF...
※ ConnectionのNameには適当な名称を付与。~
全てのRelationshipsを選択し、ProcessorのTerminateは設定...
また、Selected Prioritizersには、FirstInFirstOutPriorit...
***Processorを設定する。 [#y84b0ad5]
詳細が「[[Step 2>https://github.com/ijokarumawak/hdf-tuto...
-InvokeHTTP
--Remote URL : 以下を設定する。
https://maps.googleapis.com/maps/api/place/nearbysearch/...
--Terminate Relationships : response以外を全てチェック
-EvaluateXPath~
--Destination : flowfile-attribute ※ JSONPath式の結果をフ...
--Return Type : json
--JSONPath式
---city : $.results[0].vicinity ※ [+]ボタンでプロパティ追...
---neighborhoods_nearby : $.results[*].name ※ [+]ボタンで...
-RouteOnAttribute
--RouteNearbyNeighborhoods : 以下の式を記入 ※ [+]ボタンで...
${city:isEmpty():not():and(${neighborhoods_nearby:isEmpt...
--Terminate Relationships : unmatched をチェック~
※ ココだけは必要そうなので。ConnectionのRelationshipsも...
-AttributesToJSON
--Destination : flowfile-content
--Attributes List : Vehicle_ID, city, Latitude, Longitude...
-MergeContent
--Minimum Number of Entries : 10
--Maximum Number of Entries : 15
--Delimiter Strategy : Text
--Header : [
--Footer : ]
--Demarcator : , {press-shift+enter}
>※ 波括弧内は要するに改行コードを意味している。~
NiFiのGUIで改行コードを入力するのにshiftが必要ということ。
--Terminate Relationships : original をチェック
-PutFile
--Directory : ./data-out/nearby_neighborhoods_search
--Terminate Relationships : failure, success をチェック
ココまでの設定が適切であれば、Processorのwarningマークは...
#ref(2-2.png,left,nowrap,手順2-2,60%)
***データフローを実行する。 [#ff18a00c]
-Processorを選択していない状態でOperatorの再生ボタンを押...
-入力フォルダにZIPを放り込むと、上手いこと、出力フォルダ...
**チュートリアル 3 [#o248a2bd]
-「[[NextBus>https://www.nextbus.com/xmlFeedDocs/NextBusX...
https://github.com/ijokarumawak/hdf-tutorials-ja/wiki/Rop...
-概要
--車両位置情報XMLのシミュレーションデータを生成するデータ...
--[[NextBus>https://www.nextbus.com/xmlFeedDocs/NextBusXM...
-リンク切れ
--Lab3-NiFi-Learn-Ropes.xmlテンプレート~
リンク切れだったので演習後のものを以下に添付した。
>&ref(Lab3-NiFi-Learn-Ropes.xml);~
※ Google Places APIのAPIキーは削除してある。
***Processorを追加する。 [#zb9c714c]
以下のProcessorを追加する。
-GetHTTP
--GetFile、UnpackContent、ControlRateプロセッサを削除。
--これらをGetHTTPプロセッサで置き換え、入力を
シミュレーションデータから、NextBusライブストリームに変更。
***Connectionで接続する。 [#wbf604ee]
GetHTTPとEvaluateXPathを接続したデータフローを作成。
※ ConnectionのNameには適当な名称を付与。~
全てのRelationshipsを選択し、ProcessorのTerminateは設定...
また、Selected Prioritizersには、FirstInFirstOutPriorit...
***Processorを設定する。 [#g587eaf7]
以下のProcessorを設定する。
-GetHTTP
--URL : http://webservices.nextbus.com/service/publicXMLF...
--Filename : vehicleLoc_SF_OceanView_${now():format("HHmm...
--Run Schedule : 1 sec
※ APIの仕様が変わったのか、データが取得できなかったので、...
ココまでの設定が適切であれば、Processorのwarningマークは...
#ref(3-2.png,left,nowrap,手順3-2,60%)
***データフローを実行する。 [#p9d02a6f]
-Processorを選択していない状態でOperatorの再生ボタンを押...
-1 sec毎に、[[NextBus>https://www.nextbus.com/xmlFeedDocs...
#ref(3-3.png,left,nowrap,手順3-3,60%)
※ キューが詰まっている部分が赤く表示される。
*追加(自習)コンテンツ [#e59fd742]
**WebAPIを作成してみる。 [#f80dad4f]
-HandleHttpRequest/ResponseのProcessorを使用して、WebAPI...
-HandleHttpRequest/Responseの利用方法については[[コレ>htt...
-テンプレート : 演習後のものを以下に添付した。
>&ref(CreateWebAPI.xml);
***Processorを追加する。 [#f21305c2]
以下のProcessorを追加する。
-HandleHttpRequest~
HTTPクライアントからのHTTPリクエストを受け取る。
-ReplaceText~
HTTPリクエストからHTTPレスポンスで返すボディの文字列を設...
-HandleHttpResponse~
HTTPクライアントにHTTPレスポンスを返す。
-AttributesToJSON~
フローファイルから属性を抽出し、JSON形式に変換(HTTPヘッ...
-MergeContent~
JSON形式のHTTPヘッダとHTTPボディをマージ
-PutFile~
処理したファイルを出力(ココでは、確認のためHTTPリクエス...
-LogAttribute~
ReplaceTextとHandleHttpResponseのfailureをこちらに流す。
***Connectionで接続する。 [#pdfe404b]
以下のように接続する。
#ref(4-1.png,left,nowrap,手順4-1,60%)
※ ConnectionのNameには適当な名称を付与。~
全てのRelationshipsを選択し、ProcessorのTerminateは設定...
また、Selected Prioritizersには、FirstInFirstOutPriorit...
***Processorを設定する。 [#k2b4a7e0]
以下のProcessorを設定する。
-HandleHttpRequest
--Listening Port : 9095
--HTTP Context Map :
---DDLから[Create new service...]を選択する。
---[Add Controller Service]ダイアログで[APPLY]ボタンをク...
---[StandardHttpContextMap]Controller Serviceが追加され[→...
---コレを押下し、[Enable]アイコンをクリックし[StandardHtt...
-ReplaceText
--Replacement Value : {"Result": "OK"}
--Replacement Strategy : Always Replace
-HandleHttpResponse~
--HTTP Status Code : 202
--HTTP Context Map : DDLから、先ほど作成した[StandardHttp...
--Terminate Relationships : success をチェック
-AttributesToJSON
--Destination : flowfile-content
--Attributes List : 空文字列(全属性が対象)
-MergeContent
--Minimum Number of Entries : 1
--Maximum Number of Entries : 2
--Delimiter Strategy : Text
--Header : [
--Footer : ]
--Demarcator : , {press-shift+enter}
>※ 波括弧内は要するに改行コードを意味している。~
NiFiのGUIで改行コードを入力するのにshiftが必要ということ。
-PutFile
--Directory : ./data-out
--Terminate Relationships : failure, success をチェック
-LogAttribute~
--Terminate Relationships : success をチェック
ココまでの設定が適切であれば、Processorのwarningマークは...
#ref(4-2.png,left,nowrap,手順4-2,60%)
※ Controller Serviceとは、[[コチラ>ApacheNiFi#m40f0ef5]]...
StandardHttpContextMapで、Processor間でのHttpContextを...
***データフローを実行する。 [#yc42a9eb]
-[[cURL>https://techinfoofmicrosofttech.osscons.jp/index....
>curl -i -XPOST -H "Content-type: application/json" -d "...
-以下のように結果が返り、生のHTTPリクエストのJSONが./data...
HTTP/1.1 202 Accepted
Date: Mon, 09 Jul 2018 05:05:32 GMT
Transfer-Encoding: chunked
Server: Jetty(9.4.3.v20170317)
{"Result": "OK"}
※ 参考に従いLogAttributeを追加してみたが、~
エラーが起きないのでfailureに流れない。
**Webスクレイピング処理を実装してみる。 [#n7f497dd]
-以下が参考になるが、
--nifi-templates/Web_Scraper_Sample.xml at master · horto...
https://github.com/hortonworks-gallery/nifi-templates/blo...
>大き過ぎるので、「本WikiのFrontPageのタイトルを抽出する...
-テンプレート : 演習後のものを以下に添付した。
>&ref(ScrapingImplementation.xml);~
***準備 [#a8277084]
HTTPSアクセスのため、以下の手順を参考にして、Truststore.j...
-以下のURL(HTTPSアクセスするURL)をChromeのロケーション...
https://dotnetdevelopmentinfrastructure.osscons.jp/
-ロケーション・バーの保護された通信をクリックし、[証明書]...
-表示された[証明書]ダイアログの[詳細]タブで[ファイルにコ...
-既定値でxxxx.cerのようにエクスポートし、
-エクスポートしたファイルに対して、以下のコマンドを実行。
keytool -import -alias dotnetdevelopmentinfrastructure.o...
-- keytoolは、jdkのbinにある。
-- -alias はユニークなら何でもOK.
-- -storepass changeitは標準のパスフレーズ
-- -file, -keystore は適宜変更。-keystoreのフォルダは作成...
この証明書を信頼しますか。 [いいえ]: Y
証明書がキーストアに追加されました
***Processorを追加する。 [#y5e88841]
-GetHTTP~
HTMLコンテンツを取得する。
-GetHTMLElement~
HTMLエレメントを取得する。
-ExtractText~
正規表現で情報を抜く。
-AttributesToJSON~
フローファイルから属性を抽出し、JSON形式に変換
-PutFile~
結果ファイルを出力する。
***Connectionで接続する。 [#g26266bc]
[[上記のProcessor>#y5e88841]]をConnectionで順に繋ぐ。
※ ConnectionのNameには適当な名称を付与。~
全てのRelationshipsを選択し、ProcessorのTerminateは設定...
また、Selected Prioritizersには、FirstInFirstOutPriorit...
***Processorを設定する。 [#d0532ae1]
以下のProcessorを設定する。
-GetHTTP~
--URL : https://dotnetdevelopmentinfrastructure.osscons.j...
--Filename : FrontPage_dnetdevinf${now():format("HHmmssSS...
--SSL Context Service : [[前述の要領>#k2b4a7e0]]で、Contr...
[Add Controller Service]ダイアログでDDLからStandardSSLCon...
以下を設定後、Control ServiceをEnableにする。
---Truststore Filename : .\certificate\Truststore.jks
---Truststore Password : changeit
---Truststore Type : JKS
---TLS Protocol: SSL
--Run Schedule : 1 sec
-GetHTMLElement
--URL : hoge
--CSS Selector : h1.title
--Destination : flowfile-content
--Terminate Relationships : success以外を全てチェック
-ExtractText ※ 正規表現(Regular Expression)の結果をフロ...
--Title : page=FrontPage">(.+?)</a> ※ [+]ボタンでプロパテ...
--Terminate Relationships : unmatched をチェック
--参考 : [[Jmeterの正規表現 - マイクロソフト系技術情報 Wi...
-AttributesToJSON
--Destination : flowfile-content
--Attributes List : 空文字列(全属性が対象)
-PutFile
--Directory : ./data-out
--Terminate Relationships : failure, success をチェック
ココまでの設定が適切であれば、Processorのwarningマークは...
#ref(5-2.png,left,nowrap,手順5-2,60%)
***データフローを実行する。 [#d3ccec97]
以下のような出力を得られる。
{
path:./
filename:FrontPage_dnetdevinf095740401.html
gethttp.remote.source:dotnetdevelopmentinfrastructure....
Title.1:FrontPage
Title.0:page=FrontPage">FrontPage</a>
Title:FrontPage
mime.type:text/html; charset=UTF-8
uuid:53fff8cb-64de-494a-a818-c7ee6295f6c0
}
**RDBMS処理を実装してみる。 [#vbe20c60]
-[[コレ>Apache NiFi - RDBMS系のProcessor#dcf5b75e]]を参考...
-テンプレート : 演習後のものを以下に添付した。
>&ref(ExecuteSQLSample.xml);
***準備 [#v5f709b8]
-RDBMSの準備:ココではSQL ServerのNorthwindを選択。
-JDBCの準備:事前にMicrosoft SQL Server 用 JDBC Driverを...
--JDBC ドライバーの使用 | Microsoft Docs~
https://docs.microsoft.com/ja-jp/sql/connect/jdbc/using-t...
***Processorを追加する。 [#t52d0467]
-GenerateFlowFile~
JSON配列形式のフローファイルを生成
-SplitJson~
JSON配列を分割する。
-EvaluateJsonPath~
JSONのKey-Valueを属性に抽出(パラメタ値)。
-UpdateAttribute~
各フローファイルのファイル名称属性を更新(パラメタ型)
-ExecuteSQL~
属性を使用し、Prepared statementにパラメタライズして実行...
-ConvertAvroToJSON~
結果をJSONに変換する。
-UpdateAttribute~
各フローファイルのファイル名称属性を更新(ファイル名)
-PutFile~
結果をファイルに出力。
***Connectionで接続する。 [#t52d0467]
[[上記のProcessor>#y5e88841]]をConnectionで順に繋ぐ。
※ ConnectionのNameには適当な名称を付与。~
全てのRelationshipsを選択し、ProcessorのTerminateは設定...
また、Selected Prioritizersには、FirstInFirstOutPriorit...
***Processorを設定する。 [#e64bf9d7]
-GenerateFlowFile
--Custom Text :
[
{
"ShipperID" : 1,
"CompanyName" : "",
"Phone" : ""
},
{
"ShipperID" : 2,
"CompanyName" : "",
"Phone" : ""
},
{
"ShipperID" : 3,
"CompanyName" : "",
"Phone" : ""
}
]
-SplitJson
--JsonPath Expression : $
--Terminate Relationships : split以外を全てチェック
-EvaluateJsonPath
--Destination : flowfile-attribute
--sql.args.1.value : $.ShipperID ※ [+]ボタンでプロパティ...
※ sql.args.N.typeのNでPrepared Statementのインデックスを...
--Terminate Relationships : matched以外を全てチェック
-UpdateAttribute
--sql.args.1.type : 4 ※ [+]ボタンでプロパティ追加する。~
※ sql.args.N.typeのNでPrepared Statementのインデックスを...
-ExecuteSQL
--setSQL select query : SELECT * FROM Shippers WHERE Ship...
--Database Connection Pooling Service : [[前述の要領>#k2b...
[Add Controller Service]ダイアログでDDLからDBCPConnection...
以下を設定後、Control ServiceをEnableにする。
---Database Connection URL :~
jdbc:sqlserver://<ipaddress>or<hostname>:<portno>;databas...
e.g. : jdbc:sqlserver://localhost:1433;database=northwind
---Database Driver Class Name : com.microsoft.sqlserver.j...
---Database Driver Location(s) : C:\Program Files (x86)\J...
---Database User : ****
---Password : ****
--Terminate Relationships : failureをチェック
-ConvertAvroToJSON~
--既定値
--Terminate Relationships : failureをチェック
-UpdateAttribute
--filename : ${UUID()} ※ [+]ボタンでプロパティ追加する。
-PutFile
--Directory : ./data-out
--Terminate Relationships : failure, success をチェック
ココまでの設定が適切であれば、Processorのwarningマークは...
#ref(6-2.png,left,nowrap,手順6-2,60%)
***データフローを実行する。 [#l6a6e14f]
以下のような出力を得られる。
{"ShipperID": 1, "CompanyName": "Speedy Express", "Phone...
{"ShipperID": 2, "CompanyName": "United Package", "Phone...
{"ShipperID": 3, "CompanyName": "Federal Shipping", "Pho...
*参考 [#rdf45f1f]
**[[参考資料>Apache NiFi#ce179366]] [#e99bd3ab]
**[[Apache NiFi - Processor]] [#c62e636c]
終了行:
「[[.NET 開発基盤部会 Wiki>http://dotnetdevelopmentinfras...
-[[戻る>Apache NiFi]]
--[[Apache NiFiファースト・ステップ]]
--Apache NiFiセカンド・ステップ
*目次 [#r0303ee6]
#contents
*概要 [#r7ffacea]
-[[ファースト・ステップ>Apache NiFiファースト・ステップ]]...
--インストール
--初歩的な利用方法
--基本的な操作方法
-セカンド・ステップでは、
--[[チュートリアル>#j06a5c12]]
---[[チュートリアル 1: シンプルなNiFiデータフロー構築>#f2...
---[[チュートリアル 2: 地理情報でデータフローをエンリッチ...
---[[チュートリアル 3: NextBusライブストリームの取込>#o24...
--[[追加(自習)コンテンツ>#e59fd742]]
---[[WebAPIを作成してみる。>#f80dad4f]]~
HandleHttpRequest/ResponseのProcessorを使用して、WebAPIを...
---[[Webスクレイピング処理を実装してみる。>#n7f497dd]]~
本WikiのFront Pageのタイトルを抽出する。
---[[RDBMS処理を実装してみる。>#vbe20c60]]~
ExecuteSQLをパラメタライズして実行する。
>などをやってみる。
*チュートリアル [#j06a5c12]
以下を参考に、チュートリアルを遂行。
-Learning the Ropes of Apache NiFi · ijokarumawak/hdf-tut...
https://github.com/ijokarumawak/hdf-tutorials-ja/wiki/Lea...
このチュートリアルは、ある都市計画委員会が新規の高速道路...
リアルタイムデータを活用し、交通パターンのより深い理解を...
**チュートリアル 1 [#f20c0bee]
-「シンプルなNiFiデータフロー構築」をやってみる。~
https://github.com/ijokarumawak/hdf-tutorials-ja/wiki/Rop...
-概要
++車両位置情報XMLのシミュレーションデータを取り込む。
++フローファイルから車両位置詳細属性を抽出する。
++それらの詳細属性が空でない場合、JSONファイルに変換。
-リンク切れ
--Lab1-NiFi-Learn-Ropes.xmlテンプレート~
リンク切れだったので演習後のものを以下に添付した。
>&ref(Lab1-NiFi-Learn-Ropes.xml);
--trafficLocs_data_for_simulator.zip~
出だしのZIPがリンク切れなので、[[ココ>https://community.h...
>&ref(16302-trafficlocs-data-for-simulator.zip);
***Processorを追加する。 [#k488bdb8]
以下のProcessorを追加する。
-GetFile~
処理するファイルを入力。
-UnpackContent~
交通シミュレータZipファイルからフローファイルのコンテンツ...
-ControlRate~
プロセッサに流すフローファイルのフローのレートを制御
-EvaluateXPath~
車両位置情報XMLから最終更新タイムスタンプを属性として抽出
-SplitXML~
親の子要素を複数のフローファイルに分割
-UpdateAttribute~
各フローファイルのファイル名称属性を更新(ファイル名)
-EvaluateXPath~
車両位置情報XMLから車両ID、方向、緯度、軽度、速度を属性と...
-RouteOnAttribute~
各属性がフィルタ条件に一致する場合のみ、フローファイルを...
-AttributesToJSON~
フローファイルから属性を抽出し、JSON形式に変換
-MergeContent~
JSON形式のフローファイルのグループを配列としてマージ
-PutFile~
処理したファイルを出力。
***Connectionで接続する。 [#ib8c3d39]
図がリンク切れだけど、コンテキストから推測するに多分、こ...
#ref(1-1.png,left,nowrap,手順1-1,60%)
※ ConnectionのNameには適当な名称を付与。~
全てのRelationshipsを選択し、ProcessorのTerminateは設定...
また、Selected Prioritizersには、FirstInFirstOutPriorit...
***Processorを設定する。 [#s638be10]
「[[1.1 学習の目的: データフロー作成プロセスの概要>https:...
(詳細は「[[Step 2>https://github.com/ijokarumawak/hdf-tu...
-GetFile
--Input Directory : ./data-in
-UnpackContent
--Packaging Format : zip
-ControlRate
--Rate Control Criteria : flowfile count
--Maximum Rate : 3
--setTime Duration : 10 second
-EvaluateXPath
--Destination : flowfile-attribute ※ XPath式の結果をフロ...
--XPath式
---Last_Time : //body/lastTime/@time ※ [+]ボタンでプロパ...
-SplitXML : 既定値
-UpdateAttribute
--filename : ${UUID()} ※ [+]ボタンでプロパティ追加する。
-EvaluateXPath~
--Destination : flowfile-attribute ※ XPath式の結果をフロ...
--XPath式
---Direction_of_Travel : //vehicle/@dirTag ※ [+]ボタンで...
---Latitude : //vehicle/@lat ※ [+]ボタンでプロパティ追加...
---Longitude : //vehicle/@lon ※ [+]ボタンでプロパティ追加...
---Vehicle_ID : //vehicle/@id ※ [+]ボタンでプロパティ追加...
---Vehicle_Speed : //vehicle/@speedKmHr ※ [+]ボタンでプロ...
-RouteOnAttribute
--Filter_Attributes : 以下の式を記入 ※ [+]ボタンでプロパ...
${Direction_of_Travel:isEmpty():not():and(${Last_Time:is...
--Terminate Relationships : unmatched をチェック~
※ ココだけは必要そうなので。ConnectionのRelationshipsも...
-AttributesToJSON
--Destination : flowfile-content
--Attributes List : Vehicle_ID, Direction_of_Travel, Lati...
-MergeContent
--Minimum Number of Entries : 10
--Maximum Number of Entries : 15
--Delimiter Strategy : Text
--Header : [
--Footer : ]
--Demarcator : , {press-shift+enter}
>※ 波括弧内は要するに改行コードを意味している。~
NiFiのGUIで改行コードを入力するのにshiftが必要ということ。
-PutFile
--Directory : ./data-out/filtered_transitLoc_data
--Terminate Relationships : failure, success をチェック
ココまでの設定が適切であれば、Processorのwarningマークは...
#ref(1-2.png,left,nowrap,手順1-2,60%)
※ RouteOnAttributeだけ、Terminate Relationships : unmatch...
***データフローを実行する。 [#z60ae427]
-Processorを選択していない状態でOperatorの再生ボタンを押...
-入力フォルダにZIPファイルを放り込むと、上手いこと、出力...
-しかし、ファイル名がUUIDのものが出力される。これはマージ...
-MergeContentのTerminate Relationships : original をチェ...
**チュートリアル 2 [#g5d5e40d]
-「地理情報でデータフローをエンリッチメント」をやってみる...
https://github.com/ijokarumawak/hdf-tutorials-ja/wiki/Rop...
-概要
--Google Places APIをNiFiから利用し、
--車両の移動(車両位置情報XMLのシミュレーションデータ)に...
-リンク切れ
--Lab2-NiFi-Learn-Ropes.xmlテンプレート~
リンク切れだったので演習後のものを以下に添付した。
>&ref(Lab2-NiFi-Learn-Ropes.xml);~
※ Google Places APIのAPIキーは削除してある。
***準備 [#e67606b3]
-Google Places APIのAPIキーを取得しておく。~
https://developers.google.com/places/web-service/get-api-...
***Processorを追加する。 [#h3fc279e]
以下のProcessorを追加する。
-InvokeHTTP~
Google Places APIから車両ロケーション付近のJSON場所データ...
-EvaluateJsonPath~
JSON場所データからneighborhoods_nearbyとcityデータ要素を...
-RouteOnAttribute~
neighborhoods_nearbyとcity属性が空でないフローファイルを...
-AttributesToJSON~
フローファイルから属性を抽出し、JSON形式に変換
-MergeContent~
JSON形式のフローファイルのグループを配列としてマージ
-PutFile~
処理したファイルを出力。
***Connectionで接続する。 [#u87f8225]
[[チュートリアル 1>#f20c0bee]]のRouteOnAttributeから分岐...
[[上記のProcessor>#h3fc279e]]をConnectionで順に繋ぎ、PutF...
※ ConnectionのNameには適当な名称を付与。~
全てのRelationshipsを選択し、ProcessorのTerminateは設定...
また、Selected Prioritizersには、FirstInFirstOutPriorit...
***Processorを設定する。 [#y84b0ad5]
詳細が「[[Step 2>https://github.com/ijokarumawak/hdf-tuto...
-InvokeHTTP
--Remote URL : 以下を設定する。
https://maps.googleapis.com/maps/api/place/nearbysearch/...
--Terminate Relationships : response以外を全てチェック
-EvaluateXPath~
--Destination : flowfile-attribute ※ JSONPath式の結果をフ...
--Return Type : json
--JSONPath式
---city : $.results[0].vicinity ※ [+]ボタンでプロパティ追...
---neighborhoods_nearby : $.results[*].name ※ [+]ボタンで...
-RouteOnAttribute
--RouteNearbyNeighborhoods : 以下の式を記入 ※ [+]ボタンで...
${city:isEmpty():not():and(${neighborhoods_nearby:isEmpt...
--Terminate Relationships : unmatched をチェック~
※ ココだけは必要そうなので。ConnectionのRelationshipsも...
-AttributesToJSON
--Destination : flowfile-content
--Attributes List : Vehicle_ID, city, Latitude, Longitude...
-MergeContent
--Minimum Number of Entries : 10
--Maximum Number of Entries : 15
--Delimiter Strategy : Text
--Header : [
--Footer : ]
--Demarcator : , {press-shift+enter}
>※ 波括弧内は要するに改行コードを意味している。~
NiFiのGUIで改行コードを入力するのにshiftが必要ということ。
--Terminate Relationships : original をチェック
-PutFile
--Directory : ./data-out/nearby_neighborhoods_search
--Terminate Relationships : failure, success をチェック
ココまでの設定が適切であれば、Processorのwarningマークは...
#ref(2-2.png,left,nowrap,手順2-2,60%)
***データフローを実行する。 [#ff18a00c]
-Processorを選択していない状態でOperatorの再生ボタンを押...
-入力フォルダにZIPを放り込むと、上手いこと、出力フォルダ...
**チュートリアル 3 [#o248a2bd]
-「[[NextBus>https://www.nextbus.com/xmlFeedDocs/NextBusX...
https://github.com/ijokarumawak/hdf-tutorials-ja/wiki/Rop...
-概要
--車両位置情報XMLのシミュレーションデータを生成するデータ...
--[[NextBus>https://www.nextbus.com/xmlFeedDocs/NextBusXM...
-リンク切れ
--Lab3-NiFi-Learn-Ropes.xmlテンプレート~
リンク切れだったので演習後のものを以下に添付した。
>&ref(Lab3-NiFi-Learn-Ropes.xml);~
※ Google Places APIのAPIキーは削除してある。
***Processorを追加する。 [#zb9c714c]
以下のProcessorを追加する。
-GetHTTP
--GetFile、UnpackContent、ControlRateプロセッサを削除。
--これらをGetHTTPプロセッサで置き換え、入力を
シミュレーションデータから、NextBusライブストリームに変更。
***Connectionで接続する。 [#wbf604ee]
GetHTTPとEvaluateXPathを接続したデータフローを作成。
※ ConnectionのNameには適当な名称を付与。~
全てのRelationshipsを選択し、ProcessorのTerminateは設定...
また、Selected Prioritizersには、FirstInFirstOutPriorit...
***Processorを設定する。 [#g587eaf7]
以下のProcessorを設定する。
-GetHTTP
--URL : http://webservices.nextbus.com/service/publicXMLF...
--Filename : vehicleLoc_SF_OceanView_${now():format("HHmm...
--Run Schedule : 1 sec
※ APIの仕様が変わったのか、データが取得できなかったので、...
ココまでの設定が適切であれば、Processorのwarningマークは...
#ref(3-2.png,left,nowrap,手順3-2,60%)
***データフローを実行する。 [#p9d02a6f]
-Processorを選択していない状態でOperatorの再生ボタンを押...
-1 sec毎に、[[NextBus>https://www.nextbus.com/xmlFeedDocs...
#ref(3-3.png,left,nowrap,手順3-3,60%)
※ キューが詰まっている部分が赤く表示される。
*追加(自習)コンテンツ [#e59fd742]
**WebAPIを作成してみる。 [#f80dad4f]
-HandleHttpRequest/ResponseのProcessorを使用して、WebAPI...
-HandleHttpRequest/Responseの利用方法については[[コレ>htt...
-テンプレート : 演習後のものを以下に添付した。
>&ref(CreateWebAPI.xml);
***Processorを追加する。 [#f21305c2]
以下のProcessorを追加する。
-HandleHttpRequest~
HTTPクライアントからのHTTPリクエストを受け取る。
-ReplaceText~
HTTPリクエストからHTTPレスポンスで返すボディの文字列を設...
-HandleHttpResponse~
HTTPクライアントにHTTPレスポンスを返す。
-AttributesToJSON~
フローファイルから属性を抽出し、JSON形式に変換(HTTPヘッ...
-MergeContent~
JSON形式のHTTPヘッダとHTTPボディをマージ
-PutFile~
処理したファイルを出力(ココでは、確認のためHTTPリクエス...
-LogAttribute~
ReplaceTextとHandleHttpResponseのfailureをこちらに流す。
***Connectionで接続する。 [#pdfe404b]
以下のように接続する。
#ref(4-1.png,left,nowrap,手順4-1,60%)
※ ConnectionのNameには適当な名称を付与。~
全てのRelationshipsを選択し、ProcessorのTerminateは設定...
また、Selected Prioritizersには、FirstInFirstOutPriorit...
***Processorを設定する。 [#k2b4a7e0]
以下のProcessorを設定する。
-HandleHttpRequest
--Listening Port : 9095
--HTTP Context Map :
---DDLから[Create new service...]を選択する。
---[Add Controller Service]ダイアログで[APPLY]ボタンをク...
---[StandardHttpContextMap]Controller Serviceが追加され[→...
---コレを押下し、[Enable]アイコンをクリックし[StandardHtt...
-ReplaceText
--Replacement Value : {"Result": "OK"}
--Replacement Strategy : Always Replace
-HandleHttpResponse~
--HTTP Status Code : 202
--HTTP Context Map : DDLから、先ほど作成した[StandardHttp...
--Terminate Relationships : success をチェック
-AttributesToJSON
--Destination : flowfile-content
--Attributes List : 空文字列(全属性が対象)
-MergeContent
--Minimum Number of Entries : 1
--Maximum Number of Entries : 2
--Delimiter Strategy : Text
--Header : [
--Footer : ]
--Demarcator : , {press-shift+enter}
>※ 波括弧内は要するに改行コードを意味している。~
NiFiのGUIで改行コードを入力するのにshiftが必要ということ。
-PutFile
--Directory : ./data-out
--Terminate Relationships : failure, success をチェック
-LogAttribute~
--Terminate Relationships : success をチェック
ココまでの設定が適切であれば、Processorのwarningマークは...
#ref(4-2.png,left,nowrap,手順4-2,60%)
※ Controller Serviceとは、[[コチラ>ApacheNiFi#m40f0ef5]]...
StandardHttpContextMapで、Processor間でのHttpContextを...
***データフローを実行する。 [#yc42a9eb]
-[[cURL>https://techinfoofmicrosofttech.osscons.jp/index....
>curl -i -XPOST -H "Content-type: application/json" -d "...
-以下のように結果が返り、生のHTTPリクエストのJSONが./data...
HTTP/1.1 202 Accepted
Date: Mon, 09 Jul 2018 05:05:32 GMT
Transfer-Encoding: chunked
Server: Jetty(9.4.3.v20170317)
{"Result": "OK"}
※ 参考に従いLogAttributeを追加してみたが、~
エラーが起きないのでfailureに流れない。
**Webスクレイピング処理を実装してみる。 [#n7f497dd]
-以下が参考になるが、
--nifi-templates/Web_Scraper_Sample.xml at master · horto...
https://github.com/hortonworks-gallery/nifi-templates/blo...
>大き過ぎるので、「本WikiのFrontPageのタイトルを抽出する...
-テンプレート : 演習後のものを以下に添付した。
>&ref(ScrapingImplementation.xml);~
***準備 [#a8277084]
HTTPSアクセスのため、以下の手順を参考にして、Truststore.j...
-以下のURL(HTTPSアクセスするURL)をChromeのロケーション...
https://dotnetdevelopmentinfrastructure.osscons.jp/
-ロケーション・バーの保護された通信をクリックし、[証明書]...
-表示された[証明書]ダイアログの[詳細]タブで[ファイルにコ...
-既定値でxxxx.cerのようにエクスポートし、
-エクスポートしたファイルに対して、以下のコマンドを実行。
keytool -import -alias dotnetdevelopmentinfrastructure.o...
-- keytoolは、jdkのbinにある。
-- -alias はユニークなら何でもOK.
-- -storepass changeitは標準のパスフレーズ
-- -file, -keystore は適宜変更。-keystoreのフォルダは作成...
この証明書を信頼しますか。 [いいえ]: Y
証明書がキーストアに追加されました
***Processorを追加する。 [#y5e88841]
-GetHTTP~
HTMLコンテンツを取得する。
-GetHTMLElement~
HTMLエレメントを取得する。
-ExtractText~
正規表現で情報を抜く。
-AttributesToJSON~
フローファイルから属性を抽出し、JSON形式に変換
-PutFile~
結果ファイルを出力する。
***Connectionで接続する。 [#g26266bc]
[[上記のProcessor>#y5e88841]]をConnectionで順に繋ぐ。
※ ConnectionのNameには適当な名称を付与。~
全てのRelationshipsを選択し、ProcessorのTerminateは設定...
また、Selected Prioritizersには、FirstInFirstOutPriorit...
***Processorを設定する。 [#d0532ae1]
以下のProcessorを設定する。
-GetHTTP~
--URL : https://dotnetdevelopmentinfrastructure.osscons.j...
--Filename : FrontPage_dnetdevinf${now():format("HHmmssSS...
--SSL Context Service : [[前述の要領>#k2b4a7e0]]で、Contr...
[Add Controller Service]ダイアログでDDLからStandardSSLCon...
以下を設定後、Control ServiceをEnableにする。
---Truststore Filename : .\certificate\Truststore.jks
---Truststore Password : changeit
---Truststore Type : JKS
---TLS Protocol: SSL
--Run Schedule : 1 sec
-GetHTMLElement
--URL : hoge
--CSS Selector : h1.title
--Destination : flowfile-content
--Terminate Relationships : success以外を全てチェック
-ExtractText ※ 正規表現(Regular Expression)の結果をフロ...
--Title : page=FrontPage">(.+?)</a> ※ [+]ボタンでプロパテ...
--Terminate Relationships : unmatched をチェック
--参考 : [[Jmeterの正規表現 - マイクロソフト系技術情報 Wi...
-AttributesToJSON
--Destination : flowfile-content
--Attributes List : 空文字列(全属性が対象)
-PutFile
--Directory : ./data-out
--Terminate Relationships : failure, success をチェック
ココまでの設定が適切であれば、Processorのwarningマークは...
#ref(5-2.png,left,nowrap,手順5-2,60%)
***データフローを実行する。 [#d3ccec97]
以下のような出力を得られる。
{
path:./
filename:FrontPage_dnetdevinf095740401.html
gethttp.remote.source:dotnetdevelopmentinfrastructure....
Title.1:FrontPage
Title.0:page=FrontPage">FrontPage</a>
Title:FrontPage
mime.type:text/html; charset=UTF-8
uuid:53fff8cb-64de-494a-a818-c7ee6295f6c0
}
**RDBMS処理を実装してみる。 [#vbe20c60]
-[[コレ>Apache NiFi - RDBMS系のProcessor#dcf5b75e]]を参考...
-テンプレート : 演習後のものを以下に添付した。
>&ref(ExecuteSQLSample.xml);
***準備 [#v5f709b8]
-RDBMSの準備:ココではSQL ServerのNorthwindを選択。
-JDBCの準備:事前にMicrosoft SQL Server 用 JDBC Driverを...
--JDBC ドライバーの使用 | Microsoft Docs~
https://docs.microsoft.com/ja-jp/sql/connect/jdbc/using-t...
***Processorを追加する。 [#t52d0467]
-GenerateFlowFile~
JSON配列形式のフローファイルを生成
-SplitJson~
JSON配列を分割する。
-EvaluateJsonPath~
JSONのKey-Valueを属性に抽出(パラメタ値)。
-UpdateAttribute~
各フローファイルのファイル名称属性を更新(パラメタ型)
-ExecuteSQL~
属性を使用し、Prepared statementにパラメタライズして実行...
-ConvertAvroToJSON~
結果をJSONに変換する。
-UpdateAttribute~
各フローファイルのファイル名称属性を更新(ファイル名)
-PutFile~
結果をファイルに出力。
***Connectionで接続する。 [#t52d0467]
[[上記のProcessor>#y5e88841]]をConnectionで順に繋ぐ。
※ ConnectionのNameには適当な名称を付与。~
全てのRelationshipsを選択し、ProcessorのTerminateは設定...
また、Selected Prioritizersには、FirstInFirstOutPriorit...
***Processorを設定する。 [#e64bf9d7]
-GenerateFlowFile
--Custom Text :
[
{
"ShipperID" : 1,
"CompanyName" : "",
"Phone" : ""
},
{
"ShipperID" : 2,
"CompanyName" : "",
"Phone" : ""
},
{
"ShipperID" : 3,
"CompanyName" : "",
"Phone" : ""
}
]
-SplitJson
--JsonPath Expression : $
--Terminate Relationships : split以外を全てチェック
-EvaluateJsonPath
--Destination : flowfile-attribute
--sql.args.1.value : $.ShipperID ※ [+]ボタンでプロパティ...
※ sql.args.N.typeのNでPrepared Statementのインデックスを...
--Terminate Relationships : matched以外を全てチェック
-UpdateAttribute
--sql.args.1.type : 4 ※ [+]ボタンでプロパティ追加する。~
※ sql.args.N.typeのNでPrepared Statementのインデックスを...
-ExecuteSQL
--setSQL select query : SELECT * FROM Shippers WHERE Ship...
--Database Connection Pooling Service : [[前述の要領>#k2b...
[Add Controller Service]ダイアログでDDLからDBCPConnection...
以下を設定後、Control ServiceをEnableにする。
---Database Connection URL :~
jdbc:sqlserver://<ipaddress>or<hostname>:<portno>;databas...
e.g. : jdbc:sqlserver://localhost:1433;database=northwind
---Database Driver Class Name : com.microsoft.sqlserver.j...
---Database Driver Location(s) : C:\Program Files (x86)\J...
---Database User : ****
---Password : ****
--Terminate Relationships : failureをチェック
-ConvertAvroToJSON~
--既定値
--Terminate Relationships : failureをチェック
-UpdateAttribute
--filename : ${UUID()} ※ [+]ボタンでプロパティ追加する。
-PutFile
--Directory : ./data-out
--Terminate Relationships : failure, success をチェック
ココまでの設定が適切であれば、Processorのwarningマークは...
#ref(6-2.png,left,nowrap,手順6-2,60%)
***データフローを実行する。 [#l6a6e14f]
以下のような出力を得られる。
{"ShipperID": 1, "CompanyName": "Speedy Express", "Phone...
{"ShipperID": 2, "CompanyName": "United Package", "Phone...
{"ShipperID": 3, "CompanyName": "Federal Shipping", "Pho...
*参考 [#rdf45f1f]
**[[参考資料>Apache NiFi#ce179366]] [#e99bd3ab]
**[[Apache NiFi - Processor]] [#c62e636c]
ページ名: