LINQ-like
This covers the case of DataFrame operations only.
In Apache Spark 2.3 and later, processing equivalent to a UDAF can be written by using Pandas UDFs.
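For context, a minimal sketch of the DataFrame-only case in .NET for Apache Spark, using only the built-in aggregate functions (the category/amount data is made up for illustration); aggregation logic that the built-ins cannot express is where a UDAF-equivalent mechanism such as the Pandas UDF becomes necessary.

using Microsoft.Spark.Sql;
using static Microsoft.Spark.Sql.Functions;

SparkSession spark = SparkSession
    .Builder()
    .AppName("dataframe_only_aggregation_sketch")
    .GetOrCreate();

// Tiny in-memory DataFrame built via SQL (hypothetical category/amount data)
DataFrame df = spark.Sql(
    "SELECT 'a' AS category, 1 AS amount " +
    "UNION ALL SELECT 'a', 3 " +
    "UNION ALL SELECT 'b', 5");

// Aggregation written purely with built-in DataFrame functions
DataFrame summary = df
    .GroupBy("category")
    .Agg(
        Sum(Col("amount")).Alias("total_amount"),
        Avg(Col("amount")).Alias("avg_amount"),
        Count(Col("amount")).Alias("row_count"));

summary.Show();

spark.Stop();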
Sample input text for the word-count example below (the contents of the text file passed in as args[0]):
Hello World This .NET app uses .NET for Apache Spark This .NET app counts words with Apache Spark
// Requires: using Microsoft.Spark.Sql; and using static Microsoft.Spark.Sql.Functions;

// Create Spark session
SparkSession spark = SparkSession
    .Builder()
    .AppName("word_count_sample")
    .GetOrCreate();

// Create initial DataFrame
string filePath = args[0];
DataFrame dataFrame = spark.Read().Text(filePath);

// Count words
DataFrame words = dataFrame
    .Select(Split(Col("value"), " ").Alias("words"))
    .Select(Explode(Col("words")).Alias("word"))
    .GroupBy("word")
    .Count()
    .OrderBy(Col("count").Desc());

// Display results
words.Show();

// Stop Spark session
spark.Stop();
.Select(Split(Col("value"), " ").Alias("words")) : splits each input line into an array of words
.Select(Explode(Col("words")).Alias("word")) : expands the array so that each word becomes its own row
.GroupBy("word").Count().OrderBy(Col("count").Desc()) : counts occurrences per word and sorts in descending order of count
Schema of the GitHub projects CSV file used by the batch sample:
id INT | url STRING | owner_id INT | name STRING | descriptor STRING | language STRING | created_at STRING | forked_from INT | deleted STRING | updated_at STRING
// Requires: using System; using Microsoft.Spark.Sql; and using static Microsoft.Spark.Sql.Functions;
// filePath (string) and referenceDate (DateTime) are assumed to be defined earlier,
// e.g. taken from the command-line arguments.

// Create Spark session
SparkSession spark = SparkSession
    .Builder()
    .AppName("GitHub and Spark Batch")
    .GetOrCreate();

// Create initial DataFrame
DataFrame projectsDf = spark
    .Read()
    .Schema("id INT, url STRING, owner_id INT, " +
        "name STRING, descriptor STRING, language STRING, " +
        "created_at STRING, forked_from INT, deleted STRING, " +
        "updated_at STRING")
    .Csv(filePath);

// Display results
projectsDf.Show();

// Drop any rows with NA values
DataFrameNaFunctions dropEmptyProjects = projectsDf.Na();
DataFrame cleanedProjects = dropEmptyProjects.Drop("any");

// Remove unnecessary columns
cleanedProjects = cleanedProjects.Drop("id", "url", "owner_id");

// Display results
cleanedProjects.Show();

// Average number of times each language has been forked
DataFrame groupedDF = cleanedProjects
    .GroupBy("language")
    .Agg(Avg(cleanedProjects["forked_from"]));

// Sort by most forked languages first & display results
groupedDF.OrderBy(Desc("avg(forked_from)")).Show();

// Define a UDF that determines whether a date is later than a specified reference date
spark.Udf().Register<string, bool>(
    "MyUDF",
    (date) => DateTime.TryParse(date, out DateTime convertedDate) &&
        (convertedDate > referenceDate));

// Use the UDF to add a column to the generated TempView
cleanedProjects.CreateOrReplaceTempView("dateView");
DataFrame dateDf = spark.Sql(
    "SELECT *, MyUDF(dateView.updated_at) AS datebefore FROM dateView");

// Display results
dateDf.Show();

// Stop Spark session
spark.Stop();
.GroupBy("language")
.Agg(Avg(cleanedProjects["forked_from"]));
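For reference, a minimal sketch of the same per-language average expressed through Spark SQL on a temporary view (the view name languageView is hypothetical):

// Register the cleaned DataFrame as a temporary view (hypothetical name)
cleanedProjects.CreateOrReplaceTempView("languageView");

// Same aggregation in SQL: average forked_from per language,
// sorted with the largest averages first
DataFrame sqlGroupedDf = spark.Sql(
    "SELECT language, AVG(forked_from) AS avg_forked_from " +
    "FROM languageView " +
    "GROUP BY language " +
    "ORDER BY avg_forked_from DESC");

sqlGroupedDf.Show();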
// Requires: using System; using Microsoft.Spark.Sql;
// using Microsoft.Spark.Sql.Streaming; and using static Microsoft.Spark.Sql.Functions;
// hostname and port are assumed to be defined earlier (e.g. taken from the command-line arguments).

// Create Spark session
SparkSession spark = SparkSession
    .Builder()
    .AppName("Streaming example with a UDF")
    .GetOrCreate();

// Create initial DataFrame from the socket source
DataFrame lines = spark
    .ReadStream()
    .Format("socket")
    .Option("host", hostname)
    .Option("port", port)
    .Load();

// UDF to produce an array
// The array includes:
// 1) the original string
// 2) the original string + the length of the original string
Func<Column, Column> udfArray =
    Udf<string, string[]>((str) => new string[] { str, $"{str} {str.Length}" });

// Explode the array into rows
DataFrame arrayDF = lines.Select(Explode(udfArray(lines["value"])));

// Process and display each incoming line
StreamingQuery query = arrayDF
    .WriteStream()
    .Format("console")
    .Start();

query.AwaitTermination();

※ Start() on the WriteStream() returns a StreamingQuery; the application then calls AwaitTermination() on that query and finishes when the streaming query terminates.
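To try the streaming example locally, something has to listen on the configured host/port and write lines to the socket (the role usually played by a tool such as netcat). A minimal, hypothetical C# line producer under that assumption:

using System;
using System.IO;
using System.Net;
using System.Net.Sockets;

class SocketLineProducer
{
    static void Main()
    {
        // Listen on localhost:9999 (hypothetical values; they must match the
        // host/port options passed to the Spark socket source)
        var listener = new TcpListener(IPAddress.Loopback, 9999);
        listener.Start();
        Console.WriteLine("Waiting for the Spark socket source to connect...");

        using TcpClient client = listener.AcceptTcpClient();
        using var writer = new StreamWriter(client.GetStream()) { AutoFlush = true };

        // Each line typed on the console is sent to the streaming query
        string line;
        while ((line = Console.ReadLine()) != null)
        {
            writer.WriteLine(line);
        }

        listener.Stop();
    }
}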
Hadoop Advent Calendar 2016