Given the nature of Apache Spark, learning goes faster in an interactive environment.
docker run -it -p 8888:8888 jupyter/pyspark-notebook
version: '3'
services:
  notebook:
    image: jupyter/pyspark-notebook
    ports:
      - '8888:8888'
    environment:
      - GRANT_SUDO=yes
      - TZ=Asia/Tokyo
    volumes:
      - ./work:/home/jovyan/work
      - ./path:/home/jovyan/path
>docker-compose up -d

※ Mounting /jovyan itself results in a Kernel error.
[C 03:39:12.997 NotebookApp]
    To access the notebook, open this file in a browser:
        file:///home/jovyan/.local/share/jupyter/runtime/nbserver-7-open.html
    Or copy and paste one of these URLs:
        http://xxxxx:8888/?token=xxxxx
     or http://127.0.0.1:8888/?token=xxxxx
from pyspark.sql import SparkSession

spark: SparkSession = SparkSession.builder.appName("SimpleApp").getOrCreate()

# do something to prove it works
spark.sql('SELECT "Test1" as c1').show()
+-----+
|   c1|
+-----+
|Test1|
+-----+
from typing import List, Tuple

from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark.sql.types import StructField, StructType, StringType, IntegerType

Trainer = Tuple[int, str, str, int]
trainers: List[Trainer] = [
    (1, 'サトシ', 'male', 10),
    (2, 'シゲル', 'male', 10),
    (3, 'カスミ', 'female', 12),
]

trainers_schema = StructType([
    StructField('id', IntegerType(), True),
    StructField('name', StringType(), True),
    StructField('gender', StringType(), True),
    StructField('age', IntegerType(), True),
])

trainers_df: DataFrame = spark.createDataFrame(
    spark.sparkContext.parallelize(trainers),
    trainers_schema
)

trainers_df.show()
+---+------+------+---+
| id|  name|gender|age|
+---+------+------+---+
|  1|サトシ|  male| 10|
|  2|シゲル|  male| 10|
|  3|カスミ|female| 12|
+---+------+------+---+
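Incidentally, createDataFrame can also take the Python list directly, without creating an RDD via parallelize first. A minimal sketch, reusing trainers and trainers_schema from above (trainers_df2 is just an illustrative name):

# Same result without explicitly creating an RDD first.
trainers_df2: DataFrame = spark.createDataFrame(trainers, trainers_schema)
trainers_df2.show()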
result = trainers_df.collect()
print(result)
[Row(id=1, name='サトシ', gender='male', age=10), Row(id=2, name='シゲル', gender='male', age=10), Row(id=3, name='カスミ', gender='female', age=12)]
trainers_df.coalesce(1).write.mode('overwrite').csv("path/to/output.csv")
>docker ps
CONTAINER ID   IMAGE                         COMMAND   CREATED   STATUS   PORTS   NAMES
xxxxxxxxxxxx   jupyter/pyspark-notebook...   ...

>docker exec -i -t xxxxxxxxxxxx /bin/bash
...

Looking under /home/jovyan/path/to, the output can be confirmed.
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark.sql.types import StructField, StructType, StringType, IntegerType

trainers_schema = StructType([
    StructField('id', IntegerType(), True),
    StructField('name', StringType(), True),
    StructField('gender', StringType(), True),
    StructField('age', IntegerType(), True),
])

trainers_df: DataFrame = spark.read.format("csv")\
    .options(header="false")\
    .load("path/to/output.csv", schema=trainers_schema)

trainers_df.show()
+---+------+------+---+
| id|  name|gender|age|
+---+------+------+---+
|  1|サトシ|  male| 10|
|  2|シゲル|  male| 10|
|  3|カスミ|female| 12|
+---+------+------+---+
trainers_df.createOrReplaceTempView('trainers')

male_trainers_df = spark.sql('''
    SELECT *
    FROM trainers
    WHERE gender = 'male'
''')

male_trainers_df.show()

※ Triple-quoted strings can contain line breaks.
male_trainers_df = trainers_df.filter(trainers_df['gender'] == 'male')
male_trainers_df.show()
+---+------+------+---+
| id|  name|gender|age|
+---+------+------+---+
|  1|サトシ|  male| 10|
|  2|シゲル|  male| 10|
|  3|カスミ|female| 12|
+---+------+------+---+

+---+------+------+---+
| id|  name|gender|age|
+---+------+------+---+
|  1|サトシ|  male| 10|
|  2|シゲル|  male| 10|
+---+------+------+---+
from pyspark.sql.functions import udf
@udf(StringType())
def name_with_suffix(name: str, gender: str) -> str:
    return name + {'male': 'くん', 'female': 'さん'}.get(gender, '氏')

spark.udf.register('name_with_suffix', name_with_suffix)

dearest_trainers = spark.sql('''
    SELECT name_with_suffix(name, gender) as hoge
    FROM trainers
''')

dearest_trainers.show()
+---+------+------+---+
| id|  name|gender|age|
+---+------+------+---+
|  1|サトシ|  male| 10|
|  2|シゲル|  male| 10|
|  3|カスミ|female| 12|
+---+------+------+---+

+----------+
|      hoge|
+----------+
|サトシくん|
|シゲルくん|
|カスミさん|
+----------+
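The decorated UDF can also be called through the DataFrame API instead of Spark SQL. A minimal sketch, assuming trainers_df and name_with_suffix defined above:

# Apply the UDF directly to columns of trainers_df (DataFrame API instead of SQL).
trainers_df.select(name_with_suffix('name', 'gender').alias('hoge')).show()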
Both Spark SQL and the DataFrame API can be used.
+-----+-----+
|   c1|   c2|
+-----+-----+
|Test1|Test2|
+-----+-----+
from pyspark.sql import SparkSession

spark: SparkSession = SparkSession.builder.appName("SimpleApp").getOrCreate()

# do something to prove it works
df = spark.sql('SELECT "Test1" as c1, "Test2" as c2')
df.show()
+-----+-----+-----+
|   c1|   c2|   c3|
+-----+-----+-----+
|Test1|Test2|XXXXX|
+-----+-----+-----+
df = df.withColumn('c3', df.c2)
df.show()

+-----+-----+-----+
|   c1|   c2|   c3|
+-----+-----+-----+
|Test1|Test2|Test2|
+-----+-----+-----+
import pyspark.sql.functions as f

df = df.withColumn('c3', f.lit('hoge'))
df.show()

+-----+-----+-----+
|   c1|   c2|   c3|
+-----+-----+-----+
|Test1|Test2| hoge|
+-----+-----+-----+
df.createOrReplaceTempView('table')
df = spark.sql('select c1, c2, c2 as c3 from table')
df.show()
+-----+-----+-----+
|   c1|   c2| hoge|
+-----+-----+-----+
|Test1|Test2|XXXXX|
+-----+-----+-----+
df = df.withColumnRenamed('c3', 'hoge')
df.show()
df.createOrReplaceTempView('table')
df = spark.sql('select c1, c2, c3 as hoge from table')
df.show()
+-----+-----+
|   c1|   c2|
+-----+-----+
|Test1|Test2|
+-----+-----+
df = df.drop('hoge')
df.show()
df.createOrReplaceTempView('table')
df = spark.sql('select c1, c2 from table')
df.show()
+---+----+      +---+----+   +---+---+---+
| c1| c21|      | c1| c22|   | c1|c21|c22|
+---+----+ join +---+----+ = +---+---+---+
|  1| aaa|      |  1| bbb|   |  1|aaa|bbb|
+---+----+      +---+----+   +---+---+---+
from pyspark.sql import SparkSession

spark: SparkSession = SparkSession.builder.appName("SimpleApp").getOrCreate()

# do something to prove it works
df1 = spark.sql('SELECT "1" as c1, "aaa" as c21')
df2 = spark.sql('SELECT "1" as c1, "bbb" as c22')

df1.show()
df2.show()
df1.join(df2, "c1", "inner").show()
df1.join(df2, df1.c1 == df2.c1, "inner").show()

※ With this form, c1 appears twice in the result. It can be dropped afterwards, as shown below.
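A minimal sketch, assuming df1 and df2 from above, of removing the duplicated column after the expression-based join:

# Drop the c1 column that came from df2, leaving a single c1 in the result.
df1.join(df2, df1.c1 == df2.c1, "inner").drop(df2.c1).show()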
df1.createOrReplaceTempView('t1')
df2.createOrReplaceTempView('t2')

df = spark.sql('select t1.c1, t1.c21, t2.c22 from t1 inner join t2 on t1.c1 = t2.c1')
df.show()
from pyspark.sql import SparkSession

spark: SparkSession = SparkSession.builder.appName("SimpleApp").getOrCreate()

# do something to prove it works
df = spark.sql('SELECT "aaa-bbb-ccc" as c1, "XXX" as c2')
df.show()

+-----------+---+
|         c1| c2|
+-----------+---+
|aaa-bbb-ccc|XXX|
+-----------+---+
import pyspark.sql.functions as f

split_col = f.split(df['c1'], '-')
df = df.withColumn('d1', split_col.getItem(0))
df = df.withColumn('d2', split_col.getItem(1))
df = df.withColumn('d3', split_col.getItem(2))
df.show()

+-----------+---+---+---+---+
|         c1| c2| d1| d2| d3|
+-----------+---+---+---+---+
|aaa-bbb-ccc|XXX|aaa|bbb|ccc|
+-----------+---+---+---+---+
import pyspark.sql.functions as f

df = df.withColumn('c1', f.split(f.col('c1'), '-'))
df = df.select(f.explode(f.col('c1')), 'c2')
df = df.withColumnRenamed('col', 'c1')
df.show()

+---+---+
| c1| c2|
+---+---+
|aaa|XXX|
|bbb|XXX|
|ccc|XXX|
+---+---+
import pyspark.sql.functions as f

df = df.withColumn('c1', f.split(f.col('c1'), '-'))
df = df.select(f.posexplode(f.col('c1')), 'c2')
df = df.withColumnRenamed('pos', 'index')
df = df.withColumnRenamed('col', 'c1')
df.show()

+-----+---+---+
|index| c1| c2|
+-----+---+---+
|    0|aaa|XXX|
|    1|bbb|XXX|
|    2|ccc|XXX|
+-----+---+---+
(Sliding Window Operations in Spark Structured Streaming)
import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql.functions import window

host = "xxx.xxx.xxx.xxx"
port = 9999
windowSize = 10
slideSize = 10
if slideSize > windowSize:
    print("<slideSize> must be less than or equal to <windowSize>", file=sys.stderr)
windowDuration = '{} seconds'.format(windowSize)
slideDuration = '{} seconds'.format(slideSize)

spark = SparkSession\
    .builder\
    .appName("StructuredNetworkWordCountWindowed")\
    .getOrCreate()

# Create DataFrame representing the stream of input lines from connection to host:port
lines = spark\
    .readStream\
    .format('socket')\
    .option('host', host)\
    .option('port', port)\
    .option('includeTimestamp', 'true')\
    .load()

# Split the lines into words, retaining timestamps
# split() splits each line into an array, and explode() turns the array into multiple rows
words = lines.select(
    explode(split(lines.value, ' ')).alias('word'),
    lines.timestamp
)

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, windowDuration, slideDuration),
    words.word
).count().orderBy('window')

# Start running the query that prints the windowed word counts to the console
query = windowedCounts\
    .writeStream\
    .outputMode('complete')\
    .format('console')\
    .option('truncate', 'false')\
    .start()

query.awaitTermination()
-------------------------------------------
Batch: 4
-------------------------------------------
+------------------------------------------+----+-----+
|window                                    |word|count|
+------------------------------------------+----+-----+
|{2021-09-29 10:27:20, 2021-09-29 10:27:30}|444 |6    |
|{2021-09-29 10:27:20, 2021-09-29 10:27:30}|111 |6    |
|{2021-09-29 10:27:20, 2021-09-29 10:27:30}|222 |6    |
|{2021-09-29 10:27:20, 2021-09-29 10:27:30}|333 |6    |
|{2021-09-29 10:27:50, 2021-09-29 10:28:00}|ccc |4    |
|{2021-09-29 10:27:50, 2021-09-29 10:28:00}|bbb |4    |
|{2021-09-29 10:27:50, 2021-09-29 10:28:00}|aaa |4    |
|{2021-09-29 10:27:50, 2021-09-29 10:28:00}|ddd |4    |
+------------------------------------------+----+-----+

Old windows keep accumulating in the output, and behavior like that is still not entirely clear (→ apparently it is controlled by outputMode).
from time import sleep

...

# query.awaitTermination()
sleep(600)  # the unit is apparently seconds
query.stop()
# Split the lines into words, retaining timestamps
# split() splits each line into an array, and explode() turns the array into multiple rows
words = lines.select(
    explode(split(lines.value, ' ')).alias('word'),
    lines.timestamp
).withWatermark("timestamp", "1 minute")

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, windowDuration, slideDuration),
    words.word
).count()

# Start running the query that prints the windowed word counts to the console
query = windowedCounts\
    .writeStream\
    .outputMode('append')\
    .format('console')\
    .option('truncate', 'false')\
    .start()
query = windowedCounts\
    .writeStream\
    .outputMode('append')\
    .format("parquet")\
    .option("checkpointLocation", "path/to/")\
    .option("path", "path/to/")\
    .option('truncate', 'false')\
    .start()
kafka_stream_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", 'hostname:10090,...,hostname:10099') \
    .option("subscribe", "test") \
    .option("startingOffsets", "earliest") \
    .option("groupId", "streamtest") \
    .load()
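The key and value columns delivered by the Kafka source are binary, so they are usually cast to strings before further processing. A minimal sketch, assuming kafka_stream_df from above and a topic carrying UTF-8 text (kafka_text_df is just an illustrative name):

# Cast the binary key/value columns to strings for downstream processing.
kafka_text_df = kafka_stream_df.selectExpr(
    "CAST(key AS STRING)",
    "CAST(value AS STRING)"
)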
The linked page runs the above sample on Databricks.
Although it is hard to tell from the title, this describes mounting the /home/jovyan/ directory of jupyter/datascience-notebook with docker-compose.
PythonとSensorTag, Kafka, Spark Streamingのストリーム処理