.NET 開発基盤部会 Wiki」は、「Open棟梁Project」,「OSSコンソーシアム .NET開発基盤部会」によって運営されています。

目次

概要

構造化ストリーミングとも呼ばれる。

  • Spark SQLエンジン(DataframeとDatasetのAPI)に基づいて構築
  • データストリームを連続して追加される無限の入力テーブルとして扱う。
  • これによりストリーミングを
    バッチと同じ方法(Spark SQLとDataFrame API)で
    記述できる(ただし、少数のサポートされない操作がある)。

詳細

特徴

Spark Streamingと比較した優位性。

よりリアルなストリーミング

  • トリガーで受信したデータは、継続的に流れているデータ・ストリームに追加される。
  • データ・ストリームを深さが無限のテーブルとして表すため、
    • 不完全な集計結果を長時間維持でき、
    • 遅延したデータを再処理することが可能となり、
    • その都度結果を更新できる。

RDD(DStream) vs. DataFrames?/DataSet?

  • API
    • Spark Streamingは内部でRDDを使用しているDStream APIで動作、
    • Spark Structured StreamingはDataFrame?やDataset APIを使用している。
  • DataFrames?の方が処理が最適化されており、
    様々な関数が用意されているため、集約などの処理の選択肢が多い。

イベント・タイムによる遅延データへの対応

受信したデータにイベントのタイムスタンプが含まれている場合、
イベントタイムを基準にしてデータを処理する機能がある。

その他

  • 端から端まで確実に一回のセマンティクスを保証。
  • Spark 2.4以降、入出力が柔軟になった。

処理

ステートレス・ステートフル

  • ステートレスな処理
    • 属性の選択(プロジェクション)やフィルタなど
    • 他のレコードと 独立に処理可能なもの
    • マイクロバッチごとに、新規に到着した
      レコードで構成された DataFrame?を処理する
  • ステートフルな処理
  • 集約処理や、イベント発生回数のカウントなど
  • 新規に到着したレコードに加えて、
    直前までのマイクロバッチにより更新された
    ステートを加味してDataFrame?を処理する。
  • 6つのフェーズ
    • ① 部分集約処理
    • ② 部分マージ処理 (1)
    • ③ ステート・リストア
    • ④ 部分マージ処理 (2)
    • ⑤ ステート・セーブ
    • ⑥ 最終処理

イベントタイム・ウィンドウ集約処理

  • イベントタイムに基づき、
    • スライディングし、
    • ウィンドウ幅で集約する。
  • イベントタイム
    • イベントの生起時刻など、
    • データ自身の中に埋め込まれている
    • レコードに意味付けされた時間。
    • (自動付与することも可能)

ウォーターマーク

イベントタイム・ウィンドウ集約処理で用いられる。

  • 遅れてきたデータをハンドリングする仕組み。
  • ユーザが遅れたデータの閾値を指定することができる。

入力と出力

  • 入力ソース
  • 耐障害性有り
    • ファイルソース(CSV, JSON, ORC, Parquet)
      val userSchema = new StructType().add("name", "string").add("age", "integer")
      val csvDF = spark
        .readStream
        .option("sep", ";")
        .schema(userSchema) // Specify schema of the csv files
        .csv("/path/to/directory") // Equivalent to 
      format("csv").load("/path/to/directory")
  • Kafka ソース
    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("subscribe", "topic1")
      .load()
  • 耐障害性無し(テスト用)
    • ソケット ソース
      val socketDF = spark
        .readStream
        .format("socket")
        .option("host", "localhost")
        .option("port", 9999)
        .load()
  • ソースのレート
  • 出力
  • 出力モード
    出力はクエリの特定の型に適用可能なモードで定義できる
  • 完全モード
    ・更新された結果テーブル全体が外部ストレージに書き込まれる。
    ・テーブル全体の書き込みをどう扱うかはストレージのコネクタによる。
  • 追加モード
    ・追加された新しい行だけが外部ストレージに書き込まれる。
    ・結果テーブルの既存行が変更されないクエリ上でのみ適用可能。
  • 更新モード
    Spark 2.1.1から利用可能
    ・更新された行だけが外部ストレージに書き込まれる。
    ・クエリが集約を含まない場合、追加モードと同じ。
  • 出力シンク
  • ファイル sink
    出力をディレクトリに格納
    writeStream
        .format("parquet") // can be "orc", "json", "csv", etc.
        .option("path", "path/to/destination/dir")
        .start()
  • Kafka sink Kafka内の1つ以上のトピックに出力
    writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
        .option("topic", "updates")
        .start()
  • Foreach sink
    出力内のレコードの任意の計算を実行
    writeStream
        .foreach(...)
        .start()
  • コンソール sink (デバッグのため)
    トリガーの度に出力。Append と Complete の出力モードをサポート。
    writeStream
        .format("console")
        .start()
  • メモリ sink (デバッグのため)
    ≒ コンソール sink
    writeStream
        .format("memory")
        .queryName("tableName")
        .start()
  • トリガー
    バッチの実行≒出力タイミング?
    • 無指定 (デフォルト)
    • 固定間隔のマイクロ バッチ
    • 1回だけのマイクロバッチ
    • 固定のチェックポイント間隔の連続

その他

join オペレーション

主に、エンリッチメントを行う目的か。

  • Stream-static Joins
    Spark 2.0でサポート。
  • Stream-stream Joins
    Spark 2.3でサポート。

サポートされないオペレーション

(現段階で)

  • クエリ的な
    • 多段ストリーミング集約
    • ストリーミング データセット上の
      • LIMIT句
      • TOP句的な(df.show(n))
      • distinct操作
      • Sort操作(完全モード以外)
  • アクション的な
  • count()
    df.groupBy().count()は可
  • foreach()
    df.writeStream.foreach(...)は可
  • show()
    writeStreamのconsole sinkは可

サンプル

Pythonの例

.NETの例

参考

Azure Stream Analytics

Programming Guide

Microsoft Docs

Azure HDInsight資産が多い感。

  • Azure HDInsight

Qiita


トップ   編集 凍結 差分 バックアップ 添付 複製 名前変更 リロード   新規 一覧 単語検索 最終更新   ヘルプ   最終更新のRSS
Last-modified: 2021-10-15 (金) 21:55:24 (11h)