「.NET 開発基盤部会 Wiki」は、「Open棟梁Project」,「OSSコンソーシアム .NET開発基盤部会」によって運営されています。
目次 †
概要 †
などをやってみる。
チュートリアル †
以下を参考に、チュートリアルを遂行。
このチュートリアルは、ある都市計画委員会が新規の高速道路の建造の評価のために、
リアルタイムデータを活用し、交通パターンのより深い理解を得るというシナリオ
チュートリアル 1 †
- 概要
- 車両位置情報XMLのシミュレーションデータを取り込む。
- フローファイルから車両位置詳細属性を抽出する。
- それらの詳細属性が空でない場合、JSONファイルに変換。
Processorを追加する。 †
以下のProcessorを追加する。
- UnpackContent?
交通シミュレータZipファイルからフローファイルのコンテンツを解凍
- ControlRate?
プロセッサに流すフローファイルのフローのレートを制御
- EvaluateXPath
車両位置情報XMLから最終更新タイムスタンプを属性として抽出
- SplitXML
親の子要素を複数のフローファイルに分割
- UpdateAttribute?
各フローファイルのファイル名称属性を更新(ファイル名)
- EvaluateXPath
車両位置情報XMLから車両ID、方向、緯度、軽度、速度を属性として抽出
- RouteOnAttribute?
各属性がフィルタ条件に一致する場合のみ、フローファイルを後続のフローに流す。
- AttributesToJSON
フローファイルから属性を抽出し、JSON形式に変換
- MergeContent?
JSON形式のフローファイルのグループを配列としてマージ
Connectionで接続する。 †
図がリンク切れだけど、コンテキストから推測するに多分、こう。
※ ConnectionのNameには適当な名称を付与。
全てのRelationshipsを選択し、ProcessorのTerminateは設定しなかった。
また、Selected Prioritizersには、FirstInFirstOutPrioritizer?, OldestFlowFileFirstPrioritizer?を設定。
Processorを設定する。 †
「1.1 学習の目的: データフロー作成プロセスの概要」を参考に設定。
(詳細は「Step 2」以降に書いてあるので、必要に応じてソレを参考にする)
- GetFile?
- Input Directory : ./data-in
- ControlRate?
- Rate Control Criteria : flowfile count
- Maximum Rate : 3
- setTime Duration : 10 second
- EvaluateXPath
- Destination : flowfile-attribute ※ XPath式の結果をフローファイルの属性に保存する
- XPath式
- Last_Time : //body/lastTime/@time ※ [+]ボタンでプロパティ追加する。
- UpdateAttribute?
- filename : ${UUID()} ※ [+]ボタンでプロパティ追加する。
- EvaluateXPath
- Destination : flowfile-attribute ※ XPath式の結果をフローファイルの属性に保存する
- XPath式
- Direction_of_Travel : //vehicle/@dirTag ※ [+]ボタンでプロパティ追加する。
- Latitude : //vehicle/@lat ※ [+]ボタンでプロパティ追加する。
- Longitude : //vehicle/@lon ※ [+]ボタンでプロパティ追加する。
- Vehicle_ID : //vehicle/@id ※ [+]ボタンでプロパティ追加する。
- Vehicle_Speed : //vehicle/@speedKmHr? ※ [+]ボタンでプロパティ追加する。
- AttributesToJSON
- Destination : flowfile-content
- Attributes List : Vehicle_ID, Direction_of_Travel, Latitude, Longitude, Vehicle_Speed, Last_Time
- MergeContent?
- Minimum Number of Entries : 10
- Maximum Number of Entries : 15
- Delimiter Strategy : Text
- Header : [
- Footer : ]
- Demarcator : , {press-shift+enter}
※ 波括弧内は要するに改行コードを意味している。
NiFi?のGUIで改行コードを入力するのにshiftが必要ということ。
- PutFile?
- Directory : ./data-out/filtered_transitLoc_data
- Terminate Relationships : failure, success をチェック
ココまでの設定が適切であれば、Processorのwarningマークはすべて取れているハズ。
※ RouteOnAttribute?だけ、Terminate Relationships : unmatched をチェックした。
データフローを実行する。 †
- Processorを選択していない状態でOperatorの再生ボタンを押下する。
- 入力フォルダにZIPファイルを放り込むと、上手いこと、出力フォルダにJSONが出力される。
- しかし、ファイル名がUUIDのものが出力される。これはマージ後のフローファイルもフローに流れているためと解る。
- MergeContent?のTerminate Relationships : original をチェックしてテストして上手く動作することを確認する。
チュートリアル 2 †
- 概要
- Google Places APIをNiFi?から利用し、
- 車両の移動(車両位置情報XMLのシミュレーションデータ)に応じて周辺情報を表示。
準備 †
Processorを追加する。 †
以下のProcessorを追加する。
- InvokeHTTP
Google Places APIから車両ロケーション付近のJSON場所データを取得。
- EvaluateJsonPath?
JSON場所データからneighborhoods_nearbyとcityデータ要素を抽出して属性に設定。
- RouteOnAttribute?
neighborhoods_nearbyとcity属性が空でないフローファイルをルーティング。
- AttributesToJSON
フローファイルから属性を抽出し、JSON形式に変換
- MergeContent?
JSON形式のフローファイルのグループを配列としてマージ
Connectionで接続する。 †
チュートリアル 1のRouteOnAttribute?から分岐して、合流せずに、
上記のProcessorをConnectionで順に繋ぎ、PutFile?で終わるデータフローを作成。
※ ConnectionのNameには適当な名称を付与。
全てのRelationshipsを選択し、ProcessorのTerminateは設定しなかった。
また、Selected Prioritizersには、FirstInFirstOutPrioritizer?, OldestFlowFileFirstPrioritizer?を設定。
Processorを設定する。 †
詳細が「Step 2」以降に書いてあるので、必要に応じてソレを参考にする。
- EvaluateXPath
- Destination : flowfile-attribute ※ JSONPath式の結果をフローファイルの属性に保存する
- Return Type : json
- JSONPath式
- city : $.results[0].vicinity ※ [+]ボタンでプロパティ追加する。
- neighborhoods_nearby : $.results[*].name ※ [+]ボタンでプロパティ追加する。
- AttributesToJSON
- Destination : flowfile-content
- Attributes List : Vehicle_ID, city, Latitude, Longitude, neighborhoods_nearby, Last_Time
- MergeContent?
- Minimum Number of Entries : 10
- Maximum Number of Entries : 15
- Delimiter Strategy : Text
- Header : [
- Footer : ]
- Demarcator : , {press-shift+enter}
※ 波括弧内は要するに改行コードを意味している。
NiFi?のGUIで改行コードを入力するのにshiftが必要ということ。
- Terminate Relationships : original をチェック
- PutFile?
- Directory : ./data-out/nearby_neighborhoods_search
- Terminate Relationships : failure, success をチェック
ココまでの設定が適切であれば、Processorのwarningマークはすべて取れているハズ。
データフローを実行する。 †
- Processorを選択していない状態でOperatorの再生ボタンを押下する。
- 入力フォルダにZIPを放り込むと、上手いこと、出力フォルダにJSONが出力される。
チュートリアル 3 †
- 概要
- 車両位置情報XMLのシミュレーションデータを生成するデータフローセクションを、
- NextBusのライブストリームデータを取込セクションで置換。
Processorを追加する。 †
以下のProcessorを追加する。
- GetHTTP
- GetFile?、UnpackContent?、ControlRate?プロセッサを削除。
- これらをGetHTTPプロセッサで置き換え、入力を
シミュレーションデータから、NextBus?ライブストリームに変更。
Connectionで接続する。 †
GetHTTPとEvaluateXPathを接続したデータフローを作成。
※ ConnectionのNameには適当な名称を付与。
全てのRelationshipsを選択し、ProcessorのTerminateは設定しなかった。
また、Selected Prioritizersには、FirstInFirstOutPrioritizer?, OldestFlowFileFirstPrioritizer?を設定。
Processorを設定する。 †
以下のProcessorを設定する。
※ APIの仕様が変わったのか、データが取得できなかったので、パラメタの"r=M"を削除した。
ココまでの設定が適切であれば、Processorのwarningマークはすべて取れているハズ。
データフローを実行する。 †
- Processorを選択していない状態でOperatorの再生ボタンを押下する。
- 1 sec毎に、NextBusのHTTPレスポンスから入力され、出力フォルダにJSONが出力される。
※ キューが詰まっている部分が赤く表示される。
追加(自習)コンテンツ †
WebAPIを作成してみる。 †
- HandleHttpRequest?/ResponseのProcessorを使用して、WebAPIを作成する。
- HandleHttpRequest?/Responseの利用方法についてはコレが参考になる。
Processorを追加する。 †
以下のProcessorを追加する。
- HandleHttpRequest?
HTTPクライアントからのHTTPリクエストを受け取る。
- ReplaceText?
HTTPリクエストからHTTPレスポンスで返すボディの文字列を設定。
- HandleHttpResponse?
HTTPクライアントにHTTPレスポンスを返す。
- AttributesToJSON
フローファイルから属性を抽出し、JSON形式に変換(HTTPヘッダ)
- MergeContent?
JSON形式のHTTPヘッダとHTTPボディをマージ
- PutFile?
処理したファイルを出力(ココでは、確認のためHTTPリクエストを出力する)。
- LogAttribute?
ReplaceText?とHandleHttpResponse?のfailureをこちらに流す。
Connectionで接続する。 †
以下のように接続する。
※ ConnectionのNameには適当な名称を付与。
全てのRelationshipsを選択し、ProcessorのTerminateは設定しなかった。
また、Selected Prioritizersには、FirstInFirstOutPrioritizer?, OldestFlowFileFirstPrioritizer?を設定。
Processorを設定する。 †
以下のProcessorを設定する。
- HandleHttpRequest?
- Listening Port : 9095
- HTTP Context Map :
- DDLから[Create new service...]を選択する。
- [Add Controller Service]ダイアログで[APPLY]ボタンをクリック。
- [StandardHttpContextMap?]Controller Serviceが追加され[→]が表示される。
- コレを押下し、[Enable]アイコンをクリックし[StandardHttpContextMap?]を有効化。
- ReplaceText?
- Replacement Value : {"Result": "OK"}
- Replacement Strategy : Always Replace
- HandleHttpResponse?
- HTTP Status Code : 202
- HTTP Context Map : DDLから、先ほど作成した[StandardHttpContextMap?]を選択する。
- Terminate Relationships : success をチェック
- AttributesToJSON
- Destination : flowfile-content
- Attributes List : 空文字列(全属性が対象)
- MergeContent?
- Minimum Number of Entries : 1
- Maximum Number of Entries : 2
- Delimiter Strategy : Text
- Header : [
- Footer : ]
- Demarcator : , {press-shift+enter}
※ 波括弧内は要するに改行コードを意味している。
NiFi?のGUIで改行コードを入力するのにshiftが必要ということ。
- PutFile?
- Directory : ./data-out
- Terminate Relationships : failure, success をチェック
- LogAttribute?
- Terminate Relationships : success をチェック
ココまでの設定が適切であれば、Processorのwarningマークはすべて取れているハズ。
※ Controller Serviceとは、コチラ?を参照。
StandardHttpContextMap?で、Processor間でのHttpContext?を共有する。
データフローを実行する。 †
※ 参考に従いLogAttribute?を追加してみたが、
エラーが起きないのでfailureに流れない。
Webスクレイピング処理を実装してみる。 †
大き過ぎるので、「本WikiのFrontPageのタイトルを抽出する」小さめのサンプルとしてみる。
準備 †
HTTPSアクセスのため、以下の手順を参考にして、Truststore.jksファイルを作成する。
- ロケーション・バーの保護された通信をクリックし、[証明書]を選択。
- 表示された[証明書]ダイアログの[詳細]タブで[ファイルにコピー]を選択。
Processorを追加する。 †
- GetHTMLElement
HTMLエレメントを取得する。
- AttributesToJSON
フローファイルから属性を抽出し、JSON形式に変換
Connectionで接続する。 †
上記のProcessorをConnectionで順に繋ぐ。
※ ConnectionのNameには適当な名称を付与。
全てのRelationshipsを選択し、ProcessorのTerminateは設定しなかった。
また、Selected Prioritizersには、FirstInFirstOutPrioritizer?, OldestFlowFileFirstPrioritizer?を設定。
Processorを設定する。 †
以下のProcessorを設定する。
- GetHTTP
- URL : https://dotnetdevelopmentinfrastructure.osscons.jp/index.php?FrontPage
- Filename : FrontPage_dnetdevinf${now():format("HHmmssSSS")}.html
- SSL Context Service : 前述の要領で、Control Serviceを追加する。
[Add Controller Service]ダイアログでDDLからStandardSSLContextService?を選択し、
以下を設定後、Control ServiceをEnableにする。
- Truststore Filename : .\certificate\Truststore.jks
- Truststore Password : changeit
- Truststore Type : JKS
- TLS Protocol: SSL
- GetHTMLElement
- URL : hoge
- CSS Selector : h1.title
- Destination : flowfile-content
- Terminate Relationships : success以外を全てチェック
- ExtractText? ※ 正規表現(Regular Expression)の結果をフローファイルの属性に保存する
- AttributesToJSON
- Destination : flowfile-content
- Attributes List : 空文字列(全属性が対象)
- PutFile?
- Directory : ./data-out
- Terminate Relationships : failure, success をチェック
ココまでの設定が適切であれば、Processorのwarningマークはすべて取れているハズ。
データフローを実行する。 †
以下のような出力を得られる。
{
path:./
filename:FrontPage_dnetdevinf095740401.html
gethttp.remote.source:dotnetdevelopmentinfrastructure.osscons.jp
Title.1:FrontPage
Title.0:page=FrontPage">FrontPage</a>
Title:FrontPage
mime.type:text/html; charset=UTF-8
uuid:53fff8cb-64de-494a-a818-c7ee6295f6c0
}
RDBMS処理を実装してみる。 †
準備 †
- RDBMSの準備:ココではSQL ServerのNorthwindを選択。
- JDBCの準備:事前にMicrosoft SQL Server 用 JDBC Driverをインストールしておく。
Processorを追加する。 †
- GenerateFlowFile?
JSON配列形式のフローファイルを生成
- EvaluateJsonPath?
JSONのKey-Valueを属性に抽出(パラメタ値)。
- UpdateAttribute?
各フローファイルのファイル名称属性を更新(パラメタ型)
- ExecuteSQL
属性を使用し、Prepared statementにパラメタライズして実行する。
- ConvertAvroToJSON
結果をJSONに変換する。
- UpdateAttribute?
各フローファイルのファイル名称属性を更新(ファイル名)
Connectionで接続する。 †
上記のProcessorをConnectionで順に繋ぐ。
※ ConnectionのNameには適当な名称を付与。
全てのRelationshipsを選択し、ProcessorのTerminateは設定しなかった。
また、Selected Prioritizersには、FirstInFirstOutPrioritizer?, OldestFlowFileFirstPrioritizer?を設定。
Processorを設定する。 †
- GenerateFlowFile?
- Custom Text :
[
{
"ShipperID" : 1,
"CompanyName" : "",
"Phone" : ""
},
{
"ShipperID" : 2,
"CompanyName" : "",
"Phone" : ""
},
{
"ShipperID" : 3,
"CompanyName" : "",
"Phone" : ""
}
]
- SplitJson?
- JsonPath? Expression : $
- Terminate Relationships : split以外を全てチェック
- EvaluateJsonPath?
- Destination : flowfile-attribute
- sql.args.1.value : $.ShipperID ※ [+]ボタンでプロパティ追加する。
※ sql.args.N.typeのNでPrepared Statementのインデックスを指定
- Terminate Relationships : matched以外を全てチェック
- UpdateAttribute?
- sql.args.1.type : 4 ※ [+]ボタンでプロパティ追加する。
※ sql.args.N.typeのNでPrepared Statementのインデックスを指定
- ExecuteSQL
- setSQL select query : SELECT * FROM Shippers WHERE ShipperID = ?
- Database Connection Pooling Service : 前述の要領で、Control Serviceを追加する。
[Add Controller Service]ダイアログでDDLからDBCPConnectionPool?を選択し、
以下を設定後、Control ServiceをEnableにする。
- Database Connection URL :
jdbc:sqlserver://<ipaddress>or<hostname>:<portno>;database=<databasename>
e.g. : jdbc:sqlserver://localhost:1433;database=northwind
- Database Driver Class Name : com.microsoft.sqlserver.jdbc.SQLServerDriver?
- Database Driver Location(s) : C:\Program Files (x86)\Java\jdbc\sqljdbc_6.0\jpn\jre8\sqljdbc42.jar
- Database User : ****
- Password : ****
- Terminate Relationships : failureをチェック
- ConvertAvroToJSON
- 既定値
- Terminate Relationships : failureをチェック
- UpdateAttribute?
- filename : ${UUID()} ※ [+]ボタンでプロパティ追加する。
- PutFile?
- Directory : ./data-out
- Terminate Relationships : failure, success をチェック
ココまでの設定が適切であれば、Processorのwarningマークはすべて取れているハズ。
データフローを実行する。 †
以下のような出力を得られる。
{"ShipperID": 1, "CompanyName": "Speedy Express", "Phone": "(503) 555-9831"}
{"ShipperID": 2, "CompanyName": "United Package", "Phone": "(503) 555-3199"}
{"ShipperID": 3, "CompanyName": "Federal Shipping", "Phone": "(503) 555-9931"}
参考 †