これらのキーワードがハイライトされています:PySpark
「.NET 開発基盤部会 Wiki」は、「Open棟梁Project」,「OSSコンソーシアム .NET開発基盤部会」によって運営されています。
目次 †
概要 †
詳細 †
環境:Notebook系 †
Apache Sparkの特性上、インタラクティブな環境の方が習得が捗る。
→ チュートリアル
→ チュートリアル
チュートリアル †
- docker-compose up
>docker-compose up
※ /jovyanをマウントすると、Kernel errorになる。
多分、jovyan直下に必要なファイルが存在している。
- Tuple&Listの二次元表からDataFrame?を生成する。
- 「In []:」に以下を貼り付けて RUN [▶] を押下する。
from typing import List, Tuple
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark.sql.types import StructField, StructType, StringType, IntegerType
Trainer = Tuple[int, str, str, int]
trainers: List[Trainer] = [
(1, 'サトシ', 'male', 10),
(2, 'シゲル', 'male', 10),
(3, 'カスミ', 'female', 12),
]
trainers_schema = StructType([
StructField('id', IntegerType(), True),
StructField('name', StringType(), True),
StructField('gender', StringType(), True),
StructField('age', IntegerType(), True),
])
trainers_df: DataFrame = spark.createDataFrame(
spark.sparkContext.parallelize(trainers),
trainers_schema
)
trainers_df.show()
- DataFrame? を Row の list で返す。
- 以下のような結果が表示され、動作が確認できる。
[Row(id=1, name='サトシ', gender='male', age=10), Row(id=2, name='シゲル', gender='male', age=10), Row(id=3, name='カスミ', gender='female', age=12)]
- CSV入出力
WSL2に乗り換えたら、出力ができなくなったので、
複雑な検証を行う場合は、潔くVMでやったほうが良いかも。
- 「In []:」に以下を貼り付けて RUN [▶] を押下する。
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark.sql.types import StructField, StructType, StringType, IntegerType
trainers_schema = StructType([
StructField('id', IntegerType(), True),
StructField('name', StringType(), True),
StructField('gender', StringType(), True),
StructField('age', IntegerType(), True),
])
trainers_df: DataFrame = spark.read.format("csv")\
.options(header="false")\
.load("path/to/output.csv", schema=trainers_schema)
trainers_df.show()
- 前述のtrainers_df.show()以降に以下を追記して RUN [▶] を押下する。
- 以下は、ユーザー定義関数(UDF)を使用した操作。
- trainers_df.show()以降に以下を追記して RUN [▶] を押下する。
@udf(StringType())
def name_with_suffix(name: str, gender: str) -> str:
return name + {'male': 'くん', 'female': 'さん'}.get(gender, '氏')
spark.udf.register('name_with_suffix', name_with_suffix)
dearest_trainers = spark.sql('''
SELECT name_with_suffix(name, gender) as hoge
FROM trainers
''')
dearest_trainers.show()
DataFrame?に対する様々な操作 †
Spark SQLとDataFrame APIが使用できる。
- DataFrame? API
前述のTuple&Listの二次元表からDataFrame?を生成する例等。
- DataFrame?の生成
from pyspark.sql import SparkSession
spark: SparkSession = SparkSession.builder.appName("SimpleApp").getOrCreate()
# do something to prove it works
df1 = spark.sql('SELECT "1" as c1, "aaa" as c21')
df2 = spark.sql('SELECT "1" as c1, "bbb" as c22')
df1.show()
df2.show()
- 列に分割
分割したデータを列に追加し、
必要に応じて、c1カラムを削除する。
import pyspark.sql.functions as f
split_col = f.split(df['c1'], '-')
df = df.withColumn('c2', split_col.getItem(0))
df = df.withColumn('c3', split_col.getItem(1))
df = df.withColumn('c4', split_col.getItem(2))
df.show()
+-----------+---+---+---+---+
| c1| c2| d1| d2| d3|
+-----------+---+---+---+---+
|aaa-bbb-ccc|XXX|aaa|bbb|ccc|
+-----------+---+---+---+---+
- 行に分割
分割したデータを列に追加し、
他のカラムには同じ値を入れる。
- explode
import pyspark.sql.functions as f
df = df.withColumn('c1', f.split(f.col('c1'), '-'))
df = df.select(f.explode(f.col('c1')), 'c2')
df = df.withColumnRenamed('col', 'c1')
df.show()
+---+---+
| c1| c2|
+---+---+
|aaa|XXX|
|bbb|XXX|
|ccc|XXX|
+---+---+
- posexplode
import pyspark.sql.functions as f
df = df.withColumn('c1', f.split(f.col('c1'), '-'))
df = df.select(f.posexplode(f.col('c1')), 'c2')
df = df.withColumnRenamed('pos', 'index')
df = df.withColumnRenamed('col', 'c1')
df.show()
+-----+---+---+
|index| c1| c2|
+-----+---+---+
| 0|aaa|XXX|
| 1|bbb|XXX|
| 2|ccc|XXX|
+-----+---+---+
(Sliding Window Operations in Spark Structured Streaming)
- コチラで上手く行かなかったモノのリベンジ。
https://github.com/apache/spark/blob/master/examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql.functions import window
host = "xxx.xxx.xxx.xxx"
port = 9999
windowSize = 10
slideSize = 10
if slideSize > windowSize:
print("<slideSize> must be less than or equal to <windowSize>", file=sys.stderr)
windowDuration = '{} seconds'.format(windowSize)
slideDuration = '{} seconds'.format(slideSize)
spark = SparkSession\
.builder\
.appName("StructuredNetworkWordCountWindowed")\
.getOrCreate()
# Create DataFrame representing the stream of input lines from connection to host:port
lines = spark\
.readStream\
.format('socket')\
.option('host', host)\
.option('port', port)\
.option('includeTimestamp', 'true')\
.load()
# Split the lines into words, retaining timestamps
# split() splits each line into an array, and explode() turns the array into multiple rows
words = lines.select(
explode(split(lines.value, ' ')).alias('word'),
lines.timestamp
)
# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
window(words.timestamp, windowDuration, slideDuration),
words.word
).count().orderBy('window')
# Start running the query that prints the windowed word counts to the console
query = windowedCounts\
.writeStream\
.outputMode('complete')\
.format('console')\
.option('truncate', 'false')\
.start()
query.awaitTermination()
- 接続
java.net.ConnectException?: Connection refused (Connection refused)が発生。
- WSL / WSL2でncコマンドは使えるが、
・WSLでは、Windows上で動作しており、
・WSL2では、LinuxのVM上で動作している。
- WSL2同士の場合、localhost(WSL2のVM)であれば通信をブリッジできる。
- WSL2で上記のサンプルを実行する場合、
以下の様にする事で、繋がるようになる。
- Docker Desktop for WSL2は使用しない。
- WSL2のターミナルを起動し、WSL2の中からコンテナを起動する。
- もう一枚、WSL2のターミナルを起動しncする。
- 起動したコンテナの中に入りホストアクセスのIPアドレスを取得する。
- 取得したIPアドレスを使用してコンテナからWSL2のncへアクセスする。
- 疎通確認後、ココで使用したIPアドレスを上記のコード中に埋め込む。
- 結果
以下のように動作した。
-------------------------------------------
Batch: 4
-------------------------------------------
+------------------------------------------+----+-----+
|window |word|count|
+------------------------------------------+----+-----+
|{2021-09-29 10:27:20, 2021-09-29 10:27:30}|444 |6 |
|{2021-09-29 10:27:20, 2021-09-29 10:27:30}|111 |6 |
|{2021-09-29 10:27:20, 2021-09-29 10:27:30}|222 |6 |
|{2021-09-29 10:27:20, 2021-09-29 10:27:30}|333 |6 |
|{2021-09-29 10:27:50, 2021-09-29 10:28:00}|ccc |4 |
|{2021-09-29 10:27:50, 2021-09-29 10:28:00}|bbb |4 |
|{2021-09-29 10:27:50, 2021-09-29 10:28:00}|aaa |4 |
|{2021-09-29 10:27:50, 2021-09-29 10:28:00}|ddd |4 |
+------------------------------------------+----+-----+
古いウィンドウが蓄積していくので、
- ソレをどうやって削除するか?とか、
- どうやってウィンドウ毎にファイル出力するか?
とか、そう言うのがまだ良く解らない(→ outputModeで制御するらしい)。
- 追加モード
追加モードでは、withWatermarkが必要。
ただし、orderBy('window')は使えなくなる。
# Split the lines into words, retaining timestamps
# split() splits each line into an array, and explode() turns the array into multiple rows
words = lines.select(
explode(split(lines.value, ' ')).alias('word'),
lines.timestamp
).withWatermark("timestamp", "1 minute")
# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
window(words.timestamp, windowDuration, slideDuration),
words.word
).count()
# Start running the query that prints the windowed word counts to the console
query = windowedCounts\
.writeStream\
.outputMode('append')\
.format('console')\
.option('truncate', 'false')\
.start()
- CSV出力
以下で行けるはずだが、WSL2にしたら出力できなくなった。
query = windowedCounts\
.writeStream\
.outputMode('append')\
.format("parquet")\
.option("checkpointLocation", "path/to/")\
.option("path", "path/to/")\
.option('truncate', 'false')\
.start()
様々な入力元・出力先の例 †
- Parquetファイル
コチラで説明済み(入力のみ)。
- 構造化ストリーミングの場合
- 入力:Scalaの入力サンプルをPySpark的に書く。
- kafka
kafka_stream_df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", 'hostname:10090,...,hostname:10099') \
.option("subscribe", "test") \
.option("startingOffsets", "earliest") \
.option("groupId", "streamtest") \
.load()
※ Kafkaチュートリアルで作成した環境と(WSL2上のコンテナ同士を)接続できなかった(何故?)。
リンク先で、上記のサンプルをDatabricksで動かしてみる。
参考 †
環境構築 †
構造化ストリーミング †
EnsekiTT Blog †
CUBE SUGAR CONTAINER †
Simple is Beautiful. †
Qiita †
masato †
PythonとSensorTag?, Kafka, Spark Streamingのストリーム処理
Stack Overflow †
|