「[[.NET 開発基盤部会 Wiki>http://dotnetdevelopmentinfrastructure.osscons.jp]]」は、「[[Open棟梁Project>https://github.com/OpenTouryoProject/]]」,「[[OSSコンソーシアム .NET開発基盤部会>https://www.osscons.jp/dotNetDevelopmentInfrastructure/]]」によって運営されています。 -[[戻る>Apache Spark]] *目次 [#q2337c27] #contents *概要 [#pb3e516b] -SQLで記述できる構造化データの処理の分散処理フレームワーク -Spark SQLとDataFrame APIは、DataFrameで同じことができる。 *詳細 [#w4dd5ff5] **機能 [#c67e6928] -バックエンドで[[Apache Spark]]が動作する、 -[[Spark Core>Apache Spark#t805c63c]]より上位のコンポーネント -[[Apache Hive]]との関連 --先行する[[Apache Hive]]と互換性を保ちながら発展中 --[[Apache Hive]]で培われた SQLによる分散処理を利用 ***ドメイン固有言語 (DSL) > SQL言語 [#k934e69c] -[[RDD>Apache Spark#f411c5af]]の処理をクエリ・エンジン経由で実行プラン生成して処理。 --基本的には[[DataFrame>#jfb944b3]]というオンメモリのテーブルに対し、[[LINQ的>#nd35024c]]に処理を行うことが出来る。 --強く型付けされたデータセットも完全にサポート([[DataFrame>#jfb944b3]]は型無しAPI) -サポート --Scala、Java、Pythonから[[DataFrame>#jfb944b3]]を操作できる。 --CLI、JDBC / ODBCサポートを実装している。 --[[2つのLINQ的な構文(クエリ構文 / メソッド構文)>#nd35024c]] -クエリ オプティマイザ --Catalyst Optimizer --サポート ---ルールベース ---コストベース --コストベースの 4 フェーズ ---分析 ---ロジックの最適化 ---物理的プランニング ---コード生成 -[[遅延評価>https://techinfoofmicrosofttech.osscons.jp/index.php?.NET%20for%20Apache%20Spark%E3%83%81%E3%83%A5%E3%83%BC%E3%83%88%E3%83%AA%E3%82%A2%E3%83%AB#y73d04c4]] --実行計画は有向グラフ(DAG)として表現される。 --エグゼキュータでDAGが生成され、 --DAGを用いた遅延評価で、一回、処理が解析される。 --RDDを消費するAPIが呼ばれるとジョブが作られる。 --ドライバはエグゼキュータにジョブを転送する。 ***DataFrameと言う構造化データ [#jfb944b3] -テーブルのようなデータ構造をもった分散処理用データセット --同名のRのデータ構造をモデルとした~ 表形式データ(型と名前を持った列のテーブル) --作業性のサポート向上を目的としている。 ---フィルタ,グループ,集合演算も提供する。 ---また、[[SQLライクにデータを操作>#k934e69c]]できる。 -今後は、[[RDD>Apache Spark#f411c5af]]ではなくDataFrameが主流になって行く。 --レジリエントな[[RDD>Apache Spark#f411c5af]]を上回る抽象化が提供される。 --Catalyst オプティマイザで従来の RDD の 5 ~ 20 倍、性能が向上。 --RDDを上回る抽象化が提供される。 --Catalyst オプティマイザでRDDの5~20倍、性能向上。 **構文 [#nd35024c] [[LINQ>https://techinfoofmicrosofttech.osscons.jp/index.php?LINQ]]的 ***「クエリ構文」的記法 [#c33c5c17] -SparkSessionのsqlメソッドで入力元にクエリを発行し、結果をDataFrameで返す。 -若しくは、DataFrameにテーブル名を指定してSQL中で使用することでDataFrameを操作する。 ***「メソッド構文」的記法 [#h1cb45e4] コチラは、DataFrame操作のみの場合。 -正確にはDataFrame APIと呼ぶらしい。 -DataFrameをFluent API的なメソッド構文で操作する。 **操作 [#q7d7afae] ***基本 [#y6984111] [[PySparkの例>PySpark#n212eaf0]] ***ユーザー定義関数(UDF) [#uf99259e] **集計 [#w7306007] ***基本 [#j9e98b93] [[PySparkの例>PySpark#n212eaf0]] ***ユーザー定義集計関数(UDAF) [#a15c075a] Apache Spark 2.3 以降では [[Pandas]] UDF を使うことで UDAF に相当する処理を書くことができるようになっている。 **構造化ストリーミング [#vd23a8a1] ([[Spark Structured Streaming]]) ***[[Python>PySpark#x623bccd]] [#l60dfd0c] ***[[.NET>#kfcf74cc]] [#a4ef3457] *チュートリアル [#wbb02a14] **[[Scala>Apache Sparkチュートリアル#bb51e63c]] [#scce13f5] **[[PySpark>PySpark#n212eaf0]] [#jae8f1f5] ***[[ファースト・ステップ的なモノ>PySpark#ffe40eef]] [#cdcdc7b7] ***[[DataFrameに対する様々な操作>PySpark#w64c6c2e]] [#r26351c7] ***[[構造化ストリーミングでウィンドウ操作>PySpark#x623bccd]] [#m34a6c9e] ***[[様々な入力元・出力先の例>PySpark#yd135245]] [#t894f789] **.NET for Apache Spark [#la5a0981] ***[[Get started in 10 minutes>https://techinfoofmicrosofttech.osscons.jp/index.php?.NET%20for%20Apache%20Spark%E3%83%81%E3%83%A5%E3%83%BC%E3%83%88%E3%83%AA%E3%82%A2%E3%83%AB#s680c185]] [#oaa10345] -データ Hello World This .NET app uses .NET for Apache Spark This .NET app counts words with Apache Spark -コード // Create Spark session SparkSession spark = SparkSession .Builder() .AppName("word_count_sample") .GetOrCreate(); // Create initial DataFrame string filePath = args[0]; DataFrame dataFrame = spark.Read().Text(filePath); // Count words DataFrame words = dataFrame .Select(Split(Col("value")," ").Alias("words")) .Select(Explode(Col("words")).Alias("word")) .GroupBy("word") .Count() .OrderBy(Col("count").Desc()); // Display results words.Show(); // Stop Spark session spark.Stop(); -説明 --value列(既定の列名)を、半角スペースで分割して配列化、wordと言うaliasの列名を設定する。 Select(Split(Col("value"), " ")).Alias("words")) --Explodeで、前述の結果セットのwords列セル中の配列を行に展開し、 .Select(Explode(Col("words")) --前述の結果セットをword列で行数を集計し、~ 集計列(列名は"count"になる)をDescでOrderByする。 .GroupBy("word").Count().OrderBy(Col("count").Desc()) --参考~ pandas documentation ---pandas.DataFrame.explode~ https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.explode.html ***[[バッチ処理>https://techinfoofmicrosofttech.osscons.jp/index.php?.NET%20for%20Apache%20Spark%E3%83%81%E3%83%A5%E3%83%BC%E3%83%88%E3%83%AA%E3%82%A2%E3%83%AB#l7b4829d]] [#n1c1114b] -データ~ GitHubのリポジトリデータ?~ https://github.com/dotnet/spark/blob/main/examples/Microsoft.Spark.CSharp.Examples/Sql/Batch/projects_smaller.csv |id INT|url STRING|owner_id INT|name STRING|descriptor STRING|language STRING|created_at STRING|forked_from INT|deleted STRING|updated_at STRING| -コード // Create Spark session SparkSession spark = SparkSession .Builder() .AppName("GitHub and Spark Batch") .GetOrCreate(); // Create initial DataFrame DataFrame projectsDf = spark .Read() .Schema("id INT, url STRING, owner_id INT, " + "name STRING, descriptor STRING, language STRING, " + "created_at STRING, forked_from INT, deleted STRING, " + "updated_at STRING") .Csv(filePath); // Display results projectsDf.Show(); // Drop any rows with NA values DataFrameNaFunctions dropEmptyProjects = projectsDf.Na(); DataFrame cleanedProjects = dropEmptyProjects.Drop("any"); // Remove unnecessary columns cleanedProjects = cleanedProjects.Drop("id", "url", "owner_id"); // Display results cleanedProjects.Show(); // Average number of times each language has been forked DataFrame groupedDF = cleanedProjects .GroupBy("language") .Agg(Avg(cleanedProjects["forked_from"])); // Sort by most forked languages first & Display results groupedDF.OrderBy(Desc("avg(forked_from)")).Show(); // Defines a UDF that determines if a date is greater than a specified date spark.Udf().Register<string, bool>( "MyUDF", (date) => DateTime.TryParse(date, out DateTime convertedDate) && (convertedDate > referenceDate)); // Use UDF to add columns to the generated TempView cleanedProjects.CreateOrReplaceTempView("dateView"); DataFrame dateDf = spark.Sql( "SELECT *, MyUDF(dateView.updated_at) AS datebefore FROM dateView"); // Display results dateDf.Show(); // Stop Spark session spark.Stop(); -説明 --初めに、 ---任意のNULLまたはNaN値を含む行を削除 ---不要な列の削除("id", "url", "owner_id") --次に、各言語がフォークされた回数の平均値を求める。 ---言語毎に集計 .GroupBy("language") ---集計方法はforked_from列の平均値 .Agg(Avg(cleanedProjects["forked_from"])); ---最後に集計列(列名は"avg(forked_from)"になる)をDescでOrderByする。 --最後に、datebefore列(updated_atが指定の日付より大きいか?)を追加。 ---当該処理を行う[[ユーザー定義関数(UDF)>#uf99259e]]を登録 ---TempViewを生成して[[クエリ構文>#c33c5c17]]を使用してUDFを実行 ***[[構造化ストリーミング>https://techinfoofmicrosofttech.osscons.jp/index.php?.NET%20for%20Apache%20Spark%E3%83%81%E3%83%A5%E3%83%BC%E3%83%88%E3%83%AA%E3%82%A2%E3%83%AB#v2288c2e]] [#kfcf74cc] -データ~ ncからレスポンスされる文字列。 -コード // Create Spark session SparkSession spark = SparkSession .Builder() .AppName("Streaming example with a UDF") .GetOrCreate(); // Create initial DataFrame DataFrame lines = spark .ReadStream() .Format("socket") .Option("host", hostname) .Option("port", port) .Load(); // UDF to produce an array // Array includes: // 1) original string // 2) original string + length of original string Func<Column, Column> udfArray = Udf<string, string[]>((str) => new string[] { str, $"{str} {str.Length}" }); // Explode array to rows DataFrame arrayDF = lines.Select(Explode(udfArray(lines["value"]))); // Process and display each incoming line StreamingQuery query = arrayDF .WriteStream() .Format("console") .Start(); query.AwaitTermination(); ※ WriteStreamでStartして取得したQueryにAwaitTerminationして終了。 -説明~ --来た文字列を、{文字列, 文字列長}の~ 文字列配列にするユーザー定義関数(UDF)を登録 --[[メソッド構文>#h1cb45e4]]を使用してUDFを実行 --UDFの返す配列をExplodeで行に展開して表示。 --構造化ストリーミングはループを書かずして、~ ストリーミング処理をループ的に処理することができる。 ***[[ML.NETでの感情分析>https://techinfoofmicrosofttech.osscons.jp/index.php?.NET%20for%20Apache%20Spark%E3%83%81%E3%83%A5%E3%83%BC%E3%83%88%E3%83%AA%E3%82%A2%E3%83%AB#bb5afcf2]] [#g8615de3] -データ --トレーニング~ https://github.com/dotnet/spark/blob/main/examples/Microsoft.Spark.CSharp.Examples/MachineLearning/Sentiment/Resources/yelptrain.csv --テスト~ https://github.com/dotnet/spark/blob/main/examples/Microsoft.Spark.CSharp.Examples/MachineLearning/Sentiment/Resources/yelptest.csv -コード~ https://github.com/dotnet/spark/blob/main/examples/Microsoft.Spark.CSharp.Examples/MachineLearning/Sentiment/Program.cs -説明 --テストCSVをDataFrameに読み込む。 --感情分析のユーザー定義関数(UDF)を登録、 --TempViewを生成して[[クエリ構文>#c33c5c17]]を使用してUDFを実行 *参考 [#aaa7ffa2] -Apache Spark 1.3リリース - DataFrame,SparkSQL,MLibを拡張~ https://www.infoq.com/jp/news/2015/04/apache-spark-1.3-released -Spark SQL およびデータフレーム - Spark 2.2.0 ドキュメント 日本語訳~ http://mogile.web.fc2.com/spark/spark220/sql-programming-guide.html -Spark DataframeのSample Code集 - Qiita~ https://qiita.com/taka4sato/items/4ab2cf9e941599f1c0ca **DevelopersIO [#f1fbe333] Hadoop Advent Calendar 2016 -#16 > Spark SQLとDataFrame API入門~ https://dev.classmethod.jp/articles/hadoop-advent-calendar-16-spark-sql-and-dataframe/ -#17 > Spark Dataset APIについて~ https://dev.classmethod.jp/articles/hadoop-advent-calendar-17-spark-dataset/ **Learn | Microsoft Docs [#k9380484] -Azure Databricks を使用した Data Engineering - ~ https://docs.microsoft.com/ja-jp/learn/paths/data-engineer-azure-databricks/ --Azure Databricks で DataFrames を操作する~ https://docs.microsoft.com/ja-jp/learn/modules/work-dataframes-azure-databricks/ --Azure Databricks の遅延評価および~ その他のパフォーマンス機能の説明~ https://docs.microsoft.com/ja-jp/learn/modules/describe-lazy-evaluation-performance-features-azure-databricks/ --Azure Databricks で DataFrames 列を操作する~ https://docs.microsoft.com/ja-jp/learn/modules/work-dataframes-columns-azure-databricks/ --Azure Databricks で DataFrames の高度なメソッドを操作する~ https://docs.microsoft.com/ja-jp/learn/modules/work-dataframes-advanced-methods-azure-databricks/ --Azure Databricks の構造化ストリーミングを~ 使用してストリーミング データを処理する~ https://docs.microsoft.com/ja-jp/learn/modules/process-streaming-data-azure-databricks-structured-streaming/ **マイクロソフト系技術情報 Wiki [#i871e247] -Azure Databricks > Azure Databricksチュートリアル~ https://techinfoofmicrosofttech.osscons.jp/index.php?Azure%20Databricks%E3%83%81%E3%83%A5%E3%83%BC%E3%83%88%E3%83%AA%E3%82%A2%E3%83%AB -.NET for Apache Spark > .NET for Apache Sparkチュートリアル~ https://techinfoofmicrosofttech.osscons.jp/index.php?.NET%20for%20Apache%20Spark%E3%83%81%E3%83%A5%E3%83%BC%E3%83%88%E3%83%AA%E3%82%A2%E3%83%AB