.NET 開発基盤部会 Wiki」は、「Open棟梁Project」,「OSSコンソーシアム .NET開発基盤部会」によって運営されています。

目次

概要

  • セカンド・ステップでは、

などをやってみる。

チュートリアル

以下を参考に、チュートリアルを遂行。

このチュートリアルは、ある都市計画委員会が新規の高速道路の建造の評価のために、
リアルタイムデータを活用し、交通パターンのより深い理解を得るというシナリオ

チュートリアル 1

  • 概要
    1. 車両位置情報XMLのシミュレーションデータを取り込む。
    2. フローファイルから車両位置詳細属性を抽出する。
    3. それらの詳細属性が空でない場合、JSONファイルに変換。
  • リンク切れ
    • Lab1-NiFi?-Learn-Ropes.xmlテンプレート
      リンク切れだったので演習後のものを以下に添付した。

      fileLab1-NiFi-Learn-Ropes.xml

Processorを追加する。

以下のProcessorを追加する。

  • GetFile?
    処理するファイルを入力。
  • UnpackContent?
    交通シミュレータZipファイルからフローファイルのコンテンツを解凍
  • ControlRate?
    プロセッサに流すフローファイルのフローのレートを制御
  • EvaluateXPath
    車両位置情報XMLから最終更新タイムスタンプを属性として抽出
  • SplitXML
    親の子要素を複数のフローファイルに分割
  • UpdateAttribute?
    各フローファイルのファイル名称属性を更新(ファイル名)
  • EvaluateXPath
    車両位置情報XMLから車両ID、方向、緯度、軽度、速度を属性として抽出
  • RouteOnAttribute?
    各属性がフィルタ条件に一致する場合のみ、フローファイルを後続のフローに流す。
  • AttributesToJSON
    フローファイルから属性を抽出し、JSON形式に変換
  • MergeContent?
    JSON形式のフローファイルのグループを配列としてマージ
  • PutFile?
    処理したファイルを出力。

Connectionで接続する。

図がリンク切れだけど、コンテキストから推測するに多分、こう。

手順1-1

※ ConnectionのNameには適当な名称を付与。
 全てのRelationshipsを選択し、ProcessorのTerminateは設定しなかった。
 また、Selected Prioritizersには、FirstInFirstOutPrioritizer?, OldestFlowFileFirstPrioritizer?を設定。
 

Processorを設定する。

1.1 学習の目的: データフロー作成プロセスの概要」を参考に設定。
(詳細は「Step 2」以降に書いてあるので、必要に応じてソレを参考にする)

  • GetFile?
    • Input Directory : ./data-in
  • UnpackContent?
    • Packaging Format : zip
  • ControlRate?
    • Rate Control Criteria : flowfile count
    • Maximum Rate : 3
    • setTime Duration : 10 second
  • EvaluateXPath
    • Destination : flowfile-attribute ※ XPath式の結果をフローファイルの属性に保存する
    • XPath式
      • Last_Time : //body/lastTime/@time ※ [+]ボタンでプロパティ追加する。
  • SplitXML : 既定値
  • UpdateAttribute?
    • filename : ${UUID()} ※ [+]ボタンでプロパティ追加する。
  • EvaluateXPath
    • Destination : flowfile-attribute ※ XPath式の結果をフローファイルの属性に保存する
    • XPath式
      • Direction_of_Travel : //vehicle/@dirTag ※ [+]ボタンでプロパティ追加する。
      • Latitude : //vehicle/@lat ※ [+]ボタンでプロパティ追加する。
      • Longitude : //vehicle/@lon ※ [+]ボタンでプロパティ追加する。
      • Vehicle_ID : //vehicle/@id ※ [+]ボタンでプロパティ追加する。
      • Vehicle_Speed : //vehicle/@speedKmHr? ※ [+]ボタンでプロパティ追加する。
  • RouteOnAttribute?
    • Filter_Attributes : 以下の式を記入 ※ [+]ボタンでプロパティ追加する。
      ${Direction_of_Travel:isEmpty():not():and(${Last_Time:isEmpty():not()}):and(${Latitude:isEmpty():not()}):and(${Longitude:isEmpty():not()}):and(${Vehicle_ID:isEmpty():not()}):and(${Vehicle_Speed:equals('0'):not()})}
    • Terminate Relationships : unmatched をチェック
       ※ ココだけは必要そうなので。ConnectionのRelationshipsも設定。
  • AttributesToJSON
    • Destination : flowfile-content
    • Attributes List : Vehicle_ID, Direction_of_Travel, Latitude, Longitude, Vehicle_Speed, Last_Time
  • MergeContent?
    • Minimum Number of Entries : 10
    • Maximum Number of Entries : 15
    • Delimiter Strategy : Text
    • Header : [
    • Footer : ]
    • Demarcator : , {press-shift+enter}

      ※ 波括弧内は要するに改行コードを意味している。
      NiFi?のGUIで改行コードを入力するのにshiftが必要ということ。

  • PutFile?
    • Directory : ./data-out/filtered_transitLoc_data
    • Terminate Relationships : failure, success をチェック

ココまでの設定が適切であれば、Processorのwarningマークはすべて取れているハズ。

手順1-2

RouteOnAttribute?だけ、Terminate Relationships : unmatched をチェックした。

データフローを実行する。

  • Processorを選択していない状態でOperatorの再生ボタンを押下する。
  • 入力フォルダにZIPファイルを放り込むと、上手いこと、出力フォルダにJSONが出力される。
  • しかし、ファイル名がUUIDのものが出力される。これはマージ後のフローファイルもフローに流れているためと解る。
  • MergeContent?のTerminate Relationships : original をチェックしてテストして上手く動作することを確認する。

チュートリアル 2

  • 概要
    • Google Places APIをNiFi?から利用し、
    • 車両の移動(車両位置情報XMLのシミュレーションデータ)に応じて周辺情報を表示。
  • リンク切れ
    • Lab2-NiFi?-Learn-Ropes.xmlテンプレート
      リンク切れだったので演習後のものを以下に添付した。

      fileLab2-NiFi-Learn-Ropes.xml
      ※ Google Places APIのAPIキーは削除してある。

準備

Processorを追加する。

以下のProcessorを追加する。

  • InvokeHTTP
    Google Places APIから車両ロケーション付近のJSON場所データを取得。
  • EvaluateJsonPath?
    JSON場所データからneighborhoods_nearbyとcityデータ要素を抽出して属性に設定。
  • RouteOnAttribute?
    neighborhoods_nearbyとcity属性が空でないフローファイルをルーティング。
  • AttributesToJSON
    フローファイルから属性を抽出し、JSON形式に変換
  • MergeContent?
    JSON形式のフローファイルのグループを配列としてマージ
  • PutFile?
    処理したファイルを出力。

Connectionで接続する。

チュートリアル 1RouteOnAttribute?から分岐して、合流せずに、
上記のProcessorをConnectionで順に繋ぎ、PutFile?で終わるデータフローを作成。

※ ConnectionのNameには適当な名称を付与。
 全てのRelationshipsを選択し、ProcessorのTerminateは設定しなかった。
 また、Selected Prioritizersには、FirstInFirstOutPrioritizer?, OldestFlowFileFirstPrioritizer?を設定。

Processorを設定する。

詳細が「Step 2」以降に書いてあるので、必要に応じてソレを参考にする。

  • InvokeHTTP
    • Remote URL : 以下を設定する。
      https://maps.googleapis.com/maps/api/place/nearbysearch/json?location=${Latitude},${Longitude}&radius=500&type=neighborhood&key=[取得したAPIキー]
    • Terminate Relationships : response以外を全てチェック
  • EvaluateXPath
    • Destination : flowfile-attribute ※ JSONPath式の結果をフローファイルの属性に保存する
    • Return Type : json
    • JSONPath式
      • city : $.results[0].vicinity ※ [+]ボタンでプロパティ追加する。
      • neighborhoods_nearby : $.results[*].name ※ [+]ボタンでプロパティ追加する。
  • RouteOnAttribute?
    • RouteNearbyNeighborhoods? : 以下の式を記入 ※ [+]ボタンでプロパティ追加する。
      ${city:isEmpty():not():and(${neighborhoods_nearby:isEmpty():not()})}
    • Terminate Relationships : unmatched をチェック
       ※ ココだけは必要そうなので。ConnectionのRelationshipsも設定。
  • AttributesToJSON
    • Destination : flowfile-content
    • Attributes List : Vehicle_ID, city, Latitude, Longitude, neighborhoods_nearby, Last_Time
  • MergeContent?
    • Minimum Number of Entries : 10
    • Maximum Number of Entries : 15
    • Delimiter Strategy : Text
    • Header : [
    • Footer : ]
    • Demarcator : , {press-shift+enter}

      ※ 波括弧内は要するに改行コードを意味している。
      NiFi?のGUIで改行コードを入力するのにshiftが必要ということ。

  • Terminate Relationships : original をチェック
  • PutFile?
    • Directory : ./data-out/nearby_neighborhoods_search
    • Terminate Relationships : failure, success をチェック

ココまでの設定が適切であれば、Processorのwarningマークはすべて取れているハズ。

手順2-2

データフローを実行する。

  • Processorを選択していない状態でOperatorの再生ボタンを押下する。
  • 入力フォルダにZIPを放り込むと、上手いこと、出力フォルダにJSONが出力される。

チュートリアル 3

  • 概要
    • 車両位置情報XMLのシミュレーションデータを生成するデータフローセクションを、
    • NextBusのライブストリームデータを取込セクションで置換。
  • リンク切れ
    • Lab3-NiFi?-Learn-Ropes.xmlテンプレート
      リンク切れだったので演習後のものを以下に添付した。

      fileLab3-NiFi-Learn-Ropes.xml
      ※ Google Places APIのAPIキーは削除してある。

Processorを追加する。

以下のProcessorを追加する。

  • GetHTTP
    • GetFile?UnpackContent?ControlRate?プロセッサを削除。
    • これらをGetHTTPプロセッサで置き換え、入力を シミュレーションデータから、NextBus?ライブストリームに変更。

Connectionで接続する。

GetHTTPとEvaluateXPathを接続したデータフローを作成。

※ ConnectionのNameには適当な名称を付与。
 全てのRelationshipsを選択し、ProcessorのTerminateは設定しなかった。
 また、Selected Prioritizersには、FirstInFirstOutPrioritizer?, OldestFlowFileFirstPrioritizer?を設定。

Processorを設定する。

以下のProcessorを設定する。

※ APIの仕様が変わったのか、データが取得できなかったので、パラメタの"r=M"を削除した。

ココまでの設定が適切であれば、Processorのwarningマークはすべて取れているハズ。

手順3-2

データフローを実行する。

  • Processorを選択していない状態でOperatorの再生ボタンを押下する。
  • 1 sec毎に、NextBusのHTTPレスポンスから入力され、出力フォルダにJSONが出力される。
手順3-3

※ キューが詰まっている部分が赤く表示される。

追加(自習)コンテンツ

WebAPIを作成してみる。

  • HandleHttpRequest?/ResponseのProcessorを使用して、WebAPIを作成する。
  • HandleHttpRequest?/Responseの利用方法についてはコレが参考になる。
  • テンプレート : 演習後のものを以下に添付した。

    fileCreateWebAPI.xml

Processorを追加する。

以下のProcessorを追加する。

  • HandleHttpRequest?
    HTTPクライアントからのHTTPリクエストを受け取る。
  • ReplaceText?
    HTTPリクエストからHTTPレスポンスで返すボディの文字列を設定。
  • HandleHttpResponse?
    HTTPクライアントにHTTPレスポンスを返す。
  • AttributesToJSON
    フローファイルから属性を抽出し、JSON形式に変換(HTTPヘッダ)
  • MergeContent?
    JSON形式のHTTPヘッダとHTTPボディをマージ
  • PutFile?
    処理したファイルを出力(ココでは、確認のためHTTPリクエストを出力する)。
  • LogAttribute?
    ReplaceText?HandleHttpResponse?のfailureをこちらに流す。

Connectionで接続する。

以下のように接続する。

手順4-1

※ ConnectionのNameには適当な名称を付与。
 全てのRelationshipsを選択し、ProcessorのTerminateは設定しなかった。
 また、Selected Prioritizersには、FirstInFirstOutPrioritizer?, OldestFlowFileFirstPrioritizer?を設定。

Processorを設定する。

以下のProcessorを設定する。

  • HandleHttpRequest?
    • Listening Port : 9095
    • HTTP Context Map :
      • DDLから[Create new service...]を選択する。
      • [Add Controller Service]ダイアログで[APPLY]ボタンをクリック。
      • [StandardHttpContextMap?]Controller Serviceが追加され[→]が表示される。
      • コレを押下し、[Enable]アイコンをクリックし[StandardHttpContextMap?]を有効化。
  • ReplaceText?
    • Replacement Value : {"Result": "OK"}
    • Replacement Strategy : Always Replace
  • HandleHttpResponse?
    • HTTP Status Code : 202
    • HTTP Context Map : DDLから、先ほど作成した[StandardHttpContextMap?]を選択する。
    • Terminate Relationships : success をチェック
  • AttributesToJSON
    • Destination : flowfile-content
    • Attributes List : 空文字列(全属性が対象)
  • MergeContent?
    • Minimum Number of Entries : 1
    • Maximum Number of Entries : 2
    • Delimiter Strategy : Text
    • Header : [
    • Footer : ]
    • Demarcator : , {press-shift+enter}

      ※ 波括弧内は要するに改行コードを意味している。
      NiFi?のGUIで改行コードを入力するのにshiftが必要ということ。

  • PutFile?
    • Directory : ./data-out
    • Terminate Relationships : failure, success をチェック
  • LogAttribute?
    • Terminate Relationships : success をチェック

ココまでの設定が適切であれば、Processorのwarningマークはすべて取れているハズ。

手順4-2

※ Controller Serviceとは、コチラ?を参照。
 StandardHttpContextMap?で、Processor間でのHttpContext?を共有する。

データフローを実行する。

  • cURLで以下を実行する。
    >curl -i -XPOST -H "Content-type: application/json" -d "{"name": "C", "age": 20}" localhost:9095
  • 以下のように結果が返り、生のHTTPリクエストのJSONが./data-outフォルダに保存される。
    HTTP/1.1 202 Accepted
    Date: Mon, 09 Jul 2018 05:05:32 GMT
    Transfer-Encoding: chunked
    Server: Jetty(9.4.3.v20170317)
    
    {"Result": "OK"}

※ 参考に従いLogAttribute?を追加してみたが、
 エラーが起きないのでfailureに流れない。

Webスクレイピング処理を実装してみる。

  • 以下が参考になるが、

大き過ぎるので、「本WikiのFrontPageのタイトルを抽出する」小さめのサンプルとしてみる。

準備

HTTPSアクセスのため、以下の手順を参考にして、Truststore.jksファイルを作成する。

  • ロケーション・バーの保護された通信をクリックし、[証明書]を選択。
  • 表示された[証明書]ダイアログの[詳細]タブで[ファイルにコピー]を選択。
  • 既定値でxxxx.cerのようにエクスポートし、
  • エクスポートしたファイルに対して、以下のコマンドを実行。
    keytool -import -alias dotnetdevelopmentinfrastructure.osscons.jp -file ...\xxxx.cer -keystore ...\certificate\Truststore.jks -storepass changeit
    • keytoolは、jdkのbinにある。
    • -alias はユニークなら何でもOK.
    • -storepass changeitは標準のパスフレーズ
    • -file, -keystore は適宜変更。-keystoreのフォルダは作成しておく。
      この証明書を信頼しますか。 [いいえ]:  Y
      証明書がキーストアに追加されました

Processorを追加する。

  • GetHTTP
    HTMLコンテンツを取得する。
  • GetHTMLElement
    HTMLエレメントを取得する。
  • ExtractText?
    正規表現で情報を抜く。
  • AttributesToJSON
    フローファイルから属性を抽出し、JSON形式に変換
  • PutFile?
    結果ファイルを出力する。

Connectionで接続する。

上記のProcessorをConnectionで順に繋ぐ。

※ ConnectionのNameには適当な名称を付与。
 全てのRelationshipsを選択し、ProcessorのTerminateは設定しなかった。
 また、Selected Prioritizersには、FirstInFirstOutPrioritizer?, OldestFlowFileFirstPrioritizer?を設定。

Processorを設定する。

以下のProcessorを設定する。

  • GetHTTP
    • URL : https://dotnetdevelopmentinfrastructure.osscons.jp/index.php?FrontPage
    • Filename : FrontPage_dnetdevinf${now():format("HHmmssSSS")}.html
    • SSL Context Service : 前述の要領で、Control Serviceを追加する。
      [Add Controller Service]ダイアログでDDLからStandardSSLContextService?を選択し、
      以下を設定後、Control ServiceをEnableにする。
      • Truststore Filename : .\certificate\Truststore.jks
      • Truststore Password : changeit
      • Truststore Type : JKS
      • TLS Protocol: SSL
  • Run Schedule : 1 sec
  • GetHTMLElement
    • URL : hoge
    • CSS Selector : h1.title
    • Destination : flowfile-content
    • Terminate Relationships : success以外を全てチェック
  • AttributesToJSON
    • Destination : flowfile-content
    • Attributes List : 空文字列(全属性が対象)
  • PutFile?
    • Directory : ./data-out
    • Terminate Relationships : failure, success をチェック

ココまでの設定が適切であれば、Processorのwarningマークはすべて取れているハズ。

手順5-2

データフローを実行する。

以下のような出力を得られる。

{
  path:./
  filename:FrontPage_dnetdevinf095740401.html
  gethttp.remote.source:dotnetdevelopmentinfrastructure.osscons.jp
  Title.1:FrontPage
  Title.0:page=FrontPage">FrontPage</a>
  Title:FrontPage
  mime.type:text/html; charset=UTF-8
  uuid:53fff8cb-64de-494a-a818-c7ee6295f6c0
}

RDBMS処理を実装してみる。

  • コレを参考に、以下のサンプルをコンテンツを開発。

準備

Processorを追加する。

  • GenerateFlowFile?
    JSON配列形式のフローファイルを生成
  • SplitJson?
    JSON配列を分割する。
  • EvaluateJsonPath?
    JSONのKey-Valueを属性に抽出(パラメタ値)。
  • UpdateAttribute?
    各フローファイルのファイル名称属性を更新(パラメタ型)
  • ExecuteSQL
    属性を使用し、Prepared statementにパラメタライズして実行する。
  • ConvertAvroToJSON
    結果をJSONに変換する。
  • UpdateAttribute?
    各フローファイルのファイル名称属性を更新(ファイル名)
  • PutFile?
    結果をファイルに出力。

Connectionで接続する。

上記のProcessorをConnectionで順に繋ぐ。

※ ConnectionのNameには適当な名称を付与。
 全てのRelationshipsを選択し、ProcessorのTerminateは設定しなかった。
 また、Selected Prioritizersには、FirstInFirstOutPrioritizer?, OldestFlowFileFirstPrioritizer?を設定。

Processorを設定する。

  • GenerateFlowFile?
    • Custom Text :
      [
        {
          "ShipperID" : 1,
          "CompanyName" : "",
          "Phone" : ""
        },
        {
          "ShipperID" : 2,
          "CompanyName" : "",
          "Phone" : ""
        },
        {
          "ShipperID" : 3,
          "CompanyName" : "",
          "Phone" : ""
        }
      ]
  • SplitJson?
    • JsonPath? Expression : $
    • Terminate Relationships : split以外を全てチェック
  • EvaluateJsonPath?
    • Destination : flowfile-attribute
    • sql.args.1.value : $.ShipperID ※ [+]ボタンでプロパティ追加する。
      ※ sql.args.N.typeのNでPrepared Statementのインデックスを指定
    • Terminate Relationships : matched以外を全てチェック
  • UpdateAttribute?
    • sql.args.1.type : 4 ※ [+]ボタンでプロパティ追加する。
      ※ sql.args.N.typeのNでPrepared Statementのインデックスを指定
  • ExecuteSQL
    • setSQL select query : SELECT * FROM Shippers WHERE ShipperID = ?
    • Database Connection Pooling Service : 前述の要領で、Control Serviceを追加する。
      [Add Controller Service]ダイアログでDDLからDBCPConnectionPool?を選択し、
      以下を設定後、Control ServiceをEnableにする。
      • Database Connection URL :
        jdbc:sqlserver://<ipaddress>or<hostname>:<portno>;database=<databasename>
        e.g. : jdbc:sqlserver://localhost:1433;database=northwind
      • Database Driver Class Name : com.microsoft.sqlserver.jdbc.SQLServerDriver?
      • Database Driver Location(s) : C:\Program Files (x86)\Java\jdbc\sqljdbc_6.0\jpn\jre8\sqljdbc42.jar
      • Database User : ****
      • Password : ****
    • Terminate Relationships : failureをチェック
  • ConvertAvroToJSON
    • 既定値
    • Terminate Relationships : failureをチェック
  • UpdateAttribute?
    • filename : ${UUID()} ※ [+]ボタンでプロパティ追加する。
  • PutFile?
    • Directory : ./data-out
    • Terminate Relationships : failure, success をチェック

ココまでの設定が適切であれば、Processorのwarningマークはすべて取れているハズ。

手順6-2

データフローを実行する。

以下のような出力を得られる。

{"ShipperID": 1, "CompanyName": "Speedy Express", "Phone": "(503) 555-9831"}
{"ShipperID": 2, "CompanyName": "United Package", "Phone": "(503) 555-3199"}
{"ShipperID": 3, "CompanyName": "Federal Shipping", "Phone": "(503) 555-9931"}

参考

参考資料

Apache NiFi - Processor


添付ファイル: fileExecuteSQLSample.xml 424件 [詳細] file6-2.png 548件 [詳細] fileScrapingImplementation.xml 420件 [詳細] file5-2.png 493件 [詳細] file4-1.png 624件 [詳細] fileCreateWebAPI.xml 452件 [詳細] file4-2.png 492件 [詳細] file3-2.png 513件 [詳細] fileLab1-NiFi-Learn-Ropes.xml 483件 [詳細] fileLab3-NiFi-Learn-Ropes.xml 419件 [詳細] file3-3.png 538件 [詳細] fileLab2-NiFi-Learn-Ropes.xml 424件 [詳細] file2-2.png 569件 [詳細] file1-2.png 534件 [詳細] file1-1.png 579件 [詳細] file16302-trafficlocs-data-for-simulator.zip 460件 [詳細]

トップ   編集 凍結 差分 バックアップ 添付 複製 名前変更 リロード   新規 一覧 単語検索 最終更新   ヘルプ   最終更新のRSS
Last-modified: 2021-01-12 (火) 11:30:13 (1194d)