A distributed-processing framework for structured data whose processing can be written in SQL.
The DataFrame API is LINQ-like: transformations are composed as chained method calls.
Queries can also be issued with the SparkSession Sql method, and the result is returned as a DataFrame.
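For illustration, a minimal sketch of issuing a query through the Sql method (the people.json file, the people view name, and the query itself are placeholder assumptions, not part of the original example):

using Microsoft.Spark.Sql;

// Create Spark session
SparkSession spark = SparkSession
    .Builder()
    .AppName("sql_method_sample")
    .GetOrCreate();

// Register an existing DataFrame as a temporary view so it can be queried with SQL
// (people.json is a placeholder data source)
DataFrame people = spark.Read().Json("people.json");
people.CreateOrReplaceTempView("people");

// The Sql method returns the query result as a DataFrame
DataFrame adults = spark.Sql("SELECT name, age FROM people WHERE age >= 20");
adults.Show();

spark.Stop();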
Word count sample. Contents of the input text file passed to the app as args[0]: "Hello World This .NET app uses .NET for Apache Spark This .NET app counts words with Apache Spark"
using Microsoft.Spark.Sql;
using static Microsoft.Spark.Sql.Functions;

// Create Spark session
SparkSession spark =
SparkSession
.Builder()
.AppName("word_count_sample")
.GetOrCreate();
// Create initial DataFrame
string filePath = args[0];
DataFrame dataFrame = spark.Read().Text(filePath);
// Count words
DataFrame words =
dataFrame
.Select(Split(Col("value")," ").Alias("words"))
.Select(Explode(Col("words")).Alias("word"))
.GroupBy("word")
.Count()
.OrderBy(Col("count").Desc());
// Display results
words.Show();
// Stop Spark session
spark.Stop();

Batch processing sample: the GitHub projects CSV file read below has the following columns.
| id INT | url STRING | owner_id INT | name STRING | descriptor STRING | language STRING | created_at STRING | forked_from INT | deleted STRING | updated_at STRING |
using System;
using Microsoft.Spark.Sql;
using static Microsoft.Spark.Sql.Functions;

// Create Spark session
SparkSession spark = SparkSession
.Builder()
.AppName("GitHub and Spark Batch")
.GetOrCreate();
// Path to the GitHub projects CSV file (example: taken from command-line args)
string filePath = args[0];

// Create initial DataFrame
DataFrame projectsDf = spark
.Read()
.Schema("id INT, url STRING, owner_id INT, " +
"name STRING, descriptor STRING, language STRING, " +
"created_at STRING, forked_from INT, deleted STRING, " +
"updated_at STRING")
.Csv(filePath);
// Display results
projectsDf.Show();
// Drop any rows with NA values
DataFrameNaFunctions dropEmptyProjects = projectsDf.Na();
DataFrame cleanedProjects = dropEmptyProjects.Drop("any");
// Remove unnecessary columns
cleanedProjects = cleanedProjects.Drop("id", "url", "owner_id");
// Display results
cleanedProjects.Show();
// Average number of times each language has been forked
DataFrame groupedDF = cleanedProjects
.GroupBy("language")
.Agg(Avg(cleanedProjects["forked_from"]));
// Sort by most forked languages first & Display results
groupedDF.OrderBy(Desc("avg(forked_from)")).Show();
// Reference date used by the UDF (example value; adjust as needed)
DateTime referenceDate = new DateTime(2015, 10, 20);

// Define a UDF that determines whether a date is later than the reference date
spark.Udf().Register<string, bool>(
"MyUDF",
(date) => DateTime.TryParse(date, out DateTime convertedDate) &&
(convertedDate > referenceDate));
// Use UDF to add columns to the generated TempView
cleanedProjects.CreateOrReplaceTempView("dateView");
DataFrame dateDf = spark.Sql(
"SELECT *, MyUDF(dateView.updated_at) AS datebefore FROM dateView");
// Display results
dateDf.Show();
// Stop Spark session
spark.Stop();
using System;
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Streaming;
using static Microsoft.Spark.Sql.Functions;

// Create Spark session
SparkSession spark = SparkSession
.Builder()
.AppName("Streaming example with a UDF")
.GetOrCreate();
// Host and port of the socket to stream from (example: taken from command-line args)
string hostname = args[0];
int port = int.Parse(args[1]);

// Create initial DataFrame
DataFrame lines = spark
.ReadStream()
.Format("socket")
.Option("host", hostname)
.Option("port", port)
.Load();
// UDF to produce an array
// Array includes:
// 1) original string
// 2) original string + length of original string
Func<Column, Column> udfArray =
Udf<string, string[]>((str) => new string[] { str, $"{str} {str.Length}" });
// Explode array to rows
DataFrame arrayDF = lines.Select(Explode(udfArray(lines["value"])));
// Process and display each incoming line
StreamingQuery query = arrayDF
.WriteStream()
.Format("console")
.Start();
query.AwaitTermination();
※ Start the stream with WriteStream, and call AwaitTermination on the StreamingQuery returned by Start to block until the query terminates.
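As a hedged sketch of the same pattern (assuming StreamingQuery also exposes AwaitTermination(timeoutMs) and Stop(), mirroring the Spark Structured Streaming API), the query can instead be awaited with a timeout and stopped explicitly:

// Assumption: AwaitTermination(long timeoutMs) returns whether the query has terminated
StreamingQuery query = arrayDF
    .WriteStream()
    .Format("console")
    .Start();

// Wait up to 60 seconds; if the query is still running, stop it explicitly
bool terminated = query.AwaitTermination(60 * 1000);
if (!terminated)
{
    query.Stop();
}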