「.NET 開発基盤部会 Wiki」は、「Open棟梁Project」,「OSSコンソーシアム .NET開発基盤部会」によって運営されています。
目次 †
概要 †
セカンド・ステップでは、
- WebAPIを作成してみる。
HandleHttpRequest? / HandleHttpResponse?のProcessorを使用して、WebAPIを作成してみる。
などをやってみる。
チュートリアル †
以下を参考に、チュートリアルを遂行。
Learning the Ropes of Apache NiFi? · ijokarumawak/hdf-tutorials-ja Wiki
https://github.com/ijokarumawak/hdf-tutorials-ja/wiki/Learning-the-Ropes-of-Apache-NiFi
チュートリアル 1 †
- 概要
- 車両位置情報XMLのシミュレーションデータを取り込む。
- フローファイルから車両位置詳細属性を抽出する。
- それらの詳細属性が空でない場合、JSONファイルに変換。
Processorを追加する。 †
以下のProcessorを追加する。
- UnpackContent?
交通シミュレータZipファイルからフローファイルのコンテンツを解凍
- ControlRate?
プロセッサに流すフローファイルのフローのレートを制御
- EvaluateXPath
車両位置情報XMLから最終更新タイムスタンプを属性として抽出
- SplitXML
親の子要素を複数のフローファイルに分割
- UpdateAttribute?
各フローファイルのファイル名称属性を更新
- EvaluateXPath
車両位置情報XMLから車両ID、方向、緯度、軽度、速度を属性として抽出
- RouteOnAttribute?
各属性がフィルタ条件に一致する場合のみ、フローファイルを後続のフローに流す。
- AttributesToJSON
フローファイルから属性を抽出し、JSON形式に変換
- MergeContent?
JSON形式のフローファイルのグループを配列としてマージ
Connectionで接続する。 †
図がリンク切れだけど、コンテキストから推測するに多分、こう。
※ ConnectionのNameには適当な名称を付与。
全てのRelationshipsを選択し、ProcessorのTerminateは設定しなかった。
また、Selected Prioritizersには、FirstInFirstOutPrioritizer?, OldestFlowFileFirstPrioritizer?を設定。
Processorを設定する。 †
「1.1 学習の目的: データフロー作成プロセスの概要」を参考に設定。
(詳細は「Step 2」以降に書いてあるので、必要に応じてソレを参考にする)
- GetFile?
- Input Directory : ./data-in
- ControlRate?
- Rate Control Criteria : flowfile count
- Maximum Rate : 3
- setTime Duration : 10 second
- EvaluateXPath
- Destination : flowfile-attribute ※ XPath式の結果をフローファイルの属性に保存する
- XPath式
- Last_Time : //body/lastTime/@time ※ [+]ボタンでプロパティ追加する。
- UpdateAttribute?
- filename : ${UUID()} ※ [+]ボタンでプロパティ追加する。
- EvaluateXPath
- Destination : flowfile-attribute ※ XPath式の結果をフローファイルの属性に保存する
- XPath式
- Direction_of_Travel : //vehicle/@dirTag ※ [+]ボタンでプロパティ追加する。
- Latitude : //vehicle/@lat ※ [+]ボタンでプロパティ追加する。
- Longitude : //vehicle/@lon ※ [+]ボタンでプロパティ追加する。
- Vehicle_ID : //vehicle/@id ※ [+]ボタンでプロパティ追加する。
- Vehicle_Speed : //vehicle/@speedKmHr? ※ [+]ボタンでプロパティ追加する。
- AttributesToJSON
- Destination : flowfile-content
- Attributes List : Vehicle_ID, Direction_of_Travel, Latitude, Longitude, Vehicle_Speed, Last_Time
- MergeContent?
- Minimum Number of Entries : 10
- Maximum Number of Entries : 15
- Delimiter Strategy : Text
- Header : [
- Footer : ]
- Demarcator : , {press-shift+enter}
※ 波括弧内は要するに改行コードを意味している。
NiFi?のGUIで改行コードを入力するのにshiftが必要ということ。
- PutFile?
- Directory : ./data-out/filtered_transitLoc_data
- Terminate Relationships : failure, success をチェック
ココまでの設定が適切であれば、Processorのwarningマークはすべて取れているハズ。
※ RouteOnAttribute?だけ、Terminate Relationships : unmatched をチェックした。
データフローを実行する。 †
- Processorを選択していない状態でOperatorの再生ボタンを押下する。
- 入力フォルダにZIPファイルを放り込むと、上手いこと、出力フォルダにJSONが出力される。
- しかし、ファイル名がUUIDのものが出力される。これはマージ後のフローファイルもフローに流れているためと解る。
- MergeContent?のTerminate Relationships : original をチェックしてテストして上手く動作することを確認する。
チュートリアル 2 †
- 概要
- Google Places APIをNiFi?から利用し、
- 車両の移動(車両位置情報XMLのシミュレーションデータ)に応じて周辺情報を表示。
準備 †
Processorを追加する。 †
以下のProcessorを追加する。
- InvokeHTTP
Google Places APIから車両ロケーション付近のJSON場所データを取得。
- EvaluateJsonPath?
JSON場所データからneighborhoods_nearbyとcityデータ要素を抽出して属性に設定。
- RouteOnAttribute?
neighborhoods_nearbyとcity属性が空でないフローファイルをルーティング。
- AttributesToJSON
フローファイルから属性を抽出し、JSON形式に変換
- MergeContent?
JSON形式のフローファイルのグループを配列としてマージ
Connectionで接続する。 †
チュートリアル 1のRouteOnAttribute?から分岐して、合流せずに、
上記のProcessorをConnectionで順に繋ぎ、PutFile?で終わるデータフローを作成。
※ ConnectionのNameには適当な名称を付与。
全てのRelationshipsを選択し、ProcessorのTerminateは設定しなかった。
また、Selected Prioritizersには、FirstInFirstOutPrioritizer?, OldestFlowFileFirstPrioritizer?を設定。
Processorを設定する。 †
詳細が「Step 2」以降に書いてあるので、必要に応じてソレを参考にする。
- EvaluateXPath
- Destination : flowfile-attribute ※ JSONPath式の結果をフローファイルの属性に保存する
- Return Type : json
- JSONPath式
- city : $.results[0].vicinity ※ [+]ボタンでプロパティ追加する。
- neighborhoods_nearby : $.results[*].name ※ [+]ボタンでプロパティ追加する。
- AttributesToJSON
- Destination : flowfile-content
- Attributes List : Vehicle_ID, city, Latitude, Longitude, neighborhoods_nearby, Last_Time
- MergeContent?
- Minimum Number of Entries : 10
- Maximum Number of Entries : 15
- Delimiter Strategy : Text
- Header : [
- Footer : ]
- Demarcator : , {press-shift+enter}
※ 波括弧内は要するに改行コードを意味している。
NiFi?のGUIで改行コードを入力するのにshiftが必要ということ。
- Terminate Relationships : original をチェック
- PutFile?
- Directory : ./data-out/nearby_neighborhoods_search
- Terminate Relationships : failure, success をチェック
ココまでの設定が適切であれば、Processorのwarningマークはすべて取れているハズ。
データフローを実行する。 †
- Processorを選択していない状態でOperatorの再生ボタンを押下する。
- 入力フォルダにZIPを放り込むと、上手いこと、出力フォルダにJSONが出力される。
チュートリアル 3 †
- 概要
- 車両位置情報XMLのシミュレーションデータを生成するデータフローセクションを、
- NextBusのライブストリームデータを取込セクションで置換。
Processorを追加する。 †
以下のProcessorを追加する。
- GetHTTP
- GetFile?、UnpackContent?、ControlRate?プロセッサを削除。
- これらをGetHTTPプロセッサで置き換え、入力を
シミュレーションデータから、NextBus?ライブストリームに変更。
Connectionで接続する。 †
GetHTTPとEvaluateXPathを接続したデータフローを作成。
※ ConnectionのNameには適当な名称を付与。
全てのRelationshipsを選択し、ProcessorのTerminateは設定しなかった。
また、Selected Prioritizersには、FirstInFirstOutPrioritizer?, OldestFlowFileFirstPrioritizer?を設定。
Processorを設定する。 †
以下のProcessorを設定する。
※ APIの仕様が変わったのか、データが取得できなかったので、パラメタの"r=M"を削除した。
ココまでの設定が適切であれば、Processorのwarningマークはすべて取れているハズ。
データフローを実行する。 †
- Processorを選択していない状態でOperatorの再生ボタンを押下する。
- 1 sec毎に、NextBusのHTTPレスポンスから入力され、出力フォルダにJSONが出力される。
※ キューが詰まっている部分が赤く表示される。
追加(自習)コンテンツ †
WebAPIを作成してみる。 †
- HandleHttpRequest? / HandleHttpResponse?のProcessorを使用して、WebAPIを作成してみる。
- HandleHttpRequest? / HandleHttpResponse?の利用方法についてはコレが参考になる。
Processorを追加する。 †
以下のProcessorを追加する。
- HandleHttpRequest?
HTTPクライアントからのHTTPリクエストを受け取る。
- ReplaceText?
HTTPリクエストからHTTPレスポンスで返すボディの文字列を設定。
- HandleHttpResponse?
HTTPクライアントにHTTPレスポンスを返す。
- AttributesToJSON
フローファイルから属性を抽出し、JSON形式に変換(HTTPヘッダ)
- MergeContent?
JSON形式のHTTPヘッダとHTTPボディをマージ
- PutFile?
処理したファイルを出力(ココでは、確認のためHTTPリクエストを出力する)。
- LogAttribute?
ReplaceText?とHandleHttpResponse?のfailureをこちらに流す。
Connectionで接続する。 †
以下のように接続する。
※ ConnectionのNameには適当な名称を付与。
全てのRelationshipsを選択し、ProcessorのTerminateは設定しなかった。
また、Selected Prioritizersには、FirstInFirstOutPrioritizer?, OldestFlowFileFirstPrioritizer?を設定。
Processorを設定する。 †
以下のProcessorを設定する。
- HandleHttpRequest?
- Listening Port : 9095
- HTTP Context Map :
- DDLから[Create new service...]を選択する。
- [Add Controller Service]ダイアログで[APPLY]ボタンをクリック。
- [StandardHttpContextMap?]Controller Serviceが追加され[→]が表示される。
- コレを押下し、[Enable]アイコンをクリックし[StandardHttpContextMap?]を有効化。
- ReplaceText?
- Replacement Value : {"Result": "OK"}
- Replacement Strategy : Always Replace
- HandleHttpResponse?
- HTTP Status Code : 202
- HTTP Context Map :
DDLから、先ほど作成した[StandardHttpContextMap?]を選択する。
- Terminate Relationships : success をチェック
- AttributesToJSON
- Destination : flowfile-content
- Attributes List : 空文字列(全属性が対象)
- MergeContent?
- Minimum Number of Entries : 1
- Maximum Number of Entries : 2
- Delimiter Strategy : Text
- Header : [
- Footer : ]
- Demarcator : , {press-shift+enter}
※ 波括弧内は要するに改行コードを意味している。
NiFi?のGUIで改行コードを入力するのにshiftが必要ということ。
- PutFile?
- Directory : ./data-out
- Terminate Relationships : failure, success をチェック
- LogAttribute?
- Terminate Relationships : success をチェック
ココまでの設定が適切であれば、Processorのwarningマークはすべて取れているハズ。
※ Controller Serviceとは、コチラ?を参照。
StandardHttpContextMap?で、Processor間でのHttpContext?を共有する。
データフローを実行する。 †
※ HTTPヘッダは属性から取得可能なもよう。
スクレイピング処理を実装してみる。 †
大き過ぎるので、「本WikiのFrontPageのタイトルを抽出する」小さめのサンプルとしてみる。
準備 †
HTTPSアクセスのため、以下の手順を参考にして、Truststore.jksファイルを作成する。
- ロケーション・バーの保護された通信をクリックし、[証明書]を選択。
- 表示された[証明書]ダイアログの[詳細]タブで[ファイルにコピー]を選択。
Processorを追加する。 †
- GetHTMLElement
HTMLエレメントを取得する。
- AttributesToJSON
フローファイルから属性を抽出し、JSON形式に変換
Connectionで接続する。 †
上記のProcessorをConnectionで順に繋ぐ。
※ ConnectionのNameには適当な名称を付与。
全てのRelationshipsを選択し、ProcessorのTerminateは設定しなかった。
また、Selected Prioritizersには、FirstInFirstOutPrioritizer?, OldestFlowFileFirstPrioritizer?を設定。
Processorを設定する。 †
以下のProcessorを設定する。
- GetHTMLElement
- URL : hoge
- CSS Selector : h1.title
- Destination : flowfile-content
- Terminate Relationships : success以外を全てチェック
- ExtractText? ※ 正規表現(Regular Expression)の結果をフローファイルの属性に保存する
- AttributesToJSON
- Destination : flowfile-content
- Attributes List : 空文字列(全属性が対象)
- PutFile?
- Directory : ./data-out
- Terminate Relationships : failure, success をチェック
ココまでの設定が適切であれば、Processorのwarningマークはすべて取れているハズ。
データフローを実行する。 †
以下のような出力を得られる。
{
path:./
filename:FrontPage_dnetdevinf095740401.html
gethttp.remote.source:dotnetdevelopmentinfrastructure.osscons.jp
Title.1:FrontPage
Title.0:page=FrontPage">FrontPage</a>
Title:FrontPage
mime.type:text/html; charset=UTF-8
uuid:53fff8cb-64de-494a-a818-c7ee6295f6c0
}
DBMS処理を実装してみる。 †
以下を参考に、遂行。
Processorを追加する。 †
Connectionで接続する。 †
Processorを設定する。 †
データフローを実行する。 †
参考 †