「.NET 開発基盤部会 Wiki」は、「Open棟梁Project」,「OSSコンソーシアム .NET開発基盤部会」によって運営されています。
SQLで記述できる構造化データ処理のための分散処理フレームワーク
LINQ的
「Hello World」 / 「This .NET app uses .NET for Apache Spark」 / 「This .NET app counts words with Apache Spark」
// Word-count sample for .NET for Apache Spark:
// read a text file, split every line into words, and print
// per-word counts sorted from most to least frequent.
// Assumes `args` (command-line arguments) is in scope.

// Create Spark session
SparkSession spark = SparkSession
    .Builder()
    .AppName("word_count_sample")
    .GetOrCreate();

// Read the input file; its path is the first command-line argument
string filePath = args[0];
DataFrame dataFrame = spark.Read().Text(filePath);

// Tokenize each line into a word array, flatten to one row per word,
// then count occurrences and sort by descending frequency
DataFrame tokenized = dataFrame.Select(Split(Col("value"), " ").Alias("words"));
DataFrame flattened = tokenized.Select(Explode(Col("words")).Alias("word"));
DataFrame words = flattened
    .GroupBy("word")
    .Count()
    .OrderBy(Col("count").Desc());

// Display results
words.Show();

// Stop Spark session
spark.Stop();
.Select(Split(Col("value"), " ").Alias("words"))
.Select(Explode(Col("words")).Alias("word"))
.GroupBy("word").Count().OrderBy(Col("count").Desc())
id INT | url STRING | owner_id INT | name STRING | descriptor STRING | language STRING | created_at STRING | forked_from INT | deleted STRING | updated_at STRING |
// Batch-processing sample for .NET for Apache Spark over a GitHub projects CSV:
// load with an explicit schema, drop NA rows and unneeded columns, aggregate
// fork counts per language, then flag rows through a registered UDF via Spark SQL.
// Assumes `filePath` and `referenceDate` are defined by the surrounding program.

// Create Spark session
SparkSession spark = SparkSession
    .Builder()
    .AppName("GitHub and Spark Batch")
    .GetOrCreate();

// Load the CSV file with an explicit column schema, then display it
DataFrame rawProjects = spark
    .Read()
    .Schema("id INT, url STRING, owner_id INT, " +
            "name STRING, descriptor STRING, language STRING, " +
            "created_at STRING, forked_from INT, deleted STRING, " +
            "updated_at STRING")
    .Csv(filePath);
rawProjects.Show();

// Drop any rows containing NA values
DataFrameNaFunctions naFunctions = rawProjects.Na();
DataFrame projects = naFunctions.Drop("any");

// Remove columns the analysis does not need, then display
projects = projects.Drop("id", "url", "owner_id");
projects.Show();

// Average number of times each language has been forked,
// shown with the most-forked languages first
DataFrame forksByLanguage = projects
    .GroupBy("language")
    .Agg(Avg(projects["forked_from"]));
forksByLanguage.OrderBy(Desc("avg(forked_from)")).Show();

// UDF: true when the date string parses and is later than `referenceDate`
spark.Udf().Register<string, bool>(
    "MyUDF",
    (date) => DateTime.TryParse(date, out DateTime convertedDate)
              && (convertedDate > referenceDate));

// Apply the UDF through Spark SQL against a temp view, then display
projects.CreateOrReplaceTempView("dateView");
DataFrame flaggedDates = spark.Sql(
    "SELECT *, MyUDF(dateView.updated_at) AS datebefore FROM dateView");
flaggedDates.Show();

// Stop Spark session
spark.Stop();
.GroupBy("language")
.Agg(Avg(cleanedProjects["forked_from"]));
// Structured-streaming sample for .NET for Apache Spark with a UDF:
// read lines from a socket, expand each line into multiple rows via a UDF,
// and write the resulting stream to the console.
// Assumes `hostname` and `port` are defined by the surrounding program.

// Create Spark session
SparkSession spark = SparkSession
    .Builder()
    .AppName("Streaming example with a UDF")
    .GetOrCreate();

// Streaming source: a socket feed
DataFrame socketLines = spark
    .ReadStream()
    .Format("socket")
    .Option("host", hostname)
    .Option("port", port)
    .Load();

// UDF producing an array per input string:
//   1) the original string
//   2) the original string followed by its length
Func<Column, Column> expandUdf =
    Udf<string, string[]>((str) => new string[] { str, $"{str} {str.Length}" });

// Explode the array so each element becomes its own row
DataFrame expandedRows = socketLines.Select(Explode(expandUdf(socketLines["value"])));

// Process and display each incoming line on the console.
// Note (translated from the wiki): Start() on the WriteStream returns the
// query object; calling AwaitTermination() on it blocks until the stream ends.
StreamingQuery streamingQuery = expandedRows
    .WriteStream()
    .Format("console")
    .Start();
streamingQuery.AwaitTermination();