Spark SQL
をテンプレートにして作成
[
トップ
] [
新規
|
一覧
|
単語検索
|
最終更新
|
ヘルプ
]
開始行:
「[[.NET 開発基盤部会 Wiki>http://dotnetdevelopmentinfras...
-[[戻る>Apache Spark]]
*目次 [#q2337c27]
#contents
*概要 [#pb3e516b]
-SQLで記述できる構造化データの処理の分散処理フレームワーク
-Spark SQLとDataFrame APIは、DataFrameで同じことができる。
*詳細 [#w4dd5ff5]
**機能 [#c67e6928]
-バックエンドで[[Apache Spark]]が動作する、
-[[Spark Core>Apache Spark#t805c63c]]より上位のコンポーネ...
-[[Apache Hive]]との関連
--先行する[[Apache Hive]]と互換性を保ちながら発展中
--[[Apache Hive]]で培われた SQLによる分散処理を利用
***ドメイン固有言語 (DSL) > SQL言語 [#k934e69c]
-[[RDD>Apache Spark#f411c5af]]の処理をクエリ・エンジン経...
--基本的には[[DataFrame>#jfb944b3]]というオンメモリのテー...
--強く型付けされたデータセットも完全にサポート([[DataFra...
-サポート
--Scala、Java、Pythonから[[DataFrame>#jfb944b3]]を操作で...
--CLI、JDBC / ODBCサポートを実装している。
--[[2つのLINQ的な構文(クエリ構文 / メソッド構文)>#nd350...
-クエリ オプティマイザ
--Catalyst Optimizer
--サポート
---ルールベース
---コストベース
--コストベースの 4 フェーズ
---分析
---ロジックの最適化
---物理的プランニング
---コード生成
-[[遅延評価>https://techinfoofmicrosofttech.osscons.jp/in...
--実行計画は有向グラフ(DAG)として表現される。
--エグゼキュータでDAGが生成され、
--DAGを用いた遅延評価で、一回、処理が解析される。
--RDDを消費するAPIが呼ばれるとジョブが作られる。
--ドライバはエグゼキュータにジョブを転送する。
***DataFrameと言う構造化データ [#jfb944b3]
-テーブルのようなデータ構造をもった分散処理用データセット
--同名のRのデータ構造をモデルとした~
表形式データ(型と名前を持った列のテーブル)
--作業性のサポート向上を目的としている。
---フィルタ,グループ,集合演算も提供する。
---また、[[SQLライクにデータを操作>#k934e69c]]できる。
-今後は、[[RDD>Apache Spark#f411c5af]]ではなくDataFrameが...
--RDDを上回る抽象化が提供される。
--Catalyst オプティマイザでRDDの5~20倍、性能向上。
**構文 [#nd35024c]
[[LINQ>https://techinfoofmicrosofttech.osscons.jp/index.p...
***「クエリ構文」的記法 [#c33c5c17]
-SparkSessionのsqlメソッドで入力元にクエリを発行し、結果...
-若しくは、DataFrameにテーブル名を指定してSQL中で使用する...
***「メソッド構文」的記法 [#h1cb45e4]
コチラは、DataFrame操作のみの場合。
-正確にはDataFrame APIと呼ぶらしい。
-DataFrameをFluent API的なメソッド構文で操作する。
**操作 [#q7d7afae]
***基本 [#y6984111]
[[PySparkの例>PySpark#n212eaf0]]
***ユーザー定義関数(UDF) [#uf99259e]
**集計 [#w7306007]
***基本 [#j9e98b93]
[[PySparkの例>PySpark#n212eaf0]]
***ユーザー定義集計関数(UDAF) [#a15c075a]
Apache Spark 2.3 以降では [[Pandas]] UDF を使うことで UDA...
**構造化ストリーミング [#vd23a8a1]
([[Spark Structured Streaming]])
***[[Python>PySpark#x623bccd]] [#l60dfd0c]
***[[.NET>#kfcf74cc]] [#a4ef3457]
*チュートリアル [#wbb02a14]
**[[Scala>Apache Sparkチュートリアル#bb51e63c]] [#scce13f5]
**[[PySpark>PySpark#n212eaf0]] [#jae8f1f5]
***[[ファースト・ステップ的なモノ>PySpark#ffe40eef]] [#cd...
***[[DataFrameに対する様々な操作>PySpark#w64c6c2e]] [#r26...
***[[構造化ストリーミングでウィンドウ操作>PySpark#x623bcc...
***[[様々な入力元・出力先の例>PySpark#yd135245]] [#t894f7...
**.NET for Apache Spark [#la5a0981]
***[[Get started in 10 minutes>https://techinfoofmicrosof...
-データ
Hello World
This .NET app uses .NET for Apache Spark
This .NET app counts words with Apache Spark
-コード
// Create Spark session
SparkSession spark =
SparkSession
.Builder()
.AppName("word_count_sample")
.GetOrCreate();
// Create initial DataFrame
string filePath = args[0];
DataFrame dataFrame = spark.Read().Text(filePath);
// Count words
DataFrame words =
dataFrame
.Select(Split(Col("value")," ").Alias("words"))
.Select(Explode(Col("words")).Alias("word"))
.GroupBy("word")
.Count()
.OrderBy(Col("count").Desc());
// Display results
words.Show();
// Stop Spark session
spark.Stop();
-説明
--value列(既定の列名)を、半角スペースで分割して配列化、...
Select(Split(Col("value"), " ")).Alias("words"))
--Explodeで、前述の結果セットのwords列セル中の配列を行に...
.Select(Explode(Col("words"))
--前述の結果セットをword列で行数を集計し、~
集計列(列名は"count"になる)をDescでOrderByする。
.GroupBy("word").Count().OrderBy(Col("count").Desc())
--参考~
pandas documentation
---pandas.DataFrame.explode~
https://pandas.pydata.org/pandas-docs/stable/reference/ap...
***[[バッチ処理>https://techinfoofmicrosofttech.osscons.j...
-データ~
GitHubのリポジトリデータ?~
https://github.com/dotnet/spark/blob/main/examples/Micros...
|id INT|url STRING|owner_id INT|name STRING|descriptor ST...
-コード
// Create Spark session
SparkSession spark = SparkSession
.Builder()
.AppName("GitHub and Spark Batch")
.GetOrCreate();
// Create initial DataFrame
DataFrame projectsDf = spark
.Read()
.Schema("id INT, url STRING, owner_id INT, " +
"name STRING, descriptor STRING, language STRING, " +
"created_at STRING, forked_from INT, deleted STRING, " +
"updated_at STRING")
.Csv(filePath);
// Display results
projectsDf.Show();
// Drop any rows with NA values
DataFrameNaFunctions dropEmptyProjects = projectsDf.Na();
DataFrame cleanedProjects = dropEmptyProjects.Drop("any");
// Remove unnecessary columns
cleanedProjects = cleanedProjects.Drop("id", "url", "own...
// Display results
cleanedProjects.Show();
// Average number of times each language has been forked
DataFrame groupedDF = cleanedProjects
.GroupBy("language")
.Agg(Avg(cleanedProjects["forked_from"]));
// Sort by most forked languages first & Display results
groupedDF.OrderBy(Desc("avg(forked_from)")).Show();
// Defines a UDF that determines if a date is greater th...
spark.Udf().Register<string, bool>(
"MyUDF",
(date) => DateTime.TryParse(date, out DateTime convert...
(convertedDate > referenceDate));
// Use UDF to add columns to the generated TempView
cleanedProjects.CreateOrReplaceTempView("dateView");
DataFrame dateDf = spark.Sql(
"SELECT *, MyUDF(dateView.updated_at) AS datebefore FR...
// Display results
dateDf.Show();
// Stop Spark session
spark.Stop();
-説明
--初めに、
---任意のNULLまたはNaN値を含む行を削除
---不要な列の削除("id", "url", "owner_id")
--次に、各言語がフォークされた回数の平均値を求める。
---言語毎に集計
.GroupBy("language")
---集計方法はforked_from列の平均値
.Agg(Avg(cleanedProjects["forked_from"]));
---最後に集計列(列名は"avg(forked_from)"になる)をDescで...
--最後に、datebefore列(updated_atが指定の日付より大きい...
---当該処理を行う[[ユーザー定義関数(UDF)>#uf99259e]]を...
---TempViewを生成して[[クエリ構文>#c33c5c17]]を使用してUD...
***[[構造化ストリーミング>https://techinfoofmicrosofttech...
-データ~
ncからレスポンスされる文字列。
-コード
// Create Spark session
SparkSession spark = SparkSession
.Builder()
.AppName("Streaming example with a UDF")
.GetOrCreate();
// Create initial DataFrame
DataFrame lines = spark
.ReadStream()
.Format("socket")
.Option("host", hostname)
.Option("port", port)
.Load();
// UDF to produce an array
// Array includes:
// 1) original string
// 2) original string + length of original string
Func<Column, Column> udfArray =
Udf<string, string[]>((str) => new string[] { str, $"{...
// Explode array to rows
DataFrame arrayDF = lines.Select(Explode(udfArray(lines[...
// Process and display each incoming line
StreamingQuery query = arrayDF
.WriteStream()
.Format("console")
.Start();
query.AwaitTermination();
※ WriteStreamでStartして取得したQueryにAwaitTerminationし...
-説明~
--来た文字列を、{文字列, 文字列長}の~
文字列配列にするユーザー定義関数(UDF)を登録
--[[メソッド構文>#h1cb45e4]]を使用してUDFを実行
--UDFの返す配列をExplodeで行に展開して表示。
--構造化ストリーミングはループを書かずして、~
ストリーミング処理をループ的に処理することができる。
***[[ML.NETでの感情分析>https://techinfoofmicrosofttech.o...
-データ
--トレーニング~
https://github.com/dotnet/spark/blob/main/examples/Micros...
--テスト~
https://github.com/dotnet/spark/blob/main/examples/Micros...
-コード~
https://github.com/dotnet/spark/blob/main/examples/Micros...
-説明
--テストCSVをDataFrameに読み込む。
--感情分析のユーザー定義関数(UDF)を登録、
--TempViewを生成して[[クエリ構文>#c33c5c17]]を使用してUDF...
*参考 [#aaa7ffa2]
-Apache Spark 1.3リリース - DataFrame,SparkSQL,MLibを拡...
https://www.infoq.com/jp/news/2015/04/apache-spark-1.3-re...
-Spark SQL およびデータフレーム - Spark 2.2.0 ドキュメン...
http://mogile.web.fc2.com/spark/spark220/sql-programming-...
-Spark DataframeのSample Code集 - Qiita~
https://qiita.com/taka4sato/items/4ab2cf9e941599f1c0ca
**DevelopersIO [#f1fbe333]
Hadoop Advent Calendar 2016
-#16 > Spark SQLとDataFrame API入門~
https://dev.classmethod.jp/articles/hadoop-advent-calenda...
-#17 > Spark Dataset APIについて~
https://dev.classmethod.jp/articles/hadoop-advent-calenda...
**Learn | Microsoft Docs [#k9380484]
-Azure Databricks を使用した Data Engineering - ~
https://docs.microsoft.com/ja-jp/learn/paths/data-enginee...
--Azure Databricks で DataFrames を操作する~
https://docs.microsoft.com/ja-jp/learn/modules/work-dataf...
--Azure Databricks の遅延評価および~
その他のパフォーマンス機能の説明~
https://docs.microsoft.com/ja-jp/learn/modules/describe-l...
--Azure Databricks で DataFrames 列を操作する~
https://docs.microsoft.com/ja-jp/learn/modules/work-dataf...
--Azure Databricks で DataFrames の高度なメソッドを操作す...
https://docs.microsoft.com/ja-jp/learn/modules/work-dataf...
--Azure Databricks の構造化ストリーミングを~
使用してストリーミング データを処理する~
https://docs.microsoft.com/ja-jp/learn/modules/process-st...
**マイクロソフト系技術情報 Wiki [#i871e247]
-Azure Databricks > Azure Databricksチュートリアル~
https://techinfoofmicrosofttech.osscons.jp/index.php?Azur...
-.NET for Apache Spark > .NET for Apache Sparkチュートリ...
https://techinfoofmicrosofttech.osscons.jp/index.php?.NET...
終了行:
「[[.NET 開発基盤部会 Wiki>http://dotnetdevelopmentinfras...
-[[戻る>Apache Spark]]
*目次 [#q2337c27]
#contents
*概要 [#pb3e516b]
-SQLで記述できる構造化データの処理の分散処理フレームワーク
-Spark SQLとDataFrame APIは、DataFrameで同じことができる。
*詳細 [#w4dd5ff5]
**機能 [#c67e6928]
-バックエンドで[[Apache Spark]]が動作する、
-[[Spark Core>Apache Spark#t805c63c]]より上位のコンポーネ...
-[[Apache Hive]]との関連
--先行する[[Apache Hive]]と互換性を保ちながら発展中
--[[Apache Hive]]で培われた SQLによる分散処理を利用
***ドメイン固有言語 (DSL) > SQL言語 [#k934e69c]
-[[RDD>Apache Spark#f411c5af]]の処理をクエリ・エンジン経...
--基本的には[[DataFrame>#jfb944b3]]というオンメモリのテー...
--強く型付けされたデータセットも完全にサポート([[DataFra...
-サポート
--Scala、Java、Pythonから[[DataFrame>#jfb944b3]]を操作で...
--CLI、JDBC / ODBCサポートを実装している。
--[[2つのLINQ的な構文(クエリ構文 / メソッド構文)>#nd350...
-クエリ オプティマイザ
--Catalyst Optimizer
--サポート
---ルールベース
---コストベース
--コストベースの 4 フェーズ
---分析
---ロジックの最適化
---物理的プランニング
---コード生成
-[[遅延評価>https://techinfoofmicrosofttech.osscons.jp/in...
--実行計画は有向グラフ(DAG)として表現される。
--エグゼキュータでDAGが生成され、
--DAGを用いた遅延評価で、一回、処理が解析される。
--RDDを消費するAPIが呼ばれるとジョブが作られる。
--ドライバはエグゼキュータにジョブを転送する。
***DataFrameと言う構造化データ [#jfb944b3]
-テーブルのようなデータ構造をもった分散処理用データセット
--同名のRのデータ構造をモデルとした~
表形式データ(型と名前を持った列のテーブル)
--作業性のサポート向上を目的としている。
---フィルタ,グループ,集合演算も提供する。
---また、[[SQLライクにデータを操作>#k934e69c]]できる。
-今後は、[[RDD>Apache Spark#f411c5af]]ではなくDataFrameが...
--RDDを上回る抽象化が提供される。
--Catalyst オプティマイザでRDDの5~20倍、性能向上。
**構文 [#nd35024c]
[[LINQ>https://techinfoofmicrosofttech.osscons.jp/index.p...
***「クエリ構文」的記法 [#c33c5c17]
-SparkSessionのsqlメソッドで入力元にクエリを発行し、結果...
-若しくは、DataFrameにテーブル名を指定してSQL中で使用する...
***「メソッド構文」的記法 [#h1cb45e4]
コチラは、DataFrame操作のみの場合。
-正確にはDataFrame APIと呼ぶらしい。
-DataFrameをFluent API的なメソッド構文で操作する。
**操作 [#q7d7afae]
***基本 [#y6984111]
[[PySparkの例>PySpark#n212eaf0]]
***ユーザー定義関数(UDF) [#uf99259e]
**集計 [#w7306007]
***基本 [#j9e98b93]
[[PySparkの例>PySpark#n212eaf0]]
***ユーザー定義集計関数(UDAF) [#a15c075a]
Apache Spark 2.3 以降では [[Pandas]] UDF を使うことで UDA...
**構造化ストリーミング [#vd23a8a1]
([[Spark Structured Streaming]])
***[[Python>PySpark#x623bccd]] [#l60dfd0c]
***[[.NET>#kfcf74cc]] [#a4ef3457]
*チュートリアル [#wbb02a14]
**[[Scala>Apache Sparkチュートリアル#bb51e63c]] [#scce13f5]
**[[PySpark>PySpark#n212eaf0]] [#jae8f1f5]
***[[ファースト・ステップ的なモノ>PySpark#ffe40eef]] [#cd...
***[[DataFrameに対する様々な操作>PySpark#w64c6c2e]] [#r26...
***[[構造化ストリーミングでウィンドウ操作>PySpark#x623bcc...
***[[様々な入力元・出力先の例>PySpark#yd135245]] [#t894f7...
**.NET for Apache Spark [#la5a0981]
***[[Get started in 10 minutes>https://techinfoofmicrosof...
-データ
Hello World
This .NET app uses .NET for Apache Spark
This .NET app counts words with Apache Spark
-コード
// Create Spark session
SparkSession spark =
SparkSession
.Builder()
.AppName("word_count_sample")
.GetOrCreate();
// Create initial DataFrame
string filePath = args[0];
DataFrame dataFrame = spark.Read().Text(filePath);
// Count words
DataFrame words =
dataFrame
.Select(Split(Col("value")," ").Alias("words"))
.Select(Explode(Col("words")).Alias("word"))
.GroupBy("word")
.Count()
.OrderBy(Col("count").Desc());
// Display results
words.Show();
// Stop Spark session
spark.Stop();
-説明
--value列(既定の列名)を、半角スペースで分割して配列化、...
Select(Split(Col("value"), " ")).Alias("words"))
--Explodeで、前述の結果セットのwords列セル中の配列を行に...
.Select(Explode(Col("words"))
--前述の結果セットをword列で行数を集計し、~
集計列(列名は"count"になる)をDescでOrderByする。
.GroupBy("word").Count().OrderBy(Col("count").Desc())
--参考~
pandas documentation
---pandas.DataFrame.explode~
https://pandas.pydata.org/pandas-docs/stable/reference/ap...
***[[バッチ処理>https://techinfoofmicrosofttech.osscons.j...
-データ~
GitHubのリポジトリデータ?~
https://github.com/dotnet/spark/blob/main/examples/Micros...
|id INT|url STRING|owner_id INT|name STRING|descriptor ST...
-コード
// Create Spark session
SparkSession spark = SparkSession
.Builder()
.AppName("GitHub and Spark Batch")
.GetOrCreate();
// Create initial DataFrame
DataFrame projectsDf = spark
.Read()
.Schema("id INT, url STRING, owner_id INT, " +
"name STRING, descriptor STRING, language STRING, " +
"created_at STRING, forked_from INT, deleted STRING, " +
"updated_at STRING")
.Csv(filePath);
// Display results
projectsDf.Show();
// Drop any rows with NA values
DataFrameNaFunctions dropEmptyProjects = projectsDf.Na();
DataFrame cleanedProjects = dropEmptyProjects.Drop("any");
// Remove unnecessary columns
cleanedProjects = cleanedProjects.Drop("id", "url", "own...
// Display results
cleanedProjects.Show();
// Average number of times each language has been forked
DataFrame groupedDF = cleanedProjects
.GroupBy("language")
.Agg(Avg(cleanedProjects["forked_from"]));
// Sort by most forked languages first & Display results
groupedDF.OrderBy(Desc("avg(forked_from)")).Show();
// Defines a UDF that determines if a date is greater th...
spark.Udf().Register<string, bool>(
"MyUDF",
(date) => DateTime.TryParse(date, out DateTime convert...
(convertedDate > referenceDate));
// Use UDF to add columns to the generated TempView
cleanedProjects.CreateOrReplaceTempView("dateView");
DataFrame dateDf = spark.Sql(
"SELECT *, MyUDF(dateView.updated_at) AS datebefore FR...
// Display results
dateDf.Show();
// Stop Spark session
spark.Stop();
-説明
--初めに、
---任意のNULLまたはNaN値を含む行を削除
---不要な列の削除("id", "url", "owner_id")
--次に、各言語がフォークされた回数の平均値を求める。
---言語毎に集計
.GroupBy("language")
---集計方法はforked_from列の平均値
.Agg(Avg(cleanedProjects["forked_from"]));
---最後に集計列(列名は"avg(forked_from)"になる)をDescで...
--最後に、datebefore列(updated_atが指定の日付より大きい...
---当該処理を行う[[ユーザー定義関数(UDF)>#uf99259e]]を...
---TempViewを生成して[[クエリ構文>#c33c5c17]]を使用してUD...
***[[構造化ストリーミング>https://techinfoofmicrosofttech...
-データ~
ncからレスポンスされる文字列。
-コード
// Create Spark session
SparkSession spark = SparkSession
.Builder()
.AppName("Streaming example with a UDF")
.GetOrCreate();
// Create initial DataFrame
DataFrame lines = spark
.ReadStream()
.Format("socket")
.Option("host", hostname)
.Option("port", port)
.Load();
// UDF to produce an array
// Array includes:
// 1) original string
// 2) original string + length of original string
Func<Column, Column> udfArray =
Udf<string, string[]>((str) => new string[] { str, $"{...
// Explode array to rows
DataFrame arrayDF = lines.Select(Explode(udfArray(lines[...
// Process and display each incoming line
StreamingQuery query = arrayDF
.WriteStream()
.Format("console")
.Start();
query.AwaitTermination();
※ WriteStreamでStartして取得したQueryにAwaitTerminationし...
-説明~
--来た文字列を、{文字列, 文字列長}の~
文字列配列にするユーザー定義関数(UDF)を登録
--[[メソッド構文>#h1cb45e4]]を使用してUDFを実行
--UDFの返す配列をExplodeで行に展開して表示。
--構造化ストリーミングはループを書かずして、~
ストリーミング処理をループ的に処理することができる。
***[[ML.NETでの感情分析>https://techinfoofmicrosofttech.o...
-データ
--トレーニング~
https://github.com/dotnet/spark/blob/main/examples/Micros...
--テスト~
https://github.com/dotnet/spark/blob/main/examples/Micros...
-コード~
https://github.com/dotnet/spark/blob/main/examples/Micros...
-説明
--テストCSVをDataFrameに読み込む。
--感情分析のユーザー定義関数(UDF)を登録、
--TempViewを生成して[[クエリ構文>#c33c5c17]]を使用してUDF...
*参考 [#aaa7ffa2]
-Apache Spark 1.3リリース - DataFrame,SparkSQL,MLibを拡...
https://www.infoq.com/jp/news/2015/04/apache-spark-1.3-re...
-Spark SQL およびデータフレーム - Spark 2.2.0 ドキュメン...
http://mogile.web.fc2.com/spark/spark220/sql-programming-...
-Spark DataframeのSample Code集 - Qiita~
https://qiita.com/taka4sato/items/4ab2cf9e941599f1c0ca
**DevelopersIO [#f1fbe333]
Hadoop Advent Calendar 2016
-#16 > Spark SQLとDataFrame API入門~
https://dev.classmethod.jp/articles/hadoop-advent-calenda...
-#17 > Spark Dataset APIについて~
https://dev.classmethod.jp/articles/hadoop-advent-calenda...
**Learn | Microsoft Docs [#k9380484]
-Azure Databricks を使用した Data Engineering - ~
https://docs.microsoft.com/ja-jp/learn/paths/data-enginee...
--Azure Databricks で DataFrames を操作する~
https://docs.microsoft.com/ja-jp/learn/modules/work-dataf...
--Azure Databricks の遅延評価および~
その他のパフォーマンス機能の説明~
https://docs.microsoft.com/ja-jp/learn/modules/describe-l...
--Azure Databricks で DataFrames 列を操作する~
https://docs.microsoft.com/ja-jp/learn/modules/work-dataf...
--Azure Databricks で DataFrames の高度なメソッドを操作す...
https://docs.microsoft.com/ja-jp/learn/modules/work-dataf...
--Azure Databricks の構造化ストリーミングを~
使用してストリーミング データを処理する~
https://docs.microsoft.com/ja-jp/learn/modules/process-st...
**マイクロソフト系技術情報 Wiki [#i871e247]
-Azure Databricks > Azure Databricksチュートリアル~
https://techinfoofmicrosofttech.osscons.jp/index.php?Azur...
-.NET for Apache Spark > .NET for Apache Sparkチュートリ...
https://techinfoofmicrosofttech.osscons.jp/index.php?.NET...
ページ名: