Given the nature of Apache Spark, it is easier to learn in an interactive environment.
docker run -it -p 8888:8888 jupyter/pyspark-notebook
version: '3'
services:
  notebook:
    image: jupyter/pyspark-notebook
    ports:
      - '8888:8888'
    environment:
      - GRANT_SUDO=yes
      - TZ=Asia/Tokyo
    volumes:
      - ./work:/home/jovyan/work
      - ./path:/home/jovyan/path

>docker-compose up -d

※ Mounting /jovyan itself results in a Kernel error.
[C 03:39:12.997 NotebookApp]
To access the notebook, open this file in a browser:
file:///home/jovyan/.local/share/jupyter/runtime/nbserver-7-open.html
Or copy and paste one of these URLs:
http://xxxxx:8888/?token=xxxxx
or http://127.0.0.1:8888/?token=xxxxx

from pyspark.sql import SparkSession
spark: SparkSession = SparkSession.builder.appName("SimpleApp").getOrCreate()
# do something to prove it works
spark.sql('SELECT "Test1" as c1, "Test2" as c2').show()

+-----+-----+
|   c1|   c2|
+-----+-----+
|Test1|Test2|
+-----+-----+
from typing import List, Tuple
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark.sql.types import StructField, StructType, StringType, IntegerType
Trainer = Tuple[int, str, str, int]
trainers: List[Trainer] = [
(1, 'サトシ', 'male', 10),
(2, 'シゲル', 'male', 10),
(3, 'カスミ', 'female', 12),
]
trainers_schema = StructType([
StructField('id', IntegerType(), True),
StructField('name', StringType(), True),
StructField('gender', StringType(), True),
StructField('age', IntegerType(), True),
])
trainers_df: DataFrame = spark.createDataFrame(
spark.sparkContext.parallelize(trainers),
trainers_schema
)
trainers_df.show()

+---+------+------+---+
| id|  name|gender|age|
+---+------+------+---+
|  1|サトシ|  male| 10|
|  2|シゲル|  male| 10|
|  3|カスミ|female| 12|
+---+------+------+---+
result = trainers_df.collect()
print(result)
[Row(id=1, name='サトシ', gender='male', age=10), Row(id=2, name='シゲル', gender='male', age=10), Row(id=3, name='カスミ', gender='female', age=12)]
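As a side note, the same DataFrame can also be built straight from the Python list, without going through sparkContext.parallelize. A minimal sketch (trainers_df2 is just an illustrative name):

# Sketch: createDataFrame also accepts a plain list of tuples plus a schema
trainers_df2: DataFrame = spark.createDataFrame(trainers, trainers_schema)
trainers_df2.show()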
trainers_df.coalesce(1).write.mode('overwrite').csv("path/to/output.csv")

>docker ps
CONTAINER ID  IMAGE                        COMMAND  CREATED  STATUS  PORTS  NAMES
xxxxxxxxxxxx  jupyter/pyspark-notebook...  ...

>docker exec -i -t xxxxxxxxxxxx /bin/bash
...

Looking under /home/jovyan/path/to, you can confirm the output.
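As a side note, the written output can also be inspected from inside the notebook itself. A minimal sketch (assumes the notebook's working directory is the one Spark wrote into):

import os

# Sketch: csv() writes a directory of part-*.csv files plus _SUCCESS, not a single file
print(os.listdir('path/to/output.csv'))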
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark.sql.types import StructField, StructType, StringType, IntegerType
trainers_schema = StructType([
StructField('id', IntegerType(), True),
StructField('name', StringType(), True),
StructField('gender', StringType(), True),
StructField('age', IntegerType(), True),
])
trainers_df: DataFrame = spark.read.format("csv")\
.options(header="false")\
.load("path/to/output.csv", schema=trainers_schema)
trainers_df.show()

+---+------+------+---+
| id|  name|gender|age|
+---+------+------+---+
|  1|サトシ|  male| 10|
|  2|シゲル|  male| 10|
|  3|カスミ|female| 12|
+---+------+------+---+
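As a side note, the reader also has a csv() shorthand that takes the schema and header flag directly. A minimal sketch equivalent to the format("csv") chain above:

# Sketch: shorthand form of spark.read.format("csv").options(...).load(...)
trainers_df = spark.read.csv("path/to/output.csv", schema=trainers_schema, header=False)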
trainers_df.createOrReplaceTempView('trainers');
male_trainers_df = spark.sql('''
SELECT *
FROM trainers
WHERE gender = 'male'
''')
male_trainers_df.show()
※ Triple-quoted strings can contain line breaks.

male_trainers_df = trainers_df.filter(trainers_df['gender'] == 'male')
male_trainers_df.show()
+---+------+------+---+
| id|  name|gender|age|
+---+------+------+---+
|  1|サトシ|  male| 10|
|  2|シゲル|  male| 10|
|  3|カスミ|female| 12|
+---+------+------+---+

+---+------+------+---+
| id|  name|gender|age|
+---+------+------+---+
|  1|サトシ|  male| 10|
|  2|シゲル|  male| 10|
+---+------+------+---+
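As a side note, filter conditions can also be written with column expressions from pyspark.sql.functions and combined with &. A minimal sketch (male_kids_df is just an illustrative name):

import pyspark.sql.functions as f

# Sketch: each condition needs its own parentheses when combined with &
male_kids_df = trainers_df.filter((f.col('gender') == 'male') & (f.col('age') <= 10))
male_kids_df.show()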
from pyspark.sql.functions import udf
@udf(StringType())
def name_with_suffix(name: str, gender: str) -> str:
return name + {'male': 'くん', 'female': 'さん'}.get(gender, '氏')
spark.udf.register('name_with_suffix', name_with_suffix)
dearest_trainers = spark.sql('''
SELECT name_with_suffix(name, gender) as hoge
FROM trainers
''')
dearest_trainers.show()

+---+------+------+---+
| id|  name|gender|age|
+---+------+------+---+
|  1|サトシ|  male| 10|
|  2|シゲル|  male| 10|
|  3|カスミ|female| 12|
+---+------+------+---+

+----------+
|      hoge|
+----------+
|サトシくん|
|シゲルくん|
|カスミさん|
+----------+
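As a side note, the same UDF can be applied through the DataFrame API without registering it for SQL. A minimal sketch (the 'hoge' alias mirrors the query above):

# Sketch: an @udf-decorated function can be called directly on columns
trainers_df.select(
    name_with_suffix(trainers_df['name'], trainers_df['gender']).alias('hoge')
).show()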
Both Spark SQL and the DataFrame API can be used; here, the explanation focuses on using the DataFrame API.
+-----+-----+
|   c1|   c2|
+-----+-----+
|Test1|Test2|
+-----+-----+
from pyspark.sql import SparkSession
spark: SparkSession = SparkSession.builder.appName("SimpleApp").getOrCreate()
# do something to prove it works
df = spark.sql('SELECT "Test1" as c1, "Test2" as c2 ')
df.show()

+-----+-----+-----+
|   c1|   c2|   c3|
+-----+-----+-----+
|Test1|Test2|XXXXX|
+-----+-----+-----+
df = df.withColumn('c3', df.c2)
df.show()
+-----+-----+-----+
| c1| c2| c3|
+-----+-----+-----+
|Test1|Test2|Test2|
+-----+-----+-----+

import pyspark.sql.functions as f
df = df.withColumn('c3', f.lit('hoge'))
df.show()
+-----+-----+-----+
| c1| c2| c3|
+-----+-----+-----+
|Test1|Test2| hoge|
+-----+-----+-----+

df.createOrReplaceTempView('table')
df = spark.sql('select c1, c2, c2 as c3 from table')
df.show()

+-----+-----+-----+
|   c1|   c2| hoge|
+-----+-----+-----+
|Test1|Test2|XXXXX|
+-----+-----+-----+
df = df.withColumnRenamed('c3', 'hoge')
df.show()

df.createOrReplaceTempView('table')
df = spark.sql('select c1, c2, c3 as hoge from table')
df.show()

+-----+-----+
|   c1|   c2|
+-----+-----+
|Test1|Test2|
+-----+-----+
df = df.drop('hoge')
df.show()

df.createOrReplaceTempView('table')
df = spark.sql('select c1, c2 from table')
df.show()

+---+----+      +---+----+     +---+---+---+
| c1| c21|      | c1| c22|     | c1|c21|c22|
+---+----+ join +---+----+  =  +---+---+---+
|  1| aaa|      |  1| bbb|     |  1|aaa|bbb|
+---+----+      +---+----+     +---+---+---+
from pyspark.sql import SparkSession
spark: SparkSession = SparkSession.builder.appName("SimpleApp").getOrCreate()
# do something to prove it works
df1 = spark.sql('SELECT "1" as c1, "aaa" as c21')
df2 = spark.sql('SELECT "1" as c1, "bbb" as c22')
df1.show()
df2.show()

df1.join(df2, "c1", "inner").show()
df1.join(df2, df1.c1 == df2.c1, "inner").show()

※ c1 ends up duplicated this way, though.
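As a side note, one way to get rid of the duplicated column is to drop df2's c1 after the join. A minimal sketch:

# Sketch: drop() accepts a Column, so df2's copy of c1 can be removed explicitly
df1.join(df2, df1.c1 == df2.c1, "inner").drop(df2.c1).show()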
df1.createOrReplaceTempView('t1')
df2.createOrReplaceTempView('t2')
df = spark.sql('select t1.c1, t1.c21, t2.c22 from t1 inner join t2 on t1.c1 = t2.c1')
df.show()

from pyspark.sql import SparkSession
spark: SparkSession = SparkSession.builder.appName("SimpleApp").getOrCreate()
# do something to prove it works
df = spark.sql('SELECT "aaa-bbb-ccc" as c1, "XXX" as c2')
df.show()
+-----------+---+
| c1| c2|
+-----------+---+
|aaa-bbb-ccc|XXX|
+-----------+---+

import pyspark.sql.functions as f
split_col = f.split(df['c1'], '-')
df = df.withColumn('d1', split_col.getItem(0))
df = df.withColumn('d2', split_col.getItem(1))
df = df.withColumn('d3', split_col.getItem(2))
df.show()
+-----------+---+---+---+---+
| c1| c2| d1| d2| d3|
+-----------+---+---+---+---+
|aaa-bbb-ccc|XXX|aaa|bbb|ccc|
+-----------+---+---+---+---+

import pyspark.sql.functions as f
df = df.withColumn('c1', f.split(f.col('c1'), '-'))
df = df.select(f.explode(f.col('c1')), 'c2')
df = df.withColumnRenamed('col', 'c1')
df.show()
+---+---+
| c1| c2|
+---+---+
|aaa|XXX|
|bbb|XXX|
|ccc|XXX|
+---+---+

import pyspark.sql.functions as f
df = df.withColumn('c1', f.split(f.col('c1'), '-'))
df = df.select(f.posexplode(f.col('c1')), 'c2')
df = df.withColumnRenamed('pos', 'index')
df = df.withColumnRenamed('col', 'c1')
df.show()
+-----+---+---+
|index| c1| c2|
+-----+---+---+
| 0|aaa|XXX|
| 1|bbb|XXX|
| 2|ccc|XXX|
+-----+---+---+

(Sliding Window Operations in Spark Structured Streaming)
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql.functions import window
host = "xxx.xxx.xxx.xxx"
port = 9999
windowSize = 10
slideSize = 10
if slideSize > windowSize:
print("<slideSize> must be less than or equal to <windowSize>", file=sys.stderr)
windowDuration = '{} seconds'.format(windowSize)
slideDuration = '{} seconds'.format(slideSize)
spark = SparkSession\
.builder\
.appName("StructuredNetworkWordCountWindowed")\
.getOrCreate()
# Create DataFrame representing the stream of input lines from connection to host:port
lines = spark\
.readStream\
.format('socket')\
.option('host', host)\
.option('port', port)\
.option('includeTimestamp', 'true')\
.load()
# Split the lines into words, retaining timestamps
# split() splits each line into an array, and explode() turns the array into multiple rows
words = lines.select(
explode(split(lines.value, ' ')).alias('word'),
lines.timestamp
)
# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
window(words.timestamp, windowDuration, slideDuration),
words.word
).count().orderBy('window')
# Start running the query that prints the windowed word counts to the console
query = windowedCounts\
.writeStream\
.outputMode('complete')\
.format('console')\
.option('truncate', 'false')\
.start()
query.awaitTermination()

-------------------------------------------
Batch: 4
-------------------------------------------
+------------------------------------------+----+-----+
|window |word|count|
+------------------------------------------+----+-----+
|{2021-09-29 10:27:20, 2021-09-29 10:27:30}|444 |6 |
|{2021-09-29 10:27:20, 2021-09-29 10:27:30}|111 |6 |
|{2021-09-29 10:27:20, 2021-09-29 10:27:30}|222 |6 |
|{2021-09-29 10:27:20, 2021-09-29 10:27:30}|333 |6 |
|{2021-09-29 10:27:50, 2021-09-29 10:28:00}|ccc |4 |
|{2021-09-29 10:27:50, 2021-09-29 10:28:00}|bbb |4 |
|{2021-09-29 10:27:50, 2021-09-29 10:28:00}|aaa |4 |
|{2021-09-29 10:27:50, 2021-09-29 10:28:00}|ddd |4 |
+------------------------------------------+----+-----+
Old windows keep accumulating, and that sort of thing is still not well understood (→ apparently it is controlled with outputMode).
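A minimal sketch of one common way to bound that accumulation, assuming the words stream defined above: put a watermark on the event-time column and switch from 'complete' to 'update' output mode, so that state for old windows can be dropped (orderBy is omitted because sorting is only allowed in complete mode for streaming aggregations).

from pyspark.sql.functions import window

# Sketch: the watermark bounds how long state for old windows is kept
windowedCounts = words \
    .withWatermark('timestamp', '30 seconds') \
    .groupBy(window(words.timestamp, windowDuration, slideDuration), words.word) \
    .count()

query = windowedCounts \
    .writeStream \
    .outputMode('update') \
    .format('console') \
    .option('truncate', 'false') \
    .start()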
query = windowedCounts \
.writeStream \
.format("csv") \
.option("path", "stream_output") \
.start()
query.awaitTermination()

To stop the query after a fixed time instead of blocking on awaitTermination():

from time import sleep

sleep(600)  # apparently the unit is seconds
query.stop()
kafka_stream_df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", '172.26.0.1:10090,...,172.26.0.1:10099') \
.option("subscribe", "test") \
.option("startingOffsets", "earliest") \
.option("groupId", "streamtest") \
.load()

At the linked page, try running the above sample on Databricks.
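As a side note, the Kafka source delivers binary key and value columns, so the payload is usually cast to strings before further processing. A minimal sketch against kafka_stream_df above (value_df is just an illustrative name):

# Sketch: cast the Kafka key/value binary columns to strings
value_df = kafka_stream_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")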
Hard to tell from the title, but this covers mounting the /home/jovyan directory of jupyter/datascience-notebook with docker-compose.

Stream processing with Python, SensorTag, Kafka, and Spark Streaming