LINQ-like
This covers the case where only DataFrame operations are used.
In Apache Spark 2.3 and later, processing equivalent to a UDAF can be written by using Pandas UDFs (a PySpark feature).
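As a rough C# sketch of the DataFrame-only style (the app name, columns, and the classify UDF below are illustrative and not part of the samples that follow), a scalar UDF can be combined with a built-in aggregate such as Avg where a custom aggregate (UDAF) would otherwise be needed:

using System;
using Microsoft.Spark.Sql;
using static Microsoft.Spark.Sql.Functions;

// Create Spark session (sketch only; names are illustrative)
SparkSession spark = SparkSession
    .Builder()
    .AppName("dataframe_only_sketch")
    .GetOrCreate();

// Small DataFrame with one BIGINT column "value" holding 1..10
DataFrame df = spark.Range(1, 11).WithColumnRenamed("id", "value");

// Scalar UDF in the DataFrame style: classify each value
Func<Column, Column> classify = Udf<long, string>(v => v > 5 ? "big" : "small");

// The built-in aggregate stands in for what would be a UDAF in Scala/Java
// or a Pandas UDF in PySpark
df.WithColumn("class", classify(Col("value")))
    .GroupBy("class")
    .Agg(Avg(Col("value")).Alias("avg_value"))
    .Show();

// Stop Spark session
spark.Stop();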
Sample input text for the word-count example (e.g. the contents of input.txt):
Hello World This .NET app uses .NET for Apache Spark This .NET app counts words with Apache Spark
using Microsoft.Spark.Sql;
using static Microsoft.Spark.Sql.Functions;

// Create Spark session
SparkSession spark =
SparkSession
.Builder()
.AppName("word_count_sample")
.GetOrCreate();
// Create initial DataFrame
string filePath = args[0];
DataFrame dataFrame = spark.Read().Text(filePath);
// Count words
DataFrame words =
dataFrame
.Select(Split(Col("value")," ").Alias("words"))
.Select(Explode(Col("words")).Alias("word"))
.GroupBy("word")
.Count()
.OrderBy(Col("count").Desc());
// Display results
words.Show();
// Stop Spark session
spark.Stop();

The batch sample below reads a GitHub projects CSV with the following columns:

| id | url | owner_id | name | descriptor | language | created_at | forked_from | deleted | updated_at |
| INT | STRING | INT | STRING | STRING | STRING | STRING | INT | STRING | STRING |
using System;
using Microsoft.Spark.Sql;
using static Microsoft.Spark.Sql.Functions;

// Create Spark session
SparkSession spark = SparkSession
.Builder()
.AppName("GitHub and Spark Batch")
.GetOrCreate();
// Create initial DataFrame
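// filePath is assumed to be set beforehand (e.g. string filePath = args[0];)
// and to point at the projects CSV described above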
DataFrame projectsDf = spark
.Read()
.Schema("id INT, url STRING, owner_id INT, " +
"name STRING, descriptor STRING, language STRING, " +
"created_at STRING, forked_from INT, deleted STRING, " +
"updated_at STRING")
.Csv(filePath);
// Display results
projectsDf.Show();
// Drop any rows with NA values
DataFrameNaFunctions dropEmptyProjects = projectsDf.Na();
DataFrame cleanedProjects = dropEmptyProjects.Drop("any");
// Remove unnecessary columns
cleanedProjects = cleanedProjects.Drop("id", "url", "owner_id");
// Display results
cleanedProjects.Show();
// Average number of times each language has been forked
DataFrame groupedDF = cleanedProjects
.GroupBy("language")
.Agg(Avg(cleanedProjects["forked_from"]));
// Sort by most forked languages first & Display results
groupedDF.OrderBy(Desc("avg(forked_from)")).Show();
// Defines a UDF that determines if a date is greater than a specified date
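// referenceDate (DateTime) is assumed to have been defined earlier, e.g.:
// DateTime referenceDate = new DateTime(2015, 10, 20);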
spark.Udf().Register<string, bool>(
"MyUDF",
(date) => DateTime.TryParse(date, out DateTime convertedDate) &&
(convertedDate > referenceDate));
// Use UDF to add columns to the generated TempView
cleanedProjects.CreateOrReplaceTempView("dateView");
DataFrame dateDf = spark.Sql(
"SELECT *, MyUDF(dateView.updated_at) AS datebefore FROM dateView");
// Display results
dateDf.Show();
// Stop Spark session
spark.Stop();
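For comparison, the same date check can also be applied through the DataFrame API instead of a temp view, by wrapping the delegate with Udf (the Func<Column, Column> style used in the streaming sample below). A sketch, reusing cleanedProjects and referenceDate from above:

// Sketch: same predicate as MyUDF, applied as a column rather than via Spark SQL
Func<Column, Column> isAfterReference =
    Udf<string, bool>((date) =>
        DateTime.TryParse(date, out DateTime convertedDate) &&
        (convertedDate > referenceDate));

cleanedProjects
    .WithColumn("datebefore", isAfterReference(cleanedProjects["updated_at"]))
    .Show();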
using System;
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Streaming;
using static Microsoft.Spark.Sql.Functions;

// Create Spark session
SparkSession spark = SparkSession
.Builder()
.AppName("Streaming example with a UDF")
.GetOrCreate();
// Create initial DataFrame
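// hostname and port are assumed to be provided beforehand, e.g.:
// string hostname = args[0]; int port = int.Parse(args[1]);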
DataFrame lines = spark
.ReadStream()
.Format("socket")
.Option("host", hostname)
.Option("port", port)
.Load();
// UDF to produce an array
// Array includes:
// 1) original string
// 2) original string + length of original string
Func<Column, Column> udfArray =
Udf<string, string[]>((str) => new string[] { str, $"{str} {str.Length}" });
// Explode array to rows
DataFrame arrayDF = lines.Select(Explode(udfArray(lines["value"])));
// Process and display each incoming line
StreamingQuery query = arrayDF
.WriteStream()
.Format("console")
.Start();
query.AwaitTermination();
※ Call Start on the WriteStream to obtain a StreamingQuery, then call AwaitTermination on it to block until the stream ends.
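As a sketch of an alternative shutdown pattern (assuming the timeout overload of AwaitTermination and Stop on StreamingQuery; the 60-second value is arbitrary), the query handle returned by Start can also be awaited with a timeout and then stopped explicitly:

// Sketch: wait up to 60 seconds, then stop the query and the session explicitly
StreamingQuery query = arrayDF
    .WriteStream()
    .Format("console")
    .Start();
query.AwaitTermination(60000);
query.Stop();
spark.Stop();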