「[[.NET 開発基盤部会 Wiki>http://dotnetdevelopmentinfrastructure.osscons.jp]]」は、「[[Open棟梁Project>https://github.com/OpenTouryoProject/]]」,「[[OSSコンソーシアム .NET開発基盤部会>https://www.osscons.jp/dotNetDevelopmentInfrastructure/]]」によって運営されています。

-[[戻る>Apache Spark]]

*目次 [#q2337c27]
#contents

*概要 [#pb3e516b]
-SQLで記述できる構造化データの処理の分散処理フレームワーク

-Spark SQLとDataFrame APIは、DataFrameで同じことができる。

*詳細 [#w4dd5ff5]

**機能 [#c67e6928]
-バックエンドで[[Apache Spark]]が動作する、

-[[Spark Core>Apache Spark#t805c63c]]より上位のコンポーネント

-[[Apache Hive]]との関連
--先行する[[Apache Hive]]と互換性を保ちながら発展中
--[[Apache Hive]]で培われた SQLによる分散処理を利用

***ドメイン固有言語 (DSL) > SQL言語 [#k934e69c]
-[[RDD>Apache Spark#f411c5af]]の処理をクエリ・エンジン経由で実行プラン生成して処理。
--基本的には[[DataFrame>#jfb944b3]]というオンメモリのテーブルに対し、[[LINQ的>#nd35024c]]に処理を行うことが出来る。
--強く型付けされたデータセットも完全にサポート([[DataFrame>#jfb944b3]]は型無しAPI)

-サポート
--Scala、Java、Pythonから[[DataFrame>#jfb944b3]]を操作できる。
--CLI、JDBC / ODBCサポートを実装している。
--[[2つのLINQ的な構文(クエリ構文 / メソッド構文)>#nd35024c]]

-クエリ オプティマイザ

--Catalyst Optimizer

--サポート
---ルールベース
---コストベース

--コストベースの 4 フェーズ
---分析
---ロジックの最適化
---物理的プランニング
---コード生成

-[[遅延評価>https://techinfoofmicrosofttech.osscons.jp/index.php?.NET%20for%20Apache%20Spark%E3%83%81%E3%83%A5%E3%83%BC%E3%83%88%E3%83%AA%E3%82%A2%E3%83%AB#y73d04c4]]
--実行計画は有向グラフ(DAG)として表現される。
--エグゼキュータでDAGが生成され、
--DAGを用いた遅延評価で、一回、処理が解析される。
--RDDを消費するAPIが呼ばれるとジョブが作られる。
--ドライバはエグゼキュータにジョブを転送する。

***DataFrameと言う構造化データ [#jfb944b3]
-テーブルのようなデータ構造をもった分散処理用データセット

--同名のRのデータ構造をモデルとした~
表形式データ(型と名前を持った列のテーブル)

--作業性のサポート向上を目的としている。
---フィルタ,グループ,集合演算も提供する。
---また、[[SQLライクにデータを操作>#k934e69c]]できる。

-今後は、[[RDD>Apache Spark#f411c5af]]ではなくDataFrameが主流になって行く。
--レジリエントな[[RDD>Apache Spark#f411c5af]]を上回る抽象化が提供される。
--Catalyst オプティマイザで従来の RDD の 5 ~ 20 倍、性能が向上。
--RDDを上回る抽象化が提供される。
--Catalyst オプティマイザでRDDの5~20倍、性能向上。

**構文 [#nd35024c]
[[LINQ>https://techinfoofmicrosofttech.osscons.jp/index.php?LINQ]]的

***「クエリ構文」的記法 [#c33c5c17]
-SparkSessionのsqlメソッドで入力元にクエリを発行し、結果をDataFrameで返す。
-若しくは、DataFrameにテーブル名を指定してSQL中で使用することでDataFrameを操作する。

***「メソッド構文」的記法 [#h1cb45e4]
コチラは、DataFrame操作のみの場合。

-正確にはDataFrame APIと呼ぶらしい。
-DataFrameをFluent API的なメソッド構文で操作する。

**操作 [#q7d7afae]

***基本 [#y6984111]
[[PySparkの例>PySpark#n212eaf0]]

***ユーザー定義関数(UDF) [#uf99259e]

**集計 [#w7306007]

***基本 [#j9e98b93]
[[PySparkの例>PySpark#n212eaf0]]

***ユーザー定義集計関数(UDAF) [#a15c075a]
Apache Spark 2.3 以降では [[Pandas]] UDF を使うことで UDAF に相当する処理を書くことができるようになっている。

**構造化ストリーミング [#vd23a8a1]
([[Spark Structured Streaming]])

***[[Python>PySpark#x623bccd]] [#l60dfd0c]
***[[.NET>#kfcf74cc]] [#a4ef3457]

*チュートリアル [#wbb02a14]

**[[Scala>Apache Sparkチュートリアル#bb51e63c]] [#scce13f5]

**[[PySpark>PySpark#n212eaf0]] [#jae8f1f5]
***[[ファースト・ステップ的なモノ>PySpark#ffe40eef]] [#cdcdc7b7]
***[[DataFrameに対する様々な操作>PySpark#w64c6c2e]] [#r26351c7]
***[[構造化ストリーミングでウィンドウ操作>PySpark#x623bccd]] [#m34a6c9e]
***[[様々な入力元・出力先の例>PySpark#yd135245]] [#t894f789]

**.NET for Apache Spark [#la5a0981]
***[[Get started in 10 minutes>https://techinfoofmicrosofttech.osscons.jp/index.php?.NET%20for%20Apache%20Spark%E3%83%81%E3%83%A5%E3%83%BC%E3%83%88%E3%83%AA%E3%82%A2%E3%83%AB#s680c185]] [#oaa10345]

-データ
 Hello World
 This .NET app uses .NET for Apache Spark
 This .NET app counts words with Apache Spark

-コード
 // Create Spark session
 SparkSession spark =
   SparkSession
     .Builder()
     .AppName("word_count_sample")
     .GetOrCreate();
 
 // Create initial DataFrame
 string filePath = args[0];
 DataFrame dataFrame = spark.Read().Text(filePath);
 
 // Count words
 DataFrame words =
   dataFrame
     .Select(Split(Col("value")," ").Alias("words"))
     .Select(Explode(Col("words")).Alias("word"))
     .GroupBy("word")
     .Count()
     .OrderBy(Col("count").Desc());
 
 // Display results
 words.Show();
 
 // Stop Spark session
 spark.Stop();

-説明

--value列(既定の列名)を、半角スペースで分割して配列化、wordと言うaliasの列名を設定する。
 Select(Split(Col("value"), " ")).Alias("words"))

--Explodeで、前述の結果セットのwords列セル中の配列を行に展開し、
 .Select(Explode(Col("words"))

--前述の結果セットをword列で行数を集計し、~
集計列(列名は"count"になる)をDescでOrderByする。
 .GroupBy("word").Count().OrderBy(Col("count").Desc())

--参考~
pandas documentation
---pandas.DataFrame.explode~
https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.explode.html

***[[バッチ処理>https://techinfoofmicrosofttech.osscons.jp/index.php?.NET%20for%20Apache%20Spark%E3%83%81%E3%83%A5%E3%83%BC%E3%83%88%E3%83%AA%E3%82%A2%E3%83%AB#l7b4829d]] [#n1c1114b]

-データ~
GitHubのリポジトリデータ?~
https://github.com/dotnet/spark/blob/main/examples/Microsoft.Spark.CSharp.Examples/Sql/Batch/projects_smaller.csv
|id INT|url STRING|owner_id INT|name STRING|descriptor STRING|language STRING|created_at STRING|forked_from INT|deleted STRING|updated_at STRING|

-コード
 // Create Spark session
 SparkSession spark = SparkSession
   .Builder()
   .AppName("GitHub and Spark Batch")
   .GetOrCreate();
 
 // Create initial DataFrame
 DataFrame projectsDf = spark
   .Read()
   .Schema("id INT, url STRING, owner_id INT, " +
   "name STRING, descriptor STRING, language STRING, " +
   "created_at STRING, forked_from INT, deleted STRING, " +
   "updated_at STRING")
   .Csv(filePath);
 
 // Display results
 projectsDf.Show();
 
 // Drop any rows with NA values
 DataFrameNaFunctions dropEmptyProjects = projectsDf.Na();
 DataFrame cleanedProjects = dropEmptyProjects.Drop("any");
 
 // Remove unnecessary columns
 cleanedProjects = cleanedProjects.Drop("id", "url", "owner_id");
 
 // Display results
 cleanedProjects.Show();
 
 // Average number of times each language has been forked
 DataFrame groupedDF = cleanedProjects
   .GroupBy("language")
   .Agg(Avg(cleanedProjects["forked_from"]));
 
 // Sort by most forked languages first & Display results
 groupedDF.OrderBy(Desc("avg(forked_from)")).Show();
 
 // Defines a UDF that determines if a date is greater than a specified date
 spark.Udf().Register<string, bool>(
   "MyUDF",
   (date) => DateTime.TryParse(date, out DateTime convertedDate) &&
     (convertedDate > referenceDate));
 
 // Use UDF to add columns to the generated TempView
 cleanedProjects.CreateOrReplaceTempView("dateView");
 DataFrame dateDf = spark.Sql(
   "SELECT *, MyUDF(dateView.updated_at) AS datebefore FROM dateView");
 
 // Display results
 dateDf.Show();
 
 // Stop Spark session
 spark.Stop();

-説明
--初めに、
---任意のNULLまたはNaN値を含む行を削除
---不要な列の削除("id", "url", "owner_id")

--次に、各言語がフォークされた回数の平均値を求める。

---言語毎に集計
 .GroupBy("language")

---集計方法はforked_from列の平均値
 .Agg(Avg(cleanedProjects["forked_from"]));

---最後に集計列(列名は"avg(forked_from)"になる)をDescでOrderByする。

--最後に、datebefore列(updated_atが指定の日付より大きいか?)を追加。
---当該処理を行う[[ユーザー定義関数(UDF)>#uf99259e]]を登録
---TempViewを生成して[[クエリ構文>#c33c5c17]]を使用してUDFを実行

***[[構造化ストリーミング>https://techinfoofmicrosofttech.osscons.jp/index.php?.NET%20for%20Apache%20Spark%E3%83%81%E3%83%A5%E3%83%BC%E3%83%88%E3%83%AA%E3%82%A2%E3%83%AB#v2288c2e]] [#kfcf74cc]

-データ~
ncからレスポンスされる文字列。

-コード
 // Create Spark session
 SparkSession spark = SparkSession
   .Builder()
   .AppName("Streaming example with a UDF")
   .GetOrCreate();
 
 // Create initial DataFrame
 DataFrame lines = spark
   .ReadStream()
   .Format("socket")
   .Option("host", hostname)
   .Option("port", port)
   .Load();
 
 // UDF to produce an array
 // Array includes:
 // 1) original string
 // 2) original string + length of original string
 Func<Column, Column> udfArray =
   Udf<string, string[]>((str) => new string[] { str, $"{str} {str.Length}" });
 
 // Explode array to rows
 DataFrame arrayDF = lines.Select(Explode(udfArray(lines["value"])));
 
 // Process and display each incoming line
 StreamingQuery query = arrayDF
   .WriteStream()
   .Format("console")
   .Start();
 
 query.AwaitTermination();
※ WriteStreamでStartして取得したQueryにAwaitTerminationして終了。

-説明~
--来た文字列を、{文字列, 文字列長}の~
文字列配列にするユーザー定義関数(UDF)を登録
--[[メソッド構文>#h1cb45e4]]を使用してUDFを実行
--UDFの返す配列をExplodeで行に展開して表示。
--構造化ストリーミングはループを書かずして、~
ストリーミング処理をループ的に処理することができる。

***[[ML.NETでの感情分析>https://techinfoofmicrosofttech.osscons.jp/index.php?.NET%20for%20Apache%20Spark%E3%83%81%E3%83%A5%E3%83%BC%E3%83%88%E3%83%AA%E3%82%A2%E3%83%AB#bb5afcf2]] [#g8615de3]

-データ
--トレーニング~
https://github.com/dotnet/spark/blob/main/examples/Microsoft.Spark.CSharp.Examples/MachineLearning/Sentiment/Resources/yelptrain.csv
--テスト~
https://github.com/dotnet/spark/blob/main/examples/Microsoft.Spark.CSharp.Examples/MachineLearning/Sentiment/Resources/yelptest.csv

-コード~
https://github.com/dotnet/spark/blob/main/examples/Microsoft.Spark.CSharp.Examples/MachineLearning/Sentiment/Program.cs

-説明
--テストCSVをDataFrameに読み込む。
--感情分析のユーザー定義関数(UDF)を登録、
--TempViewを生成して[[クエリ構文>#c33c5c17]]を使用してUDFを実行

*参考 [#aaa7ffa2]
-Apache Spark 1.3リリース - DataFrame,SparkSQL,MLibを拡張~
https://www.infoq.com/jp/news/2015/04/apache-spark-1.3-released

-Spark SQL およびデータフレーム - Spark 2.2.0 ドキュメント 日本語訳~
http://mogile.web.fc2.com/spark/spark220/sql-programming-guide.html

-Spark DataframeのSample Code集 - Qiita~
https://qiita.com/taka4sato/items/4ab2cf9e941599f1c0ca

**DevelopersIO [#f1fbe333]
Hadoop Advent Calendar 2016

-#16 > Spark SQLとDataFrame API入門~
https://dev.classmethod.jp/articles/hadoop-advent-calendar-16-spark-sql-and-dataframe/

-#17 > Spark Dataset APIについて~
https://dev.classmethod.jp/articles/hadoop-advent-calendar-17-spark-dataset/

**Learn | Microsoft Docs [#k9380484]
-Azure Databricks を使用した Data Engineering - ~
https://docs.microsoft.com/ja-jp/learn/paths/data-engineer-azure-databricks/
--Azure Databricks で DataFrames を操作する~
https://docs.microsoft.com/ja-jp/learn/modules/work-dataframes-azure-databricks/
--Azure Databricks の遅延評価および~
その他のパフォーマンス機能の説明~
https://docs.microsoft.com/ja-jp/learn/modules/describe-lazy-evaluation-performance-features-azure-databricks/
--Azure Databricks で DataFrames 列を操作する~
https://docs.microsoft.com/ja-jp/learn/modules/work-dataframes-columns-azure-databricks/
--Azure Databricks で DataFrames の高度なメソッドを操作する~
https://docs.microsoft.com/ja-jp/learn/modules/work-dataframes-advanced-methods-azure-databricks/
--Azure Databricks の構造化ストリーミングを~
使用してストリーミング データを処理する~
https://docs.microsoft.com/ja-jp/learn/modules/process-streaming-data-azure-databricks-structured-streaming/

**マイクロソフト系技術情報 Wiki [#i871e247]
-Azure Databricks > Azure Databricksチュートリアル~
https://techinfoofmicrosofttech.osscons.jp/index.php?Azure%20Databricks%E3%83%81%E3%83%A5%E3%83%BC%E3%83%88%E3%83%AA%E3%82%A2%E3%83%AB
-.NET for Apache Spark > .NET for Apache Sparkチュートリアル~
https://techinfoofmicrosofttech.osscons.jp/index.php?.NET%20for%20Apache%20Spark%E3%83%81%E3%83%A5%E3%83%BC%E3%83%88%E3%83%AA%E3%82%A2%E3%83%AB

トップ   編集 差分 バックアップ 添付 複製 名前変更 リロード   新規 一覧 単語検索 最終更新   ヘルプ   最終更新のRSS