「[[.NET 開発基盤部会 Wiki>http://dotnetdevelopmentinfrastructure.osscons.jp]]」は、「[[Open棟梁Project>https://github.com/OpenTouryoProject/]]」,「[[OSSコンソーシアム .NET開発基盤部会>https://www.osscons.jp/dotNetDevelopmentInfrastructure/]]」によって運営されています。 -[[戻る>Apache NiFi]] --[[Apache NiFiファースト・ステップ]] --Apache NiFiセカンド・ステップ *目次 [#r0303ee6] #contents *概要 [#r7ffacea] -[[ファースト・ステップ>Apache NiFiファースト・ステップ]]では、下記を行った。 --インストール --初歩的な利用方法 --基本的な操作方法 -セカンド・ステップでは、 --[[チュートリアル>#j06a5c12]] ---[[チュートリアル 1: シンプルなNiFiデータフロー構築>#f20c0bee]] ---[[チュートリアル 2: 地理情報でデータフローをエンリッチメント>#g5d5e40d]] ---[[チュートリアル 3: NextBusライブストリームの取込>#o248a2bd]] --[[追加(自習)コンテンツ>#e59fd742]] ---[[WebAPIを作成してみる。>#f80dad4f]]~ HandleHttpRequest/ResponseのProcessorを使用して、WebAPIを作成する。 ---[[Webスクレイピング処理を実装してみる。>#n7f497dd]]~ 本WikiのFrontPageのタイトルを抽出する。 本WikiのFront Pageのタイトルを抽出する。 ---[[RDBMS処理を実装してみる。>#vbe20c60]]~ ExecuteSQLをパラメタライズして実行する。 >などをやってみる。 *チュートリアル [#j06a5c12] 以下を参考に、チュートリアルを遂行。 -Learning the Ropes of Apache NiFi · ijokarumawak/hdf-tutorials-ja Wiki~ https://github.com/ijokarumawak/hdf-tutorials-ja/wiki/Learning-the-Ropes-of-Apache-NiFi このチュートリアルは、ある都市計画委員会が新規の高速道路の建造の評価のために、~ リアルタイムデータを活用し、交通パターンのより深い理解を得るというシナリオ **チュートリアル 1 [#f20c0bee] -「シンプルなNiFiデータフロー構築」をやってみる。~ https://github.com/ijokarumawak/hdf-tutorials-ja/wiki/Ropes-of-Apache-NiFi:-Tutorial-1 -概要 ++車両位置情報XMLのシミュレーションデータを取り込む。 ++フローファイルから車両位置詳細属性を抽出する。 ++それらの詳細属性が空でない場合、JSONファイルに変換。 -リンク切れ --Lab1-NiFi-Learn-Ropes.xmlテンプレート~ リンク切れだったので演習後のものを以下に添付した。 >&ref(Lab1-NiFi-Learn-Ropes.xml); --trafficLocs_data_for_simulator.zip~ 出だしのZIPがリンク切れなので、[[ココ>https://community.hortonworks.com/questions/107442/file-analyze-traffic-pattern-with-apache-nifiasset.html]]から取得したものを以下に添付した。 >&ref(16302-trafficlocs-data-for-simulator.zip); ***Processorを追加する。 [#k488bdb8] 以下のProcessorを追加する。 -GetFile~ 処理するファイルを入力。 -UnpackContent~ 交通シミュレータZipファイルからフローファイルのコンテンツを解凍 -ControlRate~ プロセッサに流すフローファイルのフローのレートを制御 -EvaluateXPath~ 車両位置情報XMLから最終更新タイムスタンプを属性として抽出 -SplitXML~ 親の子要素を複数のフローファイルに分割 -UpdateAttribute~ 各フローファイルのファイル名称属性を更新(ファイル名) -EvaluateXPath~ 車両位置情報XMLから車両ID、方向、緯度、軽度、速度を属性として抽出 -RouteOnAttribute~ 各属性がフィルタ条件に一致する場合のみ、フローファイルを後続のフローに流す。 -AttributesToJSON~ フローファイルから属性を抽出し、JSON形式に変換 -MergeContent~ JSON形式のフローファイルのグループを配列としてマージ -PutFile~ 処理したファイルを出力。 ***Connectionで接続する。 [#ib8c3d39] 図がリンク切れだけど、コンテキストから推測するに多分、こう。 #ref(1-1.png,left,nowrap,手順1-1,60%) ※ ConnectionのNameには適当な名称を付与。~ 全てのRelationshipsを選択し、ProcessorのTerminateは設定しなかった。~ また、Selected Prioritizersには、FirstInFirstOutPrioritizer, OldestFlowFileFirstPrioritizerを設定。~ ***Processorを設定する。 [#s638be10] 「[[1.1 学習の目的: データフロー作成プロセスの概要>https://github.com/ijokarumawak/hdf-tutorials-ja/wiki/Ropes-of-Apache-NiFi%3A-Tutorial-1#11-%E5%AD%A6%E7%BF%92%E3%81%AE%E7%9B%AE%E7%9A%84-%E3%83%87%E3%83%BC%E3%82%BF%E3%83%95%E3%83%AD%E3%83%BC%E4%BD%9C%E6%88%90%E3%83%97%E3%83%AD%E3%82%BB%E3%82%B9%E3%81%AE%E6%A6%82%E8%A6%81]]」を参考に設定。~ (詳細は「[[Step 2>https://github.com/ijokarumawak/hdf-tutorials-ja/wiki/Ropes-of-Apache-NiFi%3A-Tutorial-1#step-2-xml%E3%82%B7%E3%83%9F%E3%83%A5%E3%83%AC%E3%83%BC%E3%82%BF%E3%83%87%E3%83%BC%E3%82%BF%E3%83%95%E3%83%AD%E3%83%BC%E3%82%BB%E3%82%AF%E3%82%B7%E3%83%A7%E3%83%B3%E3%81%AE%E4%BD%9C%E6%88%90]]」以降に書いてあるので、必要に応じてソレを参考にする) -GetFile --Input Directory : ./data-in -UnpackContent --Packaging Format : zip -ControlRate --Rate Control Criteria : flowfile count --Maximum Rate : 3 --setTime Duration : 10 second -EvaluateXPath --Destination : flowfile-attribute ※ XPath式の結果をフローファイルの属性に保存する --XPath式 ---Last_Time : //body/lastTime/@time ※ [+]ボタンでプロパティ追加する。 -SplitXML : 既定値 -UpdateAttribute --filename : ${UUID()} ※ [+]ボタンでプロパティ追加する。 -EvaluateXPath~ --Destination : flowfile-attribute ※ XPath式の結果をフローファイルの属性に保存する --XPath式 ---Direction_of_Travel : //vehicle/@dirTag ※ [+]ボタンでプロパティ追加する。 ---Latitude : //vehicle/@lat ※ [+]ボタンでプロパティ追加する。 ---Longitude : //vehicle/@lon ※ [+]ボタンでプロパティ追加する。 ---Vehicle_ID : //vehicle/@id ※ [+]ボタンでプロパティ追加する。 ---Vehicle_Speed : //vehicle/@speedKmHr ※ [+]ボタンでプロパティ追加する。 -RouteOnAttribute --Filter_Attributes : 以下の式を記入 ※ [+]ボタンでプロパティ追加する。 ${Direction_of_Travel:isEmpty():not():and(${Last_Time:isEmpty():not()}):and(${Latitude:isEmpty():not()}):and(${Longitude:isEmpty():not()}):and(${Vehicle_ID:isEmpty():not()}):and(${Vehicle_Speed:equals('0'):not()})} --Terminate Relationships : unmatched をチェック~ ※ ココだけは必要そうなので。ConnectionのRelationshipsも設定。 -AttributesToJSON --Destination : flowfile-content --Attributes List : Vehicle_ID, Direction_of_Travel, Latitude, Longitude, Vehicle_Speed, Last_Time -MergeContent --Minimum Number of Entries : 10 --Maximum Number of Entries : 15 --Delimiter Strategy : Text --Header : [ --Footer : ] --Demarcator : , {press-shift+enter} >※ 波括弧内は要するに改行コードを意味している。~ NiFiのGUIで改行コードを入力するのにshiftが必要ということ。 -PutFile --Directory : ./data-out/filtered_transitLoc_data --Terminate Relationships : failure, success をチェック ココまでの設定が適切であれば、Processorのwarningマークはすべて取れているハズ。 #ref(1-2.png,left,nowrap,手順1-2,60%) ※ RouteOnAttributeだけ、Terminate Relationships : unmatched をチェックした。 ***データフローを実行する。 [#z60ae427] -Processorを選択していない状態でOperatorの再生ボタンを押下する。 -入力フォルダにZIPファイルを放り込むと、上手いこと、出力フォルダにJSONが出力される。 -しかし、ファイル名がUUIDのものが出力される。これはマージ後のフローファイルもフローに流れているためと解る。 -MergeContentのTerminate Relationships : original をチェックしてテストして上手く動作することを確認する。 **チュートリアル 2 [#g5d5e40d] -「地理情報でデータフローをエンリッチメント」をやってみる。エンリッチメント=拡充的な意味か。~ https://github.com/ijokarumawak/hdf-tutorials-ja/wiki/Ropes-of-Apache-NiFi:-Tutorial-2 -概要 --Google Places APIをNiFiから利用し、 --車両の移動(車両位置情報XMLのシミュレーションデータ)に応じて周辺情報を表示。 -リンク切れ --Lab2-NiFi-Learn-Ropes.xmlテンプレート~ リンク切れだったので演習後のものを以下に添付した。 >&ref(Lab2-NiFi-Learn-Ropes.xml);~ ※ Google Places APIのAPIキーは削除してある。 ***準備 [#e67606b3] -Google Places APIのAPIキーを取得しておく。~ https://developers.google.com/places/web-service/get-api-key ***Processorを追加する。 [#h3fc279e] 以下のProcessorを追加する。 -InvokeHTTP~ Google Places APIから車両ロケーション付近のJSON場所データを取得。 -EvaluateJsonPath~ JSON場所データからneighborhoods_nearbyとcityデータ要素を抽出して属性に設定。 -RouteOnAttribute~ neighborhoods_nearbyとcity属性が空でないフローファイルをルーティング。 -AttributesToJSON~ フローファイルから属性を抽出し、JSON形式に変換 -MergeContent~ JSON形式のフローファイルのグループを配列としてマージ -PutFile~ 処理したファイルを出力。 ***Connectionで接続する。 [#u87f8225] [[チュートリアル 1>#f20c0bee]]のRouteOnAttributeから分岐して、合流せずに、~ [[上記のProcessor>#h3fc279e]]をConnectionで順に繋ぎ、PutFileで終わるデータフローを作成。 ※ ConnectionのNameには適当な名称を付与。~ 全てのRelationshipsを選択し、ProcessorのTerminateは設定しなかった。~ また、Selected Prioritizersには、FirstInFirstOutPrioritizer, OldestFlowFileFirstPrioritizerを設定。~ ***Processorを設定する。 [#y84b0ad5] 詳細が「[[Step 2>https://github.com/ijokarumawak/hdf-tutorials-ja/wiki/Ropes-of-Apache-NiFi:-Tutorial-2#step-2-%E5%9C%B0%E7%90%86%E7%9A%84%E4%BD%8D%E7%BD%AE%E6%83%85%E5%A0%B1%E3%82%A8%E3%83%B3%E3%83%AA%E3%83%83%E3%83%81%E3%83%A1%E3%83%B3%E3%83%88%E3%83%87%E3%83%BC%E3%82%BF%E3%83%95%E3%83%AD%E3%83%BC%E3%82%BB%E3%82%AF%E3%82%B7%E3%83%A7%E3%83%B3%E3%81%AE%E4%BD%9C%E6%88%90]]」以降に書いてあるので、必要に応じてソレを参考にする。 -InvokeHTTP --Remote URL : 以下を設定する。 https://maps.googleapis.com/maps/api/place/nearbysearch/json?location=${Latitude},${Longitude}&radius=500&type=neighborhood&key=[取得したAPIキー] --Terminate Relationships : response以外を全てチェック -EvaluateXPath~ --Destination : flowfile-attribute ※ JSONPath式の結果をフローファイルの属性に保存する --Return Type : json --JSONPath式 ---city : $.results[0].vicinity ※ [+]ボタンでプロパティ追加する。 ---neighborhoods_nearby : $.results[*].name ※ [+]ボタンでプロパティ追加する。 -RouteOnAttribute --RouteNearbyNeighborhoods : 以下の式を記入 ※ [+]ボタンでプロパティ追加する。 ${city:isEmpty():not():and(${neighborhoods_nearby:isEmpty():not()})} --Terminate Relationships : unmatched をチェック~ ※ ココだけは必要そうなので。ConnectionのRelationshipsも設定。 -AttributesToJSON --Destination : flowfile-content --Attributes List : Vehicle_ID, city, Latitude, Longitude, neighborhoods_nearby, Last_Time -MergeContent --Minimum Number of Entries : 10 --Maximum Number of Entries : 15 --Delimiter Strategy : Text --Header : [ --Footer : ] --Demarcator : , {press-shift+enter} >※ 波括弧内は要するに改行コードを意味している。~ NiFiのGUIで改行コードを入力するのにshiftが必要ということ。 --Terminate Relationships : original をチェック -PutFile --Directory : ./data-out/nearby_neighborhoods_search --Terminate Relationships : failure, success をチェック ココまでの設定が適切であれば、Processorのwarningマークはすべて取れているハズ。 #ref(2-2.png,left,nowrap,手順2-2,60%) ***データフローを実行する。 [#ff18a00c] -Processorを選択していない状態でOperatorの再生ボタンを押下する。 -入力フォルダにZIPを放り込むと、上手いこと、出力フォルダにJSONが出力される。 **チュートリアル 3 [#o248a2bd] -「[[NextBus>https://www.nextbus.com/xmlFeedDocs/NextBusXMLFeed.pdf]]ライブストリームの取込」をやってみる。~ https://github.com/ijokarumawak/hdf-tutorials-ja/wiki/Ropes-of-Apache-NiFi:-Tutorial-3 -概要 --車両位置情報XMLのシミュレーションデータを生成するデータフローセクションを、 --[[NextBus>https://www.nextbus.com/xmlFeedDocs/NextBusXMLFeed.pdf]]のライブストリームデータを取込セクションで置換。 -リンク切れ --Lab3-NiFi-Learn-Ropes.xmlテンプレート~ リンク切れだったので演習後のものを以下に添付した。 >&ref(Lab3-NiFi-Learn-Ropes.xml);~ ※ Google Places APIのAPIキーは削除してある。 ***Processorを追加する。 [#zb9c714c] 以下のProcessorを追加する。 -GetHTTP --GetFile、UnpackContent、ControlRateプロセッサを削除。 --これらをGetHTTPプロセッサで置き換え、入力を シミュレーションデータから、NextBusライブストリームに変更。 ***Connectionで接続する。 [#wbf604ee] GetHTTPとEvaluateXPathを接続したデータフローを作成。 ※ ConnectionのNameには適当な名称を付与。~ 全てのRelationshipsを選択し、ProcessorのTerminateは設定しなかった。~ また、Selected Prioritizersには、FirstInFirstOutPrioritizer, OldestFlowFileFirstPrioritizerを設定。~ ***Processorを設定する。 [#g587eaf7] 以下のProcessorを設定する。 -GetHTTP --URL : http://webservices.nextbus.com/service/publicXMLFeed?command=vehicleLocations&a=sf-muni&t=0 --Filename : vehicleLoc_SF_OceanView_${now():format("HHmmssSSS")}.xml --Run Schedule : 1 sec ※ APIの仕様が変わったのか、データが取得できなかったので、パラメタの"r=M"を削除した。 ココまでの設定が適切であれば、Processorのwarningマークはすべて取れているハズ。 #ref(3-2.png,left,nowrap,手順3-2,60%) ***データフローを実行する。 [#p9d02a6f] -Processorを選択していない状態でOperatorの再生ボタンを押下する。 -1 sec毎に、[[NextBus>https://www.nextbus.com/xmlFeedDocs/NextBusXMLFeed.pdf]]のHTTPレスポンスから入力され、出力フォルダにJSONが出力される。 #ref(3-3.png,left,nowrap,手順3-3,60%) ※ キューが詰まっている部分が赤く表示される。 *追加(自習)コンテンツ [#e59fd742] **WebAPIを作成してみる。 [#f80dad4f] -HandleHttpRequest/ResponseのProcessorを使用して、WebAPIを作成する。 -HandleHttpRequest/Responseの利用方法については[[コレ>https://github.com/ijokarumawak/hdf-tutorials-ja/wiki/HDF%E3%83%8F%E3%83%B3%E3%82%BA%E3%82%AA%E3%83%B3-1:-%E3%83%87%E3%83%BC%E3%82%BF%E3%81%AE%E5%8F%96%E8%BE%BC]]が参考になる。 -テンプレート : 演習後のものを以下に添付した。 >&ref(CreateWebAPI.xml); ***Processorを追加する。 [#f21305c2] 以下のProcessorを追加する。 -HandleHttpRequest~ HTTPクライアントからのHTTPリクエストを受け取る。 -ReplaceText~ HTTPリクエストからHTTPレスポンスで返すボディの文字列を設定。 -HandleHttpResponse~ HTTPクライアントにHTTPレスポンスを返す。 -AttributesToJSON~ フローファイルから属性を抽出し、JSON形式に変換(HTTPヘッダ) -MergeContent~ JSON形式のHTTPヘッダとHTTPボディをマージ -PutFile~ 処理したファイルを出力(ココでは、確認のためHTTPリクエストを出力する)。 -LogAttribute~ ReplaceTextとHandleHttpResponseのfailureをこちらに流す。 ***Connectionで接続する。 [#pdfe404b] 以下のように接続する。 #ref(4-1.png,left,nowrap,手順4-1,60%) ※ ConnectionのNameには適当な名称を付与。~ 全てのRelationshipsを選択し、ProcessorのTerminateは設定しなかった。~ また、Selected Prioritizersには、FirstInFirstOutPrioritizer, OldestFlowFileFirstPrioritizerを設定。~ ***Processorを設定する。 [#k2b4a7e0] 以下のProcessorを設定する。 -HandleHttpRequest --Listening Port : 9095 --HTTP Context Map : ---DDLから[Create new service...]を選択する。 ---[Add Controller Service]ダイアログで[APPLY]ボタンをクリック。 ---[StandardHttpContextMap]Controller Serviceが追加され[→]が表示される。 ---コレを押下し、[Enable]アイコンをクリックし[StandardHttpContextMap]を有効化。 -ReplaceText --Replacement Value : {"Result": "OK"} --Replacement Strategy : Always Replace -HandleHttpResponse~ --HTTP Status Code : 202 --HTTP Context Map : DDLから、先ほど作成した[StandardHttpContextMap]を選択する。 --Terminate Relationships : success をチェック -AttributesToJSON --Destination : flowfile-content --Attributes List : 空文字列(全属性が対象) -MergeContent --Minimum Number of Entries : 1 --Maximum Number of Entries : 2 --Delimiter Strategy : Text --Header : [ --Footer : ] --Demarcator : , {press-shift+enter} >※ 波括弧内は要するに改行コードを意味している。~ NiFiのGUIで改行コードを入力するのにshiftが必要ということ。 -PutFile --Directory : ./data-out --Terminate Relationships : failure, success をチェック -LogAttribute~ --Terminate Relationships : success をチェック ココまでの設定が適切であれば、Processorのwarningマークはすべて取れているハズ。 #ref(4-2.png,left,nowrap,手順4-2,60%) ※ Controller Serviceとは、[[コチラ>ApacheNiFi#m40f0ef5]]を参照。~ StandardHttpContextMapで、Processor間でのHttpContextを共有する。 ***データフローを実行する。 [#yc42a9eb] -[[cURL>https://techinfoofmicrosofttech.osscons.jp/index.php?cURL%E3%82%B3%E3%83%9E%E3%83%B3%E3%83%89]]で以下を実行する。 >curl -i -XPOST -H "Content-type: application/json" -d "{"name": "C", "age": 20}" localhost:9095 -以下のように結果が返り、生のHTTPリクエストのJSONが./data-outフォルダに保存される。 HTTP/1.1 202 Accepted Date: Mon, 09 Jul 2018 05:05:32 GMT Transfer-Encoding: chunked Server: Jetty(9.4.3.v20170317) {"Result": "OK"} ※ 参考に従いLogAttributeを追加してみたが、~ エラーが起きないのでfailureに流れない。 **Webスクレイピング処理を実装してみる。 [#n7f497dd] -以下が参考になるが、 --nifi-templates/Web_Scraper_Sample.xml at master · hortonworks-gallery/nifi-templates~ https://github.com/hortonworks-gallery/nifi-templates/blob/master/templates/Web_Scraper_Sample.xml >大き過ぎるので、「本WikiのFrontPageのタイトルを抽出する」小さめのサンプルとしてみる。 -テンプレート : 演習後のものを以下に添付した。 >&ref(ScrapingImplementation.xml);~ ***準備 [#a8277084] HTTPSアクセスのため、以下の手順を参考にして、Truststore.jksファイルを作成する。 -以下のURL(HTTPSアクセスするURL)をChromeのロケーション・バーに打ち込んで、~ https://dotnetdevelopmentinfrastructure.osscons.jp/ -ロケーション・バーの保護された通信をクリックし、[証明書]を選択。 -表示された[証明書]ダイアログの[詳細]タブで[ファイルにコピー]を選択。 -既定値でxxxx.cerのようにエクスポートし、 -エクスポートしたファイルに対して、以下のコマンドを実行。 keytool -import -alias dotnetdevelopmentinfrastructure.osscons.jp -file ...\xxxx.cer -keystore ...\certificate\Truststore.jks -storepass changeit -- keytoolは、jdkのbinにある。 -- -alias はユニークなら何でもOK. -- -storepass changeitは標準のパスフレーズ -- -file, -keystore は適宜変更。-keystoreのフォルダは作成しておく。 この証明書を信頼しますか。 [いいえ]: Y 証明書がキーストアに追加されました ***Processorを追加する。 [#y5e88841] -GetHTTP~ HTMLコンテンツを取得する。 -GetHTMLElement~ HTMLエレメントを取得する。 -ExtractText~ 正規表現で情報を抜く。 -AttributesToJSON~ フローファイルから属性を抽出し、JSON形式に変換 -PutFile~ 結果ファイルを出力する。 ***Connectionで接続する。 [#g26266bc] [[上記のProcessor>#y5e88841]]をConnectionで順に繋ぐ。 ※ ConnectionのNameには適当な名称を付与。~ 全てのRelationshipsを選択し、ProcessorのTerminateは設定しなかった。~ また、Selected Prioritizersには、FirstInFirstOutPrioritizer, OldestFlowFileFirstPrioritizerを設定。~ ***Processorを設定する。 [#d0532ae1] 以下のProcessorを設定する。 -GetHTTP~ --URL : https://dotnetdevelopmentinfrastructure.osscons.jp/index.php?FrontPage --Filename : FrontPage_dnetdevinf${now():format("HHmmssSSS")}.html --SSL Context Service : [[前述の要領>#k2b4a7e0]]で、Control Serviceを追加する。~ [Add Controller Service]ダイアログでDDLからStandardSSLContextServiceを選択し、~ 以下を設定後、Control ServiceをEnableにする。 ---Truststore Filename : .\certificate\Truststore.jks ---Truststore Password : changeit ---Truststore Type : JKS ---TLS Protocol: SSL --Run Schedule : 1 sec -GetHTMLElement --URL : hoge --CSS Selector : h1.title --Destination : flowfile-content --Terminate Relationships : success以外を全てチェック -ExtractText ※ 正規表現(Regular Expression)の結果をフローファイルの属性に保存する --Title : page=FrontPage">(.+?)</a> ※ [+]ボタンでプロパティ追加する。 --Terminate Relationships : unmatched をチェック --参考 : [[Jmeterの正規表現 - マイクロソフト系技術情報 Wiki>https://techinfoofmicrosofttech.osscons.jp/index.php?Jmeter%E3%81%AE%E6%AD%A3%E8%A6%8F%E8%A1%A8%E7%8F%BE]] -AttributesToJSON --Destination : flowfile-content --Attributes List : 空文字列(全属性が対象) -PutFile --Directory : ./data-out --Terminate Relationships : failure, success をチェック ココまでの設定が適切であれば、Processorのwarningマークはすべて取れているハズ。 #ref(5-2.png,left,nowrap,手順5-2,60%) ***データフローを実行する。 [#d3ccec97] 以下のような出力を得られる。 { path:./ filename:FrontPage_dnetdevinf095740401.html gethttp.remote.source:dotnetdevelopmentinfrastructure.osscons.jp Title.1:FrontPage Title.0:page=FrontPage">FrontPage</a> Title:FrontPage mime.type:text/html; charset=UTF-8 uuid:53fff8cb-64de-494a-a818-c7ee6295f6c0 } **RDBMS処理を実装してみる。 [#vbe20c60] -[[コレ>Apache NiFi - RDBMS系のProcessor#dcf5b75e]]を参考に、以下のサンプルをコンテンツを開発。 -テンプレート : 演習後のものを以下に添付した。 >&ref(ExecuteSQLSample.xml); ***準備 [#v5f709b8] -RDBMSの準備:ココではSQL ServerのNorthwindを選択。 -JDBCの準備:事前にMicrosoft SQL Server 用 JDBC Driverをインストールしておく。 --JDBC ドライバーの使用 | Microsoft Docs~ https://docs.microsoft.com/ja-jp/sql/connect/jdbc/using-the-jdbc-driver?view=sql-server-2017 ***Processorを追加する。 [#t52d0467] -GenerateFlowFile~ JSON配列形式のフローファイルを生成 -SplitJson~ JSON配列を分割する。 -EvaluateJsonPath~ JSONのKey-Valueを属性に抽出(パラメタ値)。 -UpdateAttribute~ 各フローファイルのファイル名称属性を更新(パラメタ型) -ExecuteSQL~ 属性を使用し、Prepared statementにパラメタライズして実行する。 -ConvertAvroToJSON~ 結果をJSONに変換する。 -UpdateAttribute~ 各フローファイルのファイル名称属性を更新(ファイル名) -PutFile~ 結果をファイルに出力。 ***Connectionで接続する。 [#t52d0467] [[上記のProcessor>#y5e88841]]をConnectionで順に繋ぐ。 ※ ConnectionのNameには適当な名称を付与。~ 全てのRelationshipsを選択し、ProcessorのTerminateは設定しなかった。~ また、Selected Prioritizersには、FirstInFirstOutPrioritizer, OldestFlowFileFirstPrioritizerを設定。~ ***Processorを設定する。 [#e64bf9d7] -GenerateFlowFile --Custom Text : [ { "ShipperID" : 1, "CompanyName" : "", "Phone" : "" }, { "ShipperID" : 2, "CompanyName" : "", "Phone" : "" }, { "ShipperID" : 3, "CompanyName" : "", "Phone" : "" } ] -SplitJson --JsonPath Expression : $ --Terminate Relationships : split以外を全てチェック -EvaluateJsonPath --Destination : flowfile-attribute --sql.args.1.value : $.ShipperID ※ [+]ボタンでプロパティ追加する。~ ※ sql.args.N.typeのNでPrepared Statementのインデックスを指定 --Terminate Relationships : matched以外を全てチェック -UpdateAttribute --sql.args.1.type : 4 ※ [+]ボタンでプロパティ追加する。~ ※ sql.args.N.typeのNでPrepared Statementのインデックスを指定 -ExecuteSQL --setSQL select query : SELECT * FROM Shippers WHERE ShipperID = ? --Database Connection Pooling Service : [[前述の要領>#k2b4a7e0]]で、Control Serviceを追加する。~ [Add Controller Service]ダイアログでDDLからDBCPConnectionPoolを選択し、~ 以下を設定後、Control ServiceをEnableにする。 ---Database Connection URL :~ jdbc:sqlserver://<ipaddress>or<hostname>:<portno>;database=<databasename>~ e.g. : jdbc:sqlserver://localhost:1433;database=northwind ---Database Driver Class Name : com.microsoft.sqlserver.jdbc.SQLServerDriver ---Database Driver Location(s) : C:\Program Files (x86)\Java\jdbc\sqljdbc_6.0\jpn\jre8\sqljdbc42.jar ---Database User : **** ---Password : **** --Terminate Relationships : failureをチェック -ConvertAvroToJSON~ --既定値 --Terminate Relationships : failureをチェック -UpdateAttribute --filename : ${UUID()} ※ [+]ボタンでプロパティ追加する。 -PutFile --Directory : ./data-out --Terminate Relationships : failure, success をチェック ココまでの設定が適切であれば、Processorのwarningマークはすべて取れているハズ。 #ref(6-2.png,left,nowrap,手順6-2,60%) ***データフローを実行する。 [#l6a6e14f] 以下のような出力を得られる。 {"ShipperID": 1, "CompanyName": "Speedy Express", "Phone": "(503) 555-9831"} {"ShipperID": 2, "CompanyName": "United Package", "Phone": "(503) 555-3199"} {"ShipperID": 3, "CompanyName": "Federal Shipping", "Phone": "(503) 555-9931"} *参考 [#rdf45f1f] **[[参考資料>Apache NiFi#ce179366]] [#e99bd3ab] **[[Apache NiFi - Processor]] [#c62e636c]