「[[.NET 開発基盤部会 Wiki>http://dotnetdevelopmentinfrastructure.osscons.jp]]」は、「[[Open棟梁Project>https://github.com/OpenTouryoProject/]]」,「[[OSSコンソーシアム .NET開発基盤部会>https://www.osscons.jp/dotNetDevelopmentInfrastructure/]]」によって運営されています。 -[[戻る>Spark Streaming]] *目次 [#vac9e68a] #contents *概要 [#haceebed] 構造化ストリーミングとも呼ばれる。 構造化ストリーミングとも呼ばれる、集めない[[ビッグデータ]]処理技術 -[[Spark SQL]]エンジン(DataframeとDatasetのAPI)に基づいて構築 -データストリームを連続して追加される無限の入力テーブルとして扱う。 -これによりストリーミングを~ バッチと同じ方法([[Spark SQLとDataFrame API>Spark SQL]])で~ 記述できる(ただし、少数のサポートされない操作がある)。 *詳細 [#e7eca325] **特徴 [#ne7e7ab0] [[Spark Streaming]]と比較した優位性。 ***よりリアルなストリーミング [#e9d2458b] -トリガーで受信したデータは、継続的に流れているデータ・ストリームに追加される。 -データ・ストリームを深さが無限のテーブルとして表すため、 --不完全な集計結果を長時間維持でき、 --遅延したデータを再処理することが可能となり、 --その都度結果を更新できる。 ***RDD(DStream) vs. DataFrames/DataSet [#z6c2224d] -API --[[Spark Streaming]]は内部でRDDを使用しているDStream APIで動作、 --Spark Structured StreamingはDataFrameやDataset APIを使用している。 -DataFramesの方が処理が最適化されており、~ 様々な関数が用意されているため、集約などの処理の選択肢が多い。 ***イベント・タイムによる遅延データへの対応 [#gfc38642] 受信したデータにイベントのタイムスタンプが含まれている場合、~ イベントタイムを基準にしてデータを処理する機能がある。 ***その他 [#mbf2f400] -端から端まで確実に一回のセマンティクスを保証。 -Spark 2.4以降、入出力が柔軟になった。 **処理 [#pf1d08a3] ***ステートレス・ステートフル [#m4e096a0] -ステートレスな処理 --属性の選択(プロジェクション)やフィルタなど --他のレコードと 独立に処理可能なもの --マイクロバッチごとに、新規に到着した~ レコードで構成された DataFrameを処理する -ステートフルな処理 --集約処理や、イベント発生回数のカウントなど --新規に到着したレコードに加えて、~ 直前までのマイクロバッチにより更新された~ ステートを加味してDataFrameを処理する。 --6つのフェーズ ---① 部分集約処理 ---② 部分マージ処理 (1) ---③ ステート・リストア ---④ 部分マージ処理 (2) ---⑤ ステート・セーブ ---⑥ 最終処理 ***イベントタイム・ウィンドウ集約処理 [#o8869c41] -ステートフル集約処理の一種 --[[Spark Streamingでは、到着時刻に基づくウィンドウ集約処理>Spark Streaming#s40a4c2d]]のみ実装されていた。 --Spark Structured Streamingでは、到着時刻ではなく、イベントタイムに基づくウィンドウ集約が可能になった。 -イベントタイムに基づき、 --スライディングし、 --ウィンドウ幅で集約する。 -イベントタイム --イベントの生起時刻など、 --データ自身の中に埋め込まれている --レコードに意味付けされた時間。 --(自動付与することも可能) ***ウォーターマーク [#k73d94df] [[イベントタイム・ウィンドウ集約処理>#o8869c41]]で用いられる。 -遅れてきたデータをハンドリングする仕組み。 -ユーザが遅れたデータの閾値を指定することができる。 ***入力と出力 [#cf63f37d] -入力ソース --耐障害性有り ---ファイルソース(CSV, JSON, ORC, Parquet) val userSchema = new StructType().add("name", "string").add("age", "integer") val csvDF = spark .readStream .option("sep", ";") .schema(userSchema) // Specify schema of the csv files .csv("/path/to/directory") // Equivalent to format("csv").load("/path/to/directory") ---Kafka ソース val df = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", "host1:port1,host2:port2") .option("subscribe", "topic1") .load() --耐障害性無し(テスト用) ---ソケット ソース // Read text from socket val socketDF = spark .readStream .format("socket") .option("host", "localhost") .option("port", 9999) .load() ---ソースのレート -出力 --出力モード~ 出力はクエリの特定の型に適用可能なモードで定義できる ---完全モード~ ・更新された結果テーブル全体が外部ストレージに書き込まれる。~ ・テーブル全体の書き込みをどう扱うかはストレージのコネクタによる。 ---追加モード~ ・追加された新しい行だけが外部ストレージに書き込まれる。~ ・結果テーブルの既存行が変更されないクエリ上でのみ適用可能。 ---更新モード~ Spark 2.1.1から利用可能~ ・更新された行だけが外部ストレージに書き込まれる。~ ・クエリが集約を含まない場合、追加モードと同じ。 --出力シンク ---ファイル sink~ 出力をディレクトリに格納 writeStream .format("parquet") // can be "orc", "json", "csv", etc. .option("path", "path/to/destination/dir") .start() ---Kafka sink Kafka内の1つ以上のトピックに出力~ writeStream .format("kafka") .option("kafka.bootstrap.servers", "host1:port1,host2:port2") .option("topic", "updates") .start() ---Foreach sink~ 出力内のレコードの任意の計算を実行 writeStream .foreach(...) .start() ---コンソール sink (デバッグのため)~ トリガーの度に出力。Append と Complete の出力モードをサポート。 writeStream .format("console") .start() ---メモリ sink (デバッグのため)~ ≒ コンソール sink writeStream .format("memory") .queryName("tableName") .start() --トリガー~ バッチの実行≒出力タイミング? ---無指定 (デフォルト) ---固定間隔のマイクロ バッチ ---1回だけのマイクロバッチ ---固定のチェックポイント間隔の連続 **その他 [#d42363d4] ***join オペレーション [#o7ea3016] 主に、エンリッチメントを行う目的か。 -Stream-static Joins~ Spark 2.0でサポート。 -Stream-stream Joins~ Spark 2.3でサポート。 ***サポートされないオペレーション [#hca23a07] (現段階で) -クエリ的な --多段ストリーミング集約 --ストリーミング データセット上の ---LIMIT句 ---TOP句的な(df.show(n)) ---distinct操作 ---Sort操作(完全モード以外) -アクション的な --count()~ df.groupBy().count()は可 --foreach()~ df.writeStream.foreach(...)は可 --show()~ writeStreamのconsole sinkは可 **サンプル [#rfbc1b5f] ***[[Pythonの例>PySpark#x623bccd]] [#t85dc021] ***[[.NETの例>Spark SQL#kfcf74cc]] [#h4838163] *参考 [#q9b1cd94] -Structured Streaming - The Internal -~ https://www.slideshare.net/hadoopxnttdata/structured-streaming-the-internal -Spark Structured Streaming で遅延データを処理する方法 - MicroAd Developers Blog~ https://developers.microad.co.jp/entry/2019/07/12/063000 **[[Azure Stream Analytics>https://techinfoofmicrosofttech.osscons.jp/index.php?Azure%20Stream%20Analytics]] [#ab60ff65] **Programming Guide [#u81234f4] -Spark 2.2.0 Documentation~ https://spark.apache.org/docs/2.2.0/structured-streaming-programming-guide.html -Spark 3.0.0 Documentation~ https://spark.apache.org/docs/3.0.0/structured-streaming-programming-guide.html -Spark latest Documentation~ https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html -日本語訳 3.0.0~ http://mogile.web.fc2.com/spark/structured-streaming-programming-guide.html **Microsoft Docs [#k2c5860b] Azure HDInsight資産が多い感。 -Azure HDInsight --Azure HDInsight での Spark Structured Streaming~ https://docs.microsoft.com/ja-jp/azure/hdinsight/spark/apache-spark-structured-streaming-overview --チュートリアル:Apache Spark ストリーミング & Apache Kafka~ https://docs.microsoft.com/ja-jp/azure/hdinsight/hdinsight-apache-kafka-spark-structured-streaming --Cosmos DB を使用した Apache Spark と Apache Kafka~ https://docs.microsoft.com/ja-jp/azure/hdinsight/apache-kafka-spark-structured-streaming-cosmosdb --Hive Warehouse Connector ---Azure HDInsight の Hive Warehouse Connector でサポートされる Apache Spark の操作~ https://docs.microsoft.com/ja-jp/azure/hdinsight/interactive-query/apache-hive-warehouse-connector-operations ---Azure HDInsight で Hive Warehouse Connector を使用して Apache Spark と Apache Hive を統合する~ https://docs.microsoft.com/ja-jp/azure/hdinsight/interactive-query/apache-hive-warehouse-connector **Qiita [#g5486af2] -DatabricksでSpark Structured Streamingをやってみる~ https://qiita.com/yabooun/items/f7f649a457fdbd2952b2 -KafkaからのデータをStructured Streamingで処理してElasticsearchに流す~ https://qiita.com/whata/items/4c9c15164e0bd87bc5cb