「[[.NET 開発基盤部会 Wiki>http://dotnetdevelopmentinfrastructure.osscons.jp]]」は、「[[Open棟梁Project>https://github.com/OpenTouryoProject/]]」,「[[OSSコンソーシアム .NET開発基盤部会>https://www.osscons.jp/dotNetDevelopmentInfrastructure/]]」によって運営されています。

-[[戻る>Spark Streaming]]

*目次 [#vac9e68a]
#contents

*概要 [#haceebed]
構造化ストリーミングとも呼ばれる。
構造化ストリーミングとも呼ばれる、集めない[[ビッグデータ]]処理技術

-[[Spark SQL]]エンジン(DataframeとDatasetのAPI)に基づいて構築

-データストリームを連続して追加される無限の入力テーブルとして扱う。

-これによりストリーミングを~
バッチと同じ方法([[Spark SQLとDataFrame API>Spark SQL]])で~
記述できる(ただし、少数のサポートされない操作がある)。

*詳細 [#e7eca325]

**特徴 [#ne7e7ab0]
[[Spark Streaming]]と比較した優位性。

***よりリアルなストリーミング [#e9d2458b]
-トリガーで受信したデータは、継続的に流れているデータ・ストリームに追加される。

-データ・ストリームを深さが無限のテーブルとして表すため、
--不完全な集計結果を長時間維持でき、
--遅延したデータを再処理することが可能となり、
--その都度結果を更新できる。

***RDD(DStream) vs. DataFrames/DataSet [#z6c2224d]
-API
--[[Spark Streaming]]は内部でRDDを使用しているDStream APIで動作、
--Spark Structured StreamingはDataFrameやDataset APIを使用している。

-DataFramesの方が処理が最適化されており、~
様々な関数が用意されているため、集約などの処理の選択肢が多い。

***イベント・タイムによる遅延データへの対応 [#gfc38642]
受信したデータにイベントのタイムスタンプが含まれている場合、~
イベントタイムを基準にしてデータを処理する機能がある。

***その他 [#mbf2f400]
-端から端まで確実に一回のセマンティクスを保証。
-Spark 2.4以降、入出力が柔軟になった。

**処理 [#pf1d08a3]

***ステートレス・ステートフル [#m4e096a0]
-ステートレスな処理
--属性の選択(プロジェクション)やフィルタなど
--他のレコードと 独立に処理可能なもの
--マイクロバッチごとに、新規に到着した~
レコードで構成された DataFrameを処理する

-ステートフルな処理

--集約処理や、イベント発生回数のカウントなど

--新規に到着したレコードに加えて、~
直前までのマイクロバッチにより更新された~
ステートを加味してDataFrameを処理する。

--6つのフェーズ
---① 部分集約処理
---② 部分マージ処理 (1)
---③ ステート・リストア
---④ 部分マージ処理 (2)
---⑤ ステート・セーブ
---⑥ 最終処理

***イベントタイム・ウィンドウ集約処理 [#o8869c41]
-ステートフル集約処理の一種
--[[Spark Streamingでは、到着時刻に基づくウィンドウ集約処理>Spark Streaming#s40a4c2d]]のみ実装されていた。
--Spark Structured Streamingでは、到着時刻ではなく、イベントタイムに基づくウィンドウ集約が可能になった。

-イベントタイムに基づき、
--スライディングし、
--ウィンドウ幅で集約する。

-イベントタイム
--イベントの生起時刻など、
--データ自身の中に埋め込まれている
--レコードに意味付けされた時間。
--(自動付与することも可能)

***ウォーターマーク [#k73d94df]
[[イベントタイム・ウィンドウ集約処理>#o8869c41]]で用いられる。
-遅れてきたデータをハンドリングする仕組み。
-ユーザが遅れたデータの閾値を指定することができる。

***入力と出力 [#cf63f37d]

-入力ソース

--耐障害性有り
---ファイルソース(CSV, JSON, ORC, Parquet)
 val userSchema = new StructType().add("name", "string").add("age", "integer")
 val csvDF = spark
   .readStream
   .option("sep", ";")
   .schema(userSchema) // Specify schema of the csv files
   .csv("/path/to/directory") // Equivalent to 
 format("csv").load("/path/to/directory")

---Kafka ソース
 val df = spark
   .readStream
   .format("kafka")
   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
   .option("subscribe", "topic1")
   .load()

--耐障害性無し(テスト用)
---ソケット ソース
// Read text from socket
 val socketDF = spark
   .readStream
   .format("socket")
   .option("host", "localhost")
   .option("port", 9999)
   .load()

---ソースのレート

-出力

--出力モード~
出力はクエリの特定の型に適用可能なモードで定義できる

---完全モード~
・更新された結果テーブル全体が外部ストレージに書き込まれる。~
・テーブル全体の書き込みをどう扱うかはストレージのコネクタによる。

---追加モード~
・追加された新しい行だけが外部ストレージに書き込まれる。~
・結果テーブルの既存行が変更されないクエリ上でのみ適用可能。

---更新モード~
Spark 2.1.1から利用可能~
・更新された行だけが外部ストレージに書き込まれる。~
・クエリが集約を含まない場合、追加モードと同じ。

--出力シンク

---ファイル sink~
出力をディレクトリに格納
 writeStream
     .format("parquet") // can be "orc", "json", "csv", etc.
     .option("path", "path/to/destination/dir")
     .start()

---Kafka sink
Kafka内の1つ以上のトピックに出力~
 writeStream
     .format("kafka")
     .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
     .option("topic", "updates")
     .start()

---Foreach sink~
出力内のレコードの任意の計算を実行
 writeStream
     .foreach(...)
     .start()

---コンソール sink (デバッグのため)~
トリガーの度に出力。Append と Complete の出力モードをサポート。
 writeStream
     .format("console")
     .start()

---メモリ sink (デバッグのため)~
≒ コンソール sink
 writeStream
     .format("memory")
     .queryName("tableName")
     .start()

--トリガー~
バッチの実行≒出力タイミング?
---無指定 (デフォルト)
---固定間隔のマイクロ バッチ
---1回だけのマイクロバッチ
---固定のチェックポイント間隔の連続

**その他 [#d42363d4]

***join オペレーション [#o7ea3016]
主に、エンリッチメントを行う目的か。

-Stream-static Joins~
Spark 2.0でサポート。
-Stream-stream Joins~
Spark 2.3でサポート。

***サポートされないオペレーション [#hca23a07]
(現段階で)

-クエリ的な
--多段ストリーミング集約
--ストリーミング データセット上の
---LIMIT句
---TOP句的な(df.show(n))
---distinct操作
---Sort操作(完全モード以外)

-アクション的な

--count()~
df.groupBy().count()は可

--foreach()~
df.writeStream.foreach(...)は可

--show()~
writeStreamのconsole sinkは可

**サンプル [#rfbc1b5f]
***[[Pythonの例>PySpark#x623bccd]] [#t85dc021]
***[[.NETの例>Spark SQL#kfcf74cc]] [#h4838163]

*参考 [#q9b1cd94]
-Structured Streaming - The Internal -~
https://www.slideshare.net/hadoopxnttdata/structured-streaming-the-internal

-Spark Structured Streaming で遅延データを処理する方法 - MicroAd Developers Blog~
https://developers.microad.co.jp/entry/2019/07/12/063000

**[[Azure Stream Analytics>https://techinfoofmicrosofttech.osscons.jp/index.php?Azure%20Stream%20Analytics]] [#ab60ff65]

**Programming Guide [#u81234f4]
-Spark 2.2.0 Documentation~
https://spark.apache.org/docs/2.2.0/structured-streaming-programming-guide.html

-Spark 3.0.0 Documentation~
https://spark.apache.org/docs/3.0.0/structured-streaming-programming-guide.html

-Spark latest Documentation~
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

-日本語訳 3.0.0~
http://mogile.web.fc2.com/spark/structured-streaming-programming-guide.html

**Microsoft Docs [#k2c5860b]
Azure HDInsight資産が多い感。

-Azure HDInsight

--Azure HDInsight での Spark Structured Streaming~
https://docs.microsoft.com/ja-jp/azure/hdinsight/spark/apache-spark-structured-streaming-overview

--チュートリアル:Apache Spark ストリーミング & Apache Kafka~
https://docs.microsoft.com/ja-jp/azure/hdinsight/hdinsight-apache-kafka-spark-structured-streaming

--Cosmos DB を使用した Apache Spark と Apache Kafka~
https://docs.microsoft.com/ja-jp/azure/hdinsight/apache-kafka-spark-structured-streaming-cosmosdb

--Hive Warehouse Connector
---Azure HDInsight の Hive Warehouse Connector でサポートされる Apache Spark の操作~
https://docs.microsoft.com/ja-jp/azure/hdinsight/interactive-query/apache-hive-warehouse-connector-operations
---Azure HDInsight で Hive Warehouse Connector を使用して Apache Spark と Apache Hive を統合する~
https://docs.microsoft.com/ja-jp/azure/hdinsight/interactive-query/apache-hive-warehouse-connector

**Qiita [#g5486af2]
-DatabricksでSpark Structured Streamingをやってみる~
https://qiita.com/yabooun/items/f7f649a457fdbd2952b2
-KafkaからのデータをStructured Streamingで処理してElasticsearchに流す~
https://qiita.com/whata/items/4c9c15164e0bd87bc5cb

トップ   編集 差分 バックアップ 添付 複製 名前変更 リロード   新規 一覧 単語検索 最終更新   ヘルプ   最終更新のRSS