Given the nature of Apache Spark, learning it goes faster in an interactive environment.
docker run -it -p 8888:8888 jupyter/pyspark-notebook
version: '3'
services:
  notebook:
    image: jupyter/pyspark-notebook
    ports:
      - '8888:8888'
    environment:
      - GRANT_SUDO=yes
      - TZ=Asia/Tokyo
    volumes:
      - ./work:/home/jovyan/work
      - ./path:/home/jovyan/path

>docker-compose up -d

※ Mounting the /home/jovyan directory itself results in a Kernel error.
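Once the container is up, the access URL (including the token) appears in the notebook container's startup log; assuming the service name notebook from the compose file above, it can be viewed with:

>docker-compose logs notebook

The log looks like the following: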
[C 03:39:12.997 NotebookApp]
    To access the notebook, open this file in a browser:
        file:///home/jovyan/.local/share/jupyter/runtime/nbserver-7-open.html
    Or copy and paste one of these URLs:
        http://xxxxx:8888/?token=xxxxx
     or http://127.0.0.1:8888/?token=xxxxx

from pyspark.sql import SparkSession
spark: SparkSession = SparkSession.builder.appName("SimpleApp").getOrCreate()
# do something to prove it works
spark.sql('SELECT "Test" as c1').show()

+----+
|  c1|
+----+
|Test|
+----+
from typing import List, Tuple
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark.sql.types import StructField, StructType, StringType, IntegerType
Trainer = Tuple[int, str, str, int]
trainers: List[Trainer] = [
    (1, 'サトシ', 'male', 10),
    (2, 'シゲル', 'male', 10),
    (3, 'カスミ', 'female', 12),
]

trainers_schema = StructType([
    StructField('id', IntegerType(), True),
    StructField('name', StringType(), True),
    StructField('gender', StringType(), True),
    StructField('age', IntegerType(), True),
])

trainers_df: DataFrame = spark.createDataFrame(
    spark.sparkContext.parallelize(trainers),
    trainers_schema
)
trainers_df.show()

+---+------+------+---+
| id|  name|gender|age|
+---+------+------+---+
|  1|サトシ|  male| 10|
|  2|シゲル|  male| 10|
|  3|カスミ|female| 12|
+---+------+------+---+
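As a side note, spark.createDataFrame also accepts a local Python list directly, so the explicit parallelize step above is optional:

# Equivalent, without going through an RDD
trainers_df = spark.createDataFrame(trainers, trainers_schema)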
result = trainers_df.collect()
print(result)
[Row(id=1, name='サトシ', gender='male', age=10), Row(id=2, name='シゲル', gender='male', age=10), Row(id=3, name='カスミ', gender='female', age=12)]
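Since collect() returns a list of Row objects, each field is accessible as an attribute; for example:

# Print each field of every collected Row
for row in result:
    print(row.id, row.name, row.gender, row.age)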
trainers_df.coalesce(1).write.mode('overwrite').csv("path/to/output.csv")

>docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
xxxxxxxxxxxx jupyter/pyspark-notebook... ...

>docker exec -i -t xxxxxxxxxxxx /bin/bash
...

Looking under /home/jovyan/path/to inside the container, the output can be confirmed.
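Note that output.csv here is a directory, not a single file: Spark writes one part file per partition, and coalesce(1) reduces this to a single part file. A sketch of what to expect (the exact part file name will differ):

>ls path/to/output.csv
_SUCCESS  part-00000-xxxxx.csv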
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark.sql.types import StructField, StructType, StringType, IntegerType
trainers_schema = StructType([
    StructField('id', IntegerType(), True),
    StructField('name', StringType(), True),
    StructField('gender', StringType(), True),
    StructField('age', IntegerType(), True),
])

trainers_df: DataFrame = spark.read.format("csv")\
    .options(header="false")\
    .load("path/to/output.csv", schema=trainers_schema)
trainers_df.show()

+---+------+------+---+
| id|  name|gender|age|
+---+------+------+---+
|  1|サトシ|  male| 10|
|  2|シゲル|  male| 10|
|  3|カスミ|female| 12|
+---+------+------+---+
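If you prefer to keep column names in the file, both the writer and the reader take a header option; a minimal sketch, using the hypothetical path output_with_header.csv:

# Write with a header row, then read it back; the explicit schema is still
# recommended because the header alone does not restore column types
trainers_df.write.mode('overwrite').option('header', 'true').csv('path/to/output_with_header.csv')
df = spark.read.options(header='true').schema(trainers_schema).csv('path/to/output_with_header.csv')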
trainers_df.createOrReplaceTempView('trainers')
male_trainers_df = spark.sql('''
SELECT *
FROM trainers
WHERE gender = 'male'
''')
male_trainers_df.show()
※ Triple quotes allow line breaks inside the SQL string.

The same filter can also be written with the DataFrame API instead of SQL:

male_trainers_df = trainers_df.filter(trainers_df['gender'] == 'male')
male_trainers_df.show()

Either way, only the male trainers are returned:

+---+------+------+---+
| id|  name|gender|age|
+---+------+------+---+
|  1|サトシ|  male| 10|
|  2|シゲル|  male| 10|
+---+------+------+---+
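Predicates can be combined with & and | via pyspark.sql.functions.col; a minimal sketch (the age threshold is just an illustration):

from pyspark.sql.functions import col

# Male trainers aged 10 or older; note the parentheses around each condition
trainers_df.filter((col('gender') == 'male') & (col('age') >= 10)).show()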
from pyspark.sql.functions import udf
@udf(StringType())
def name_with_suffix(name: str, gender: str) -> str:
    return name + {'male': 'くん', 'female': 'さん'}.get(gender, '氏')
spark.udf.register('name_with_suffix', name_with_suffix)
dearest_trainers = spark.sql('''
SELECT name_with_suffix(name, gender) as hoge
FROM trainers
''')
dearest_trainers.show()

+----------+
|      hoge|
+----------+
|サトシくん|
|シゲルくん|
|カスミさん|
+----------+
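The decorated function can also be applied directly through the DataFrame API, without the SQL registration; a sketch:

# Same UDF via withColumn instead of spark.sql
trainers_df.withColumn('hoge', name_with_suffix(trainers_df['name'], trainers_df['gender'])).show()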
Next, try running this on Databricks (see the link).
This is a second attempt at the example that did not work here.
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql.functions import window
host = "xxx.xxx.xxx.xxx"
port = 9999
windowSize = 10
slideSize = 10
if slideSize > windowSize:
    print("<slideSize> must be less than or equal to <windowSize>", file=sys.stderr)
windowDuration = '{} seconds'.format(windowSize)
slideDuration = '{} seconds'.format(slideSize)
spark = SparkSession\
    .builder\
    .appName("StructuredNetworkWordCountWindowed")\
    .getOrCreate()

# Create DataFrame representing the stream of input lines from connection to host:port
lines = spark\
    .readStream\
    .format('socket')\
    .option('host', host)\
    .option('port', port)\
    .option('includeTimestamp', 'true')\
    .load()
# Split the lines into words, retaining timestamps
# split() splits each line into an array, and explode() turns the array into multiple rows
words = lines.select(
    explode(split(lines.value, ' ')).alias('word'),
    lines.timestamp
)

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, windowDuration, slideDuration),
    words.word
).count().orderBy('window')
# Start running the query that prints the windowed word counts to the console
query = windowedCounts\
    .writeStream\
    .outputMode('complete')\
    .format('console')\
    .option('truncate', 'false')\
    .start()
query.awaitTermination()
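To feed the socket source while the query runs, start a line-based TCP server on the target host, for example with netcat as in the Spark documentation (the port must match the one configured above):

nc -lk 9999

Each line typed into that terminal is then split into words and counted per window on the console sink.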
(...this is because the IP address WSL2 uses for host access is the Windows IP, so communication between WSL2 instances is not bridged.)
The title does not make it obvious, but this page describes mounting the /home/jovyan/ directory of jupyter/datascience-notebook with docker-compose.