.NET 開発基盤部会 Wiki」は、「Open棟梁Project」,「OSSコンソーシアム .NET開発基盤部会」によって運営されています。

目次

概要

  • SQLで記述できる構造化データの処理の分散処理フレームワーク
  • Spark SQLとDataFrame? APIは、DataFrame?で同じことができる。

詳細

機能

提供

ドメイン固有言語 (DSL) > SQL言語

  • RDDの処理をクエリ・エンジン経由で実行プラン生成して処理。
    • 基本的にはDataFrameというオンメモリのテーブルに対し、LINQ的に処理を行うことが出来る。
    • 強く型付けされたデータセットも完全にサポート(DataFrameは型無しAPI)
  • クエリ オプティマイザ
  • Catalyst Optimizer
  • サポート
    • ルールベース
    • コストベース
  • コストベースの 4 フェーズ
    • 分析
    • ロジックの最適化
    • 物理的プランニング
    • コード生成
  • 遅延評価
    • 実行計画は有向グラフ(DAG)として表現される。
    • エグゼキュータでDAGが生成され、
    • DAGを用いた遅延評価で、一回、処理が解析される。
    • RDDを消費するAPIが呼ばれるとジョブが作られる。
    • ドライバはエグゼキュータにジョブを転送する。

DataFrame?と言う構造化データ

  • テーブルのようなデータ構造をもった分散処理用データセット
  • 同名のRのデータ構造をモデルとした
    表形式データ(型と名前を持った列のテーブル)
  • 作業性のサポート向上を目的としている。
  • 今後は、RDDではなくDataFrame?が主流になって行く。

構文

LINQ

「クエリ構文」的記法

  • SparkSession?のsqlメソッドで入力元にクエリを発行し、結果をDataFrame?で返す。
  • 若しくは、DataFrame?にテーブル名を指定してSQL中で使用することでDataFrame?を操作する。

「メソッド構文」的記法

コチラは、DataFrame?操作のみの場合。

  • 正確にはDataFrame? APIと呼ぶらしい。
  • DataFrame?をFluent API的なメソッド構文で操作する。

操作

基本

列の分割

...

集計

...

.NET for Apache Sparkのチュートリアル

Get started in 10 minutes

  • データ
    Hello World
    This .NET app uses .NET for Apache Spark
    This .NET app counts words with Apache Spark
  • コード
    // Create Spark session
    SparkSession spark =
      SparkSession
        .Builder()
        .AppName("word_count_sample")
        .GetOrCreate();
    
    // Create initial DataFrame
    string filePath = args[0];
    DataFrame dataFrame = spark.Read().Text(filePath);
    
    // Count words
    DataFrame words =
      dataFrame
        .Select(Split(Col("value")," ").Alias("words"))
        .Select(Explode(Col("words")).Alias("word"))
        .GroupBy("word")
        .Count()
        .OrderBy(Col("count").Desc());
    
    // Display results
    words.Show();
    
    // Stop Spark session
    spark.Stop();
  • 説明
  • value列(既定の列名)を、半角スペースで分割して配列化、wordと言うaliasの列名を設定する。
    Select(Split(Col("value"), " ")).Alias("words"))
  • Explodeで、前述の結果セットのwords列セル中の配列を行に展開し、
    .Select(Explode(Col("words"))
  • 前述の結果セットをword列で行数を集計し、
    集計列(列名は"count"になる)をDescでOrderBy?する。
    .GroupBy("word").Count().OrderBy(Col("count").Desc())

バッチ処理

  • コード
    // Create Spark session
    SparkSession spark = SparkSession
      .Builder()
      .AppName("GitHub and Spark Batch")
      .GetOrCreate();
    
    // Create initial DataFrame
    DataFrame projectsDf = spark
      .Read()
      .Schema("id INT, url STRING, owner_id INT, " +
      "name STRING, descriptor STRING, language STRING, " +
      "created_at STRING, forked_from INT, deleted STRING, " +
      "updated_at STRING")
      .Csv(filePath);
    
    // Display results
    projectsDf.Show();
    
    // Drop any rows with NA values
    DataFrameNaFunctions dropEmptyProjects = projectsDf.Na();
    DataFrame cleanedProjects = dropEmptyProjects.Drop("any");
    
    // Remove unnecessary columns
    cleanedProjects = cleanedProjects.Drop("id", "url", "owner_id");
    
    // Display results
    cleanedProjects.Show();
    
    // Average number of times each language has been forked
    DataFrame groupedDF = cleanedProjects
      .GroupBy("language")
      .Agg(Avg(cleanedProjects["forked_from"]));
    
    // Sort by most forked languages first & Display results
    groupedDF.OrderBy(Desc("avg(forked_from)")).Show();
    
    // Defines a UDF that determines if a date is greater than a specified date
    spark.Udf().Register<string, bool>(
      "MyUDF",
      (date) => DateTime.TryParse(date, out DateTime convertedDate) &&
        (convertedDate > referenceDate));
    
    // Use UDF to add columns to the generated TempView
    cleanedProjects.CreateOrReplaceTempView("dateView");
    DataFrame dateDf = spark.Sql(
      "SELECT *, MyUDF(dateView.updated_at) AS datebefore FROM dateView");
    
    // Display results
    dateDf.Show();
    
    // Stop Spark session
    spark.Stop();
  • 説明
    • 初めに、
      • 任意のNULLまたはNaN値を含む行を削除
      • 不要な列の削除("id", "url", "owner_id")
  • 次に、各言語がフォークされた回数の平均値を求める。
  • 言語毎に集計
    .GroupBy("language")
  • 集計方法はforked_from列の平均値
    .Agg(Avg(cleanedProjects["forked_from"]));
  • 最後に集計列(列名は"avg(forked_from)"になる)をDescでOrderBy?する。
  • 最後に、datebefore列(updated_atが指定の日付より大きいか?)を追加。
    • 当該処理を行うユーザー定義関数(UDF)を登録
    • TempView?を生成してクエリ構文を使用してUDFを実行

構造化ストリーミング

  • データ
    ncからレスポンスされる文字列。
  • コード
    // Create Spark session
    SparkSession spark = SparkSession
      .Builder()
      .AppName("Streaming example with a UDF")
      .GetOrCreate();
    
    // Create initial DataFrame
    DataFrame lines = spark
      .ReadStream()
      .Format("socket")
      .Option("host", hostname)
      .Option("port", port)
      .Load();
    
    // UDF to produce an array
    // Array includes:
    // 1) original string
    // 2) original string + length of original string
    Func<Column, Column> udfArray =
      Udf<string, string[]>((str) => new string[] { str, $"{str} {str.Length}" });
    
    // Explode array to rows
    DataFrame arrayDF = lines.Select(Explode(udfArray(lines["value"])));
    
    // Process and display each incoming line
    StreamingQuery query = arrayDF
      .WriteStream()
      .Format("console")
      .Start();
    
    query.AwaitTermination();
    WriteStream?でStartして取得したQueryにAwaitTermination?して終了。
  • 説明
    • 来た文字列を、{文字列, 文字列長}の
      文字列配列にするユーザー定義関数(UDF)を登録
    • メソッド構文を使用してUDFを実行
    • UDFの返す配列をExplodeで行に展開して表示。
    • 構造化ストリーミングはループを書かずして、
      ストリーミング処理をループ的に処理することができる。

ML.NETでの感情分析

  • 説明
    • テストCSVをDataFrame?に読み込む。
    • 感情分析のユーザー定義関数(UDF)を登録、
    • TempView?を生成してクエリ構文を使用してUDFを実行

参考

DevelopersIO

Hadoop Advent Calendar 2016

Learn | Microsoft Docs

マイクロソフト系技術情報 Wiki


トップ   編集 凍結 差分 バックアップ 添付 複製 名前変更 リロード   新規 一覧 単語検索 最終更新   ヘルプ   最終更新のRSS
Last-modified: 2021-08-23 (月) 17:25:42 (31d)