Spark Structured Streaming
をテンプレートにして作成
[
トップ
] [
新規
|
一覧
|
単語検索
|
最終更新
|
ヘルプ
]
開始行:
「[[.NET 開発基盤部会 Wiki>http://dotnetdevelopmentinfras...
-[[戻る>Spark Streaming]]
*目次 [#vac9e68a]
#contents
*概要 [#haceebed]
構造化ストリーミングとも呼ばれる、集めない[[ビッグデータ]...
-[[Spark SQL]]エンジン(DataframeとDatasetのAPI)に基づい...
-データストリームを連続して追加される無限の入力テーブルと...
-これによりストリーミングを~
バッチと同じ方法([[Spark SQLとDataFrame API>Spark SQL]]...
記述できる(ただし、少数のサポートされない操作がある)。
*詳細 [#e7eca325]
**特徴 [#ne7e7ab0]
[[Spark Streaming]]と比較した優位性。
***よりリアルなストリーミング [#e9d2458b]
-トリガーで受信したデータは、継続的に流れているデータ・ス...
-データ・ストリームを深さが無限のテーブルとして表すため、
--不完全な集計結果を長時間維持でき、
--遅延したデータを再処理することが可能となり、
--その都度結果を更新できる。
***RDD(DStream) vs. DataFrames/DataSet [#z6c2224d]
-API
--[[Spark Streaming]]は内部でRDDを使用しているDStream API...
--Spark Structured StreamingはDataFrameやDataset APIを使...
-DataFramesの方が処理が最適化されており、~
様々な関数が用意されているため、集約などの処理の選択肢が...
***イベント・タイムによる遅延データへの対応 [#gfc38642]
受信したデータにイベントのタイムスタンプが含まれている場...
イベントタイムを基準にしてデータを処理する機能がある。
***その他 [#mbf2f400]
-端から端まで確実に一回のセマンティクスを保証。
-Spark 2.4以降、入出力が柔軟になった。
**処理 [#pf1d08a3]
***ステートレス・ステートフル [#m4e096a0]
-ステートレスな処理
--属性の選択(プロジェクション)やフィルタなど
--他のレコードと 独立に処理可能なもの
--マイクロバッチごとに、新規に到着した~
レコードで構成された DataFrameを処理する
-ステートフルな処理
--集約処理や、イベント発生回数のカウントなど
--新規に到着したレコードに加えて、~
直前までのマイクロバッチにより更新された~
ステートを加味してDataFrameを処理する。
--6つのフェーズ
---① 部分集約処理
---② 部分マージ処理 (1)
---③ ステート・リストア
---④ 部分マージ処理 (2)
---⑤ ステート・セーブ
---⑥ 最終処理
***イベントタイム・ウィンドウ集約処理 [#o8869c41]
-ステートフル集約処理の一種
--[[Spark Streamingでは、到着時刻に基づくウィンドウ集約処...
--Spark Structured Streamingでは、到着時刻ではなく、イベ...
-イベントタイムに基づき、
--スライディングし、
--ウィンドウ幅で集約する。
-イベントタイム
--イベントの生起時刻など、
--データ自身の中に埋め込まれている
--レコードに意味付けされた時間。
--(自動付与することも可能)
***ウォーターマーク [#k73d94df]
[[イベントタイム・ウィンドウ集約処理>#o8869c41]]で用いら...
-遅れてきたデータをハンドリングする仕組み。
-ユーザが遅れたデータの閾値を指定することができる。
***入力と出力 [#cf63f37d]
-入力ソース
--耐障害性有り
---ファイルソース(CSV, JSON, ORC, Parquet)
val userSchema = new StructType().add("name", "string")....
val csvDF = spark
.readStream
.option("sep", ";")
.schema(userSchema) // Specify schema of the csv files
.csv("/path/to/directory") // Equivalent to
format("csv").load("/path/to/directory")
---Kafka ソース
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:...
.option("subscribe", "topic1")
.load()
--耐障害性無し(テスト用)
---ソケット ソース
// Read text from socket
val socketDF = spark
.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
---ソースのレート
-出力
--出力モード~
出力はクエリの特定の型に適用可能なモードで定義できる
---完全モード~
・更新された結果テーブル全体が外部ストレージに書き込まれ...
・テーブル全体の書き込みをどう扱うかはストレージのコネク...
---追加モード~
・追加された新しい行だけが外部ストレージに書き込まれる。~
・結果テーブルの既存行が変更されないクエリ上でのみ適用可...
---更新モード~
Spark 2.1.1から利用可能~
・更新された行だけが外部ストレージに書き込まれる。~
・クエリが集約を含まない場合、追加モードと同じ。
--出力シンク
---ファイル sink~
出力をディレクトリに格納
writeStream
.format("parquet") // can be "orc", "json", "csv", e...
.option("path", "path/to/destination/dir")
.start()
---Kafka sink
Kafka内の1つ以上のトピックに出力~
writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host...
.option("topic", "updates")
.start()
---Foreach sink~
出力内のレコードの任意の計算を実行
writeStream
.foreach(...)
.start()
---コンソール sink (デバッグのため)~
トリガーの度に出力。Append と Complete の出力モードをサポ...
writeStream
.format("console")
.start()
---メモリ sink (デバッグのため)~
≒ コンソール sink
writeStream
.format("memory")
.queryName("tableName")
.start()
--トリガー~
バッチの実行≒出力タイミング?
---無指定 (デフォルト)
---固定間隔のマイクロ バッチ
---1回だけのマイクロバッチ
---固定のチェックポイント間隔の連続
**その他 [#d42363d4]
***join オペレーション [#o7ea3016]
主に、エンリッチメントを行う目的か。
-Stream-static Joins~
Spark 2.0でサポート。
-Stream-stream Joins~
Spark 2.3でサポート。
***サポートされないオペレーション [#hca23a07]
(現段階で)
-クエリ的な
--多段ストリーミング集約
--ストリーミング データセット上の
---LIMIT句
---TOP句的な(df.show(n))
---distinct操作
---Sort操作(完全モード以外)
-アクション的な
--count()~
df.groupBy().count()は可
--foreach()~
df.writeStream.foreach(...)は可
--show()~
writeStreamのconsole sinkは可
**サンプル [#rfbc1b5f]
***[[Pythonの例>PySpark#x623bccd]] [#t85dc021]
***[[.NETの例>Spark SQL#kfcf74cc]] [#h4838163]
*参考 [#q9b1cd94]
-Structured Streaming - The Internal -~
https://www.slideshare.net/hadoopxnttdata/structured-stre...
-Spark Structured Streaming で遅延データを処理する方法 - ...
https://developers.microad.co.jp/entry/2019/07/12/063000
**[[Azure Stream Analytics>https://techinfoofmicrosofttec...
**Programming Guide [#u81234f4]
-Spark 2.2.0 Documentation~
https://spark.apache.org/docs/2.2.0/structured-streaming-...
-Spark 3.0.0 Documentation~
https://spark.apache.org/docs/3.0.0/structured-streaming-...
-Spark latest Documentation~
https://spark.apache.org/docs/latest/structured-streaming...
-日本語訳 3.0.0~
http://mogile.web.fc2.com/spark/structured-streaming-prog...
**Microsoft Docs [#k2c5860b]
Azure HDInsight資産が多い感。
-Azure HDInsight
--Azure HDInsight での Spark Structured Streaming~
https://docs.microsoft.com/ja-jp/azure/hdinsight/spark/ap...
--チュートリアル:Apache Spark ストリーミング & Apache Kaf...
https://docs.microsoft.com/ja-jp/azure/hdinsight/hdinsigh...
--Cosmos DB を使用した Apache Spark と Apache Kafka~
https://docs.microsoft.com/ja-jp/azure/hdinsight/apache-k...
--Hive Warehouse Connector
---Azure HDInsight の Hive Warehouse Connector でサポート...
https://docs.microsoft.com/ja-jp/azure/hdinsight/interact...
---Azure HDInsight で Hive Warehouse Connector を使用して...
https://docs.microsoft.com/ja-jp/azure/hdinsight/interact...
**Qiita [#g5486af2]
-DatabricksでSpark Structured Streamingをやってみる~
https://qiita.com/yabooun/items/f7f649a457fdbd2952b2
-KafkaからのデータをStructured Streamingで処理してElastic...
https://qiita.com/whata/items/4c9c15164e0bd87bc5cb
終了行:
「[[.NET 開発基盤部会 Wiki>http://dotnetdevelopmentinfras...
-[[戻る>Spark Streaming]]
*目次 [#vac9e68a]
#contents
*概要 [#haceebed]
構造化ストリーミングとも呼ばれる、集めない[[ビッグデータ]...
-[[Spark SQL]]エンジン(DataframeとDatasetのAPI)に基づい...
-データストリームを連続して追加される無限の入力テーブルと...
-これによりストリーミングを~
バッチと同じ方法([[Spark SQLとDataFrame API>Spark SQL]]...
記述できる(ただし、少数のサポートされない操作がある)。
*詳細 [#e7eca325]
**特徴 [#ne7e7ab0]
[[Spark Streaming]]と比較した優位性。
***よりリアルなストリーミング [#e9d2458b]
-トリガーで受信したデータは、継続的に流れているデータ・ス...
-データ・ストリームを深さが無限のテーブルとして表すため、
--不完全な集計結果を長時間維持でき、
--遅延したデータを再処理することが可能となり、
--その都度結果を更新できる。
***RDD(DStream) vs. DataFrames/DataSet [#z6c2224d]
-API
--[[Spark Streaming]]は内部でRDDを使用しているDStream API...
--Spark Structured StreamingはDataFrameやDataset APIを使...
-DataFramesの方が処理が最適化されており、~
様々な関数が用意されているため、集約などの処理の選択肢が...
***イベント・タイムによる遅延データへの対応 [#gfc38642]
受信したデータにイベントのタイムスタンプが含まれている場...
イベントタイムを基準にしてデータを処理する機能がある。
***その他 [#mbf2f400]
-端から端まで確実に一回のセマンティクスを保証。
-Spark 2.4以降、入出力が柔軟になった。
**処理 [#pf1d08a3]
***ステートレス・ステートフル [#m4e096a0]
-ステートレスな処理
--属性の選択(プロジェクション)やフィルタなど
--他のレコードと 独立に処理可能なもの
--マイクロバッチごとに、新規に到着した~
レコードで構成された DataFrameを処理する
-ステートフルな処理
--集約処理や、イベント発生回数のカウントなど
--新規に到着したレコードに加えて、~
直前までのマイクロバッチにより更新された~
ステートを加味してDataFrameを処理する。
--6つのフェーズ
---① 部分集約処理
---② 部分マージ処理 (1)
---③ ステート・リストア
---④ 部分マージ処理 (2)
---⑤ ステート・セーブ
---⑥ 最終処理
***イベントタイム・ウィンドウ集約処理 [#o8869c41]
-ステートフル集約処理の一種
--[[Spark Streamingでは、到着時刻に基づくウィンドウ集約処...
--Spark Structured Streamingでは、到着時刻ではなく、イベ...
-イベントタイムに基づき、
--スライディングし、
--ウィンドウ幅で集約する。
-イベントタイム
--イベントの生起時刻など、
--データ自身の中に埋め込まれている
--レコードに意味付けされた時間。
--(自動付与することも可能)
***ウォーターマーク [#k73d94df]
[[イベントタイム・ウィンドウ集約処理>#o8869c41]]で用いら...
-遅れてきたデータをハンドリングする仕組み。
-ユーザが遅れたデータの閾値を指定することができる。
***入力と出力 [#cf63f37d]
-入力ソース
--耐障害性有り
---ファイルソース(CSV, JSON, ORC, Parquet)
val userSchema = new StructType().add("name", "string")....
val csvDF = spark
.readStream
.option("sep", ";")
.schema(userSchema) // Specify schema of the csv files
.csv("/path/to/directory") // Equivalent to
format("csv").load("/path/to/directory")
---Kafka ソース
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:...
.option("subscribe", "topic1")
.load()
--耐障害性無し(テスト用)
---ソケット ソース
// Read text from socket
val socketDF = spark
.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
---ソースのレート
-出力
--出力モード~
出力はクエリの特定の型に適用可能なモードで定義できる
---完全モード~
・更新された結果テーブル全体が外部ストレージに書き込まれ...
・テーブル全体の書き込みをどう扱うかはストレージのコネク...
---追加モード~
・追加された新しい行だけが外部ストレージに書き込まれる。~
・結果テーブルの既存行が変更されないクエリ上でのみ適用可...
---更新モード~
Spark 2.1.1から利用可能~
・更新された行だけが外部ストレージに書き込まれる。~
・クエリが集約を含まない場合、追加モードと同じ。
--出力シンク
---ファイル sink~
出力をディレクトリに格納
writeStream
.format("parquet") // can be "orc", "json", "csv", e...
.option("path", "path/to/destination/dir")
.start()
---Kafka sink
Kafka内の1つ以上のトピックに出力~
writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host...
.option("topic", "updates")
.start()
---Foreach sink~
出力内のレコードの任意の計算を実行
writeStream
.foreach(...)
.start()
---コンソール sink (デバッグのため)~
トリガーの度に出力。Append と Complete の出力モードをサポ...
writeStream
.format("console")
.start()
---メモリ sink (デバッグのため)~
≒ コンソール sink
writeStream
.format("memory")
.queryName("tableName")
.start()
--トリガー~
バッチの実行≒出力タイミング?
---無指定 (デフォルト)
---固定間隔のマイクロ バッチ
---1回だけのマイクロバッチ
---固定のチェックポイント間隔の連続
**その他 [#d42363d4]
***join オペレーション [#o7ea3016]
主に、エンリッチメントを行う目的か。
-Stream-static Joins~
Spark 2.0でサポート。
-Stream-stream Joins~
Spark 2.3でサポート。
***サポートされないオペレーション [#hca23a07]
(現段階で)
-クエリ的な
--多段ストリーミング集約
--ストリーミング データセット上の
---LIMIT句
---TOP句的な(df.show(n))
---distinct操作
---Sort操作(完全モード以外)
-アクション的な
--count()~
df.groupBy().count()は可
--foreach()~
df.writeStream.foreach(...)は可
--show()~
writeStreamのconsole sinkは可
**サンプル [#rfbc1b5f]
***[[Pythonの例>PySpark#x623bccd]] [#t85dc021]
***[[.NETの例>Spark SQL#kfcf74cc]] [#h4838163]
*参考 [#q9b1cd94]
-Structured Streaming - The Internal -~
https://www.slideshare.net/hadoopxnttdata/structured-stre...
-Spark Structured Streaming で遅延データを処理する方法 - ...
https://developers.microad.co.jp/entry/2019/07/12/063000
**[[Azure Stream Analytics>https://techinfoofmicrosofttec...
**Programming Guide [#u81234f4]
-Spark 2.2.0 Documentation~
https://spark.apache.org/docs/2.2.0/structured-streaming-...
-Spark 3.0.0 Documentation~
https://spark.apache.org/docs/3.0.0/structured-streaming-...
-Spark latest Documentation~
https://spark.apache.org/docs/latest/structured-streaming...
-日本語訳 3.0.0~
http://mogile.web.fc2.com/spark/structured-streaming-prog...
**Microsoft Docs [#k2c5860b]
Azure HDInsight資産が多い感。
-Azure HDInsight
--Azure HDInsight での Spark Structured Streaming~
https://docs.microsoft.com/ja-jp/azure/hdinsight/spark/ap...
--チュートリアル:Apache Spark ストリーミング & Apache Kaf...
https://docs.microsoft.com/ja-jp/azure/hdinsight/hdinsigh...
--Cosmos DB を使用した Apache Spark と Apache Kafka~
https://docs.microsoft.com/ja-jp/azure/hdinsight/apache-k...
--Hive Warehouse Connector
---Azure HDInsight の Hive Warehouse Connector でサポート...
https://docs.microsoft.com/ja-jp/azure/hdinsight/interact...
---Azure HDInsight で Hive Warehouse Connector を使用して...
https://docs.microsoft.com/ja-jp/azure/hdinsight/interact...
**Qiita [#g5486af2]
-DatabricksでSpark Structured Streamingをやってみる~
https://qiita.com/yabooun/items/f7f649a457fdbd2952b2
-KafkaからのデータをStructured Streamingで処理してElastic...
https://qiita.com/whata/items/4c9c15164e0bd87bc5cb
ページ名: