- 追加された行はこの色です。
- 削除された行はこの色です。
「[[.NET 開発基盤部会 Wiki>http://dotnetdevelopmentinfrastructure.osscons.jp]]」は、「[[Open棟梁Project>https://github.com/OpenTouryoProject/]]」,「[[OSSコンソーシアム .NET開発基盤部会>https://www.osscons.jp/dotNetDevelopmentInfrastructure/]]」によって運営されています。
-[[戻る>Apache Spark]]
-戻る
--[[Python]]
--[[Apache Spark]] > [[Spark SQL]]
*目次 [#k30dfef7]
#contents
*概要 [#r180bc95]
[[.NET for Apache Spark>Apache Spark#a2ce8cf5]]のチュートリアル用環境で動作したので。
-[[Apache Spark]]の[[Python]]言語バインディング
-[[.NET for Apache Spark>Apache Spark#a2ce8cf5]]のチュートリアル用環境で動作したので。
*詳細 [#i5025fa2]
**Jupyter Notebook [#y2664b01]
-Jupyter Notebookドキュメントを作成・共有するためのウェブアプリケーション
-プログラムコード、Markdownテキスト、数式、図式等を含むことができる。
**環境:Notebook系 [#xa89d7e4]
[[Apache Spark]]の特性上、インタラクティブな環境の方が習得が捗る。
-そのため、以下のように表現される。
--PythonなどをWebブラウザ上で記述・実行できる統合開発環境
--ブラウザ上で Python やその他のプログラミング言語のプログラムを~
実行したり、実行した結果を保存したり共有したりすることができるツール
***[[Jupyter Notebook]] [#y2664b01]
→ [[チュートリアル>#ffe40eef]]
-参考
--Project Jupyter - Wikipedia~
https://ja.wikipedia.org/wiki/Project_Jupyter
***[[JupyterLab]] [#vc859f33]
→ [[チュートリアル>#ffe40eef]]
***[[Azure Databricks>#p629aafe]] [#f0ad6acf]
**チュートリアル [#n212eaf0]
***on Docker [#ffe40eef]
-ローカルでもできそうだったが、手軽そうな[[Docker]]をチョイス。
***on [[Jupyter>統合開発環境 (IDE)#x1ebfeb7]] on [[Docker]] [#ffe40eef]
-[[Jupyter>統合開発環境 (IDE)#x1ebfeb7]]で手軽に実行できる。
-以下、[[Qiitaの「Apache Spark を Jupyter Notebook で試す (on ローカル Docker」>#f615d0ec]]を参考にした。
-ローカルでもできそうだったが、手軽そうな[[Docker]]をチョイス。~
ローカルだと、[[Jupyter>統合開発環境 (IDE)#x1ebfeb7]]に、別途、PySparkを入れる必要がある。
-[[Docker]]前提で(ここでは、[[Docker for Windows>https://techinfoofmicrosofttech.osscons.jp/index.php?Docker%20for%20Windows]]を使用した)
-[[Qiitaの「Apache Spark を Jupyter Notebook で試す (on ローカル Docker」>#f615d0ec]]を参考にした。
--環境のコンテナを
-[[Docker]]前提で(ここでは、[[Docker for Windows>https://techinfoofmicrosofttech.osscons.jp/index.php?Docker%20for%20Windows]]を使用した)環境のコンテナを
---docker run する場合
--docker run で起動する場合
docker run -it -p 8888:8888 jupyter/pyspark-notebook
---docker-compose を使う場合~
・docker-compose.yaml
--docker-compose で起動する場合
---docker-compose.yaml
version: '3'
services:
env:
image: jupyter/datascience-notebook
environment:
LANG: C.UTF-8
expose:
- "8888"
ports:
- "8888:8888"
volumes:
- ./:/home/jovyan/
~
・docker-compose up
>docker-compose up -d
notebook:
image: jupyter/pyspark-notebook
ports:
- '8888:8888'
environment:
- GRANT_SUDO=yes
- TZ=Asia/Tokyo
volumes:
- ./work:/home/jovyan/work
- ./path:/home/jovyan/path
---docker-compose up
>docker-compose up
※ /jovyanをマウントすると、Kernel errorになる。~
多分、jovyan直下に必要なファイルが存在している。
-最後に表示されたURLにブラウザからアクセスする。
[C 03:39:12.997 NotebookApp]
To access the notebook, open this file in a browser:
file:///home/jovyan/.local/share/jupyter/runtime/nbserver-7-open.html
Or copy and paste one of these URLs:
http://xxxxx:8888/?token=xxxxx
or http://127.0.0.1:8888/?token=xxxxx
-[[Jupyter Notebook>#y2664b01]]が表示されるので、~
New から Notebook: Python3 を選択し Notebook を開く。
New から Notebook: Python3 を選択し Notebook を開く。~
(URLをちょっと変えると[[JupyterLab>#vc859f33]]が起動する)
-ハロー・ワールド風
--「In []:」に以下を貼り付けて RUN [▶] を押下する。
from pyspark.sql import SparkSession
spark: SparkSession = SparkSession.builder.appName("SimpleApp").getOrCreate()
# do something to prove it works
spark.sql('SELECT "Test" as c1').show()
spark.sql('SELECT "Test1" as c1').show()
--以下のような結果が表示され、動作が確認できる。
+----+
| c1|
+----+
|Test|
+----+
--[[Jupyter Notebook>#y2664b01]]では、Session?が維持されるので、~
以降の処理で、SparkSessionを再度定義する必要は無い。
-Tuple&Listの二次元表からDataFrameを生成する。
--「In []:」に以下を貼り付けて RUN [▶] を押下する。
from typing import List, Tuple
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark.sql.types import StructField, StructType, StringType, IntegerType
Trainer = Tuple[int, str, str, int]
trainers: List[Trainer] = [
(1, 'サトシ', 'male', 10),
(2, 'シゲル', 'male', 10),
(3, 'カスミ', 'female', 12),
]
trainers_schema = StructType([
StructField('id', IntegerType(), True),
StructField('name', StringType(), True),
StructField('gender', StringType(), True),
StructField('age', IntegerType(), True),
])
trainers_df: DataFrame = spark.createDataFrame(
spark.sparkContext.parallelize(trainers),
trainers_schema
)
trainers_df.show()
--以下のような結果が表示され、動作が確認できる。
+---+------+------+---+
| id| name|gender|age|
+---+------+------+---+
| 1|サトシ| male| 10|
| 2|シゲル| male| 10|
| 3|カスミ|female| 12|
+---+------+------+---+
-DataFrame を Row の list で返す。
--前述のtrainers_df.show()以降に以下を追記して RUN [▶] を押下する。
result = trainers_df.collect()
print(result)
--以下のような結果が表示され、動作が確認できる。
[Row(id=1, name='サトシ', gender='male', age=10), Row(id=2, name='シゲル', gender='male', age=10), Row(id=3, name='カスミ', gender='female', age=12)]
-入出力
-CSV入出力~
WSL2に乗り換えたら、出力ができなくなったので、~
複雑な検証を行う場合は、潔くVMでやったほうが良いかも。
--CSV出力
--出力
---前述のtrainers_df.show()以降に以下を追記して RUN [▶] を押下する。
trainers_df.coalesce(1).write.mode('overwrite').csv("path/to/output.csv")
---CSV出力の確認~
・コンテナに入って確認する。
>docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
xxxxxxxxxxxx jupyter/pyspark-notebook...
...
>docker exec -i -t xxxxxxxxxxxx /bin/bash
... /home/jovyan/path/to を探してみると、出力を確認できる。
~
・docker-compose.yamlのvolumesに指定した~
フォルダを探してみると、出力を確認できる。~
~
・[[Jupyter Notebook>#y2664b01]](Files)からも確認できる。~
http://127.0.0.1:8888/tree#notebooks
--CSV入力~
上記ファイルを入力する方法は...、
---「In []:」に以下を貼り付けて RUN [▶] を押下する。
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark.sql.types import StructField, StructType, StringType, IntegerType
trainers_schema = StructType([
StructField('id', IntegerType(), True),
StructField('name', StringType(), True),
StructField('gender', StringType(), True),
StructField('age', IntegerType(), True),
])
trainers_df: DataFrame = spark.read.format("csv")\
.options(header="false")\
.load("path/to/output.csv", schema=trainers_schema)
trainers_df.show()
---以下のような結果が表示され、動作が確認できる。
+---+------+------+---+
| id| name|gender|age|
+---+------+------+---+
| 1|サトシ| male| 10|
| 2|シゲル| male| 10|
| 3|カスミ|female| 12|
+---+------+------+---+
-以下は、[[Spark SQLとDataFrame API>Spark SQL]]を使用した操作。
--前述のtrainers_df.show()以降に以下を追記して RUN [▶] を押下する。
---Spark SQLを使用する。
---[[Spark SQL]]を使用する。
trainers_df.createOrReplaceTempView('trainers');
male_trainers_df = spark.sql('''
SELECT *
FROM trainers
WHERE gender = 'male'
''')
male_trainers_df.show()
※ トリプル・クォーテーションは改行を扱える。
---DataFrame APIを使用する。
male_trainers_df = trainers_df.filter(trainers_df['gender'] == 'male')
male_trainers_df.show()
--以下のような結果が表示され、動作が確認できる。
+---+------+------+---+
| id| name|gender|age|
+---+------+------+---+
| 1|サトシ| male| 10|
| 2|シゲル| male| 10|
| 3|カスミ|female| 12|
+---+------+------+---+
+---+------+------+---+
| id| name|gender|age|
+---+------+------+---+
| 1|サトシ| male| 10|
| 2|シゲル| male| 10|
+---+------+------+---+
-以下は、ユーザー定義関数(UDF)を使用した操作。
--前述に
---ユーザー定義関数(UDF)のimportを追加し、
from pyspark.sql.functions import udf
---trainers_df.show()以降に以下を追記して RUN [▶] を押下する。
@udf(StringType())
def name_with_suffix(name: str, gender: str) -> str:
return name + {'male': 'くん', 'female': 'さん'}.get(gender, '氏')
spark.udf.register('name_with_suffix', name_with_suffix)
dearest_trainers = spark.sql('''
SELECT name_with_suffix(name, gender) as hoge
FROM trainers
''')
dearest_trainers.show()
--以下のような結果が表示され、動作が確認できる。
+---+------+------+---+
| id| name|gender|age|
+---+------+------+---+
| 1|サトシ| male| 10|
| 2|シゲル| male| 10|
| 3|カスミ|female| 12|
+---+------+------+---+
+----------+
| hoge|
+----------+
|サトシくん|
|シゲルくん|
|カスミさん|
+----------+
***DataFrameに対する様々な操作 [#w64c6c2e]
[[Spark SQLとDataFrame API>Spark SQL]]が使用できる。
-DataFrameの生成
+-----+-----+
| c1| c2|
+-----+-----+
|Test1|Test2|
+-----+-----+
--DataFrame API~
[[前述>#ffe40eef]]のTuple&Listの二次元表からDataFrameを生成する例等。
--Spark SQL
from pyspark.sql import SparkSession
spark: SparkSession = SparkSession.builder.appName("SimpleApp").getOrCreate()
# do something to prove it works
df = spark.sql('SELECT "Test1" as c1, "Test2" as c2 ')
df.show()
-カラムの追加~
DataFrame や、それを支える内部的な RDD はイミュータブル (不変) なオブジェクトになっている。~
そのため、カラムを追加するときは既存のオブジェクトを変更するのではなく、新たなオブジェクトを作ることになる。
+-----+-----+-----+
| c1| c2| c3|
+-----+-----+-----+
|Test1|Test2|XXXXX|
+-----+-----+-----+
--DataFrame API
---指定列の値
df = df.withColumn('c3', df.c2)
df.show()
+-----+-----+-----+
| c1| c2| c3|
+-----+-----+-----+
|Test1|Test2|Test2|
+-----+-----+-----+
---リテラル値
import pyspark.sql.functions as f
df = df.withColumn('c3', f.lit('hoge'))
df.show()
+-----+-----+-----+
| c1| c2| c3|
+-----+-----+-----+
|Test1|Test2| hoge|
+-----+-----+-----+
--Spark SQL
df.createOrReplaceTempView('table')
df = spark.sql('select c1, c2, c2 as c3 from table')
df.show()
-カラムのリネーム
+-----+-----+-----+
| c1| c2| hoge|
+-----+-----+-----+
|Test1|Test2|XXXXX|
+-----+-----+-----+
--DataFrame API
df = df.withColumnRenamed('c3', 'hoge')
df.show()
--Spark SQL
df.createOrReplaceTempView('table')
df = spark.sql('select c1, c2, c3 as hoge from table')
df.show()
-カラムの削除
+-----+-----+
| c1| c2|
+-----+-----+
|Test1|Test2|
+-----+-----+
--DataFrame API
df = df.drop('hoge')
df.show()
--Spark SQL
df.createOrReplaceTempView('table')
df = spark.sql('select c1, c2 from table')
df.show()
-テーブルの結合
+---+----+ +---+----+ +---+---+---+
| c1| c21| | c1| c22| | c1|c21|c22|
+---+----+ join +---+----+ = +---+---+---+
| 1| aaa| | 1| bbb| | 1|aaa|bbb|
+---+----+ +---+----+ +---+---+---+
--DataFrameの生成
from pyspark.sql import SparkSession
spark: SparkSession = SparkSession.builder.appName("SimpleApp").getOrCreate()
# do something to prove it works
df1 = spark.sql('SELECT "1" as c1, "aaa" as c21')
df2 = spark.sql('SELECT "1" as c1, "bbb" as c22')
df1.show()
df2.show()
--DataFrame API
---カラム名が同じ
df1.join(df2, "c1", "inner").show()
---カラム名が異なる
df1.join(df2, df1.c1 == df2.c1, "inner").show()
※ &color(red){c1がダブるが...。};
--Spark SQL
df1.createOrReplaceTempView('t1')
df2.createOrReplaceTempView('t2')
df = spark.sql('select t1.c1, t1.c21, t2.c22 from t1 inner join t2 on t1.c1 = t2.c1')
df.show()
-セルの分割
--DataFrameの生成
from pyspark.sql import SparkSession
spark: SparkSession = SparkSession.builder.appName("SimpleApp").getOrCreate()
# do something to prove it works
df = spark.sql('SELECT "aaa-bbb-ccc" as c1, "XXX" as c2')
df.show()
+-----------+---+
| c1| c2|
+-----------+---+
|aaa-bbb-ccc|XXX|
+-----------+---+
--列に分割~
分割したデータを列に追加し、~
必要に応じて、c1カラムを削除する。
import pyspark.sql.functions as f
split_col = f.split(df['c1'], '-')
df = df.withColumn('c2', split_col.getItem(0))
df = df.withColumn('c3', split_col.getItem(1))
df = df.withColumn('c4', split_col.getItem(2))
df.show()
+-----------+---+---+---+---+
| c1| c2| d1| d2| d3|
+-----------+---+---+---+---+
|aaa-bbb-ccc|XXX|aaa|bbb|ccc|
+-----------+---+---+---+---+
--行に分割~
分割したデータを列に追加し、~
他のカラムには同じ値を入れる。
---explode
import pyspark.sql.functions as f
df = df.withColumn('c1', f.split(f.col('c1'), '-'))
df = df.select(f.explode(f.col('c1')), 'c2')
df = df.withColumnRenamed('col', 'c1')
df.show()
+---+---+
| c1| c2|
+---+---+
|aaa|XXX|
|bbb|XXX|
|ccc|XXX|
+---+---+
---posexplode
import pyspark.sql.functions as f
df = df.withColumn('c1', f.split(f.col('c1'), '-'))
df = df.select(f.posexplode(f.col('c1')), 'c2')
df = df.withColumnRenamed('pos', 'index')
df = df.withColumnRenamed('col', 'c1')
df.show()
+-----+---+---+
|index| c1| c2|
+-----+---+---+
| 0|aaa|XXX|
| 1|bbb|XXX|
| 2|ccc|XXX|
+-----+---+---+
***[[構造化ストリーミング>#m0435b58]]でウィンドウ操作 [#x623bccd]
(Sliding Window Operations in Spark Structured Streaming)
-[[コチラ>https://techinfoofmicrosofttech.osscons.jp/index.php?.NET%20for%20Apache%20Spark%E3%83%81%E3%83%A5%E3%83%BC%E3%83%88%E3%83%AA%E3%82%A2%E3%83%AB#w3d503ab]]で上手く行かなかったモノのリベンジ。~
https://github.com/apache/spark/blob/master/examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql.functions import window
host = "xxx.xxx.xxx.xxx"
port = 9999
windowSize = 10
slideSize = 10
if slideSize > windowSize:
print("<slideSize> must be less than or equal to <windowSize>", file=sys.stderr)
windowDuration = '{} seconds'.format(windowSize)
slideDuration = '{} seconds'.format(slideSize)
spark = SparkSession\
.builder\
.appName("StructuredNetworkWordCountWindowed")\
.getOrCreate()
# Create DataFrame representing the stream of input lines from connection to host:port
lines = spark\
.readStream\
.format('socket')\
.option('host', host)\
.option('port', port)\
.option('includeTimestamp', 'true')\
.load()
# Split the lines into words, retaining timestamps
# split() splits each line into an array, and explode() turns the array into multiple rows
words = lines.select(
explode(split(lines.value, ' ')).alias('word'),
lines.timestamp
)
# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
window(words.timestamp, windowDuration, slideDuration),
words.word
).count().orderBy('window')
# Start running the query that prints the windowed word counts to the console
query = windowedCounts\
.writeStream\
.outputMode('complete')\
.format('console')\
.option('truncate', 'false')\
.start()
query.awaitTermination()
-接続~
java.net.ConnectException: Connection refused (Connection refused)が発生。
--使用するIPアドレス、≒ [[ホストアクセスのIPアドレスに注意が必要>Dockerのネットワーク設定#af36791e]]。
---WSL / WSL2で[[ncコマンド>https://dotnetdevelopmentinfrastructure.osscons.jp/index.php?nc%20%28netcat%29%20%E3%82%B3%E3%83%9E%E3%83%B3%E3%83%89]]は使えるが、~
・WSLでは、Windows上で動作しており、~
・WSL2では、LinuxのVM上で動作している。
---WSL2同士の場合、localhost(WSL2のVM)であれば通信をブリッジできる。
---[[Docker Desktop for WSL2のホストアクセスのIPアドレス>https://techinfoofmicrosofttech.osscons.jp/index.php?WSL%20%E2%86%92%20WSL2#yf2b317a]]はWindowsのIPなので、~
このIPだとWSL2のncとDocker Desktop for WSL2の通信をブリッジしない。
--WSL2で上記のサンプルを実行する場合、~
以下の様にする事で、繋がるようになる。
---Docker Desktop for WSL2は使用しない。
---WSL2のターミナルを起動し、WSL2の中からコンテナを起動する。
---もう一枚、WSL2のターミナルを起動しncする。
---[[起動したコンテナの中に入りホストアクセスのIPアドレスを取得する。>Dockerのネットワーク設定#bb6baf03]]
---取得したIPアドレスを使用してコンテナからWSL2のncへアクセスする。
---疎通確認後、ココで使用したIPアドレスを上記のコード中に埋め込む。
-結果~
以下のように動作した。
-------------------------------------------
Batch: 4
-------------------------------------------
+------------------------------------------+----+-----+
|window |word|count|
+------------------------------------------+----+-----+
|{2021-09-29 10:27:20, 2021-09-29 10:27:30}|444 |6 |
|{2021-09-29 10:27:20, 2021-09-29 10:27:30}|111 |6 |
|{2021-09-29 10:27:20, 2021-09-29 10:27:30}|222 |6 |
|{2021-09-29 10:27:20, 2021-09-29 10:27:30}|333 |6 |
|{2021-09-29 10:27:50, 2021-09-29 10:28:00}|ccc |4 |
|{2021-09-29 10:27:50, 2021-09-29 10:28:00}|bbb |4 |
|{2021-09-29 10:27:50, 2021-09-29 10:28:00}|aaa |4 |
|{2021-09-29 10:27:50, 2021-09-29 10:28:00}|ddd |4 |
+------------------------------------------+----+-----+
古いウィンドウが蓄積していくので、
--ソレをどうやって削除するか?とか、
--どうやってウィンドウ毎にファイル出力するか?
>とか、そう言うのがまだ良く解らない(→ [[outputMode>Spark Structured Streaming#cf63f37d]]で制御するらしい)。
-補足
--一定時間で停止
from time import sleep
...
# query.awaitTermination()
sleep(600) # 単位は秒らしい
query.stop()
--追加モード~
追加モードでは、withWatermarkが必要。~
ただし、orderBy('window')は使えなくなる。
# Split the lines into words, retaining timestamps
# split() splits each line into an array, and explode() turns the array into multiple rows
words = lines.select(
explode(split(lines.value, ' ')).alias('word'),
lines.timestamp
).withWatermark("timestamp", "1 minute")
# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
window(words.timestamp, windowDuration, slideDuration),
words.word
).count()
# Start running the query that prints the windowed word counts to the console
query = windowedCounts\
.writeStream\
.outputMode('append')\
.format('console')\
.option('truncate', 'false')\
.start()
--CSV出力~
以下で行けるはずだが、WSL2にしたら出力できなくなった。
query = windowedCounts\
.writeStream\
.outputMode('append')\
.format("parquet")\
.option("checkpointLocation", "path/to/")\
.option("path", "path/to/")\
.option('truncate', 'false')\
.start()
***様々な入力元・出力先の例 [#yd135245]
-バッチの場合
--CSVファイル~
[[コチラ>#ffe40eef]]で説明済み。
--Parquetファイル~
[[コチラ>#p629aafe]]で説明済み(入力のみ)。
-[[構造化ストリーミング>#m0435b58]]の場合
--入力:[[Scalaの入力サンプル>Spark Structured Streaming#cf63f37d]]をPySpark的に書く。
---[[kafka>Apache Kafka]]
kafka_stream_df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", 'hostname:10090,...,hostname:10099') \
.option("subscribe", "test") \
.option("startingOffsets", "earliest") \
.option("groupId", "streamtest") \
.load()
※ [[Kafkaチュートリアル]]で作成した環境と(WSL2上のコンテナ同士を)接続できなかった(何故?)。
--出力:[[Scalaの出力サンプル>Spark Structured Streaming#cf63f37d]]をPySpark的に書く。
***[[on Azure Databricks>https://techinfoofmicrosofttech.osscons.jp/index.php?Azure%20Databricks%E3%83%81%E3%83%A5%E3%83%BC%E3%83%88%E3%83%AA%E3%82%A2%E3%83%AB#r15035b9]] [#p629aafe]
リンク先で、[[上記のサンプル>#ffe40eef]]をDatabricksで動かしてみる。
*参考 [#z4b7048e]
**環境構築 [#vcf9cacd]
-PySpark環境構築メモ - Smile Engineering Blog~
https://smile-jsp.hateblo.jp/entry/2020/05/07/012432
-WindowsでSpark(PySpark)環境をつくる - goodbyegangsterのブログ~
https://goodbyegangster.hatenablog.com/entry/2018/06/27/022915
-Jupyter Notebookの使い方 | Python入門~
https://www.javadrive.jp/python/jupyter-notebook/
**構造化ストリーミング [#m0435b58]
-[[Spark Structured Streaming]]
-PySparkとKafkaでStreaming処理を行う | 私の備忘録~
https://www.my-memorandum.tk/pyspark-structuedstreaming-docker
-[[Qiita > masato>#yce576d5]]
**EnsekiTT Blog [#z408f56e]
-Dockerでデータ分析環境を作るためにJupyter Docker Stacksを使ってみた話~
https://ensekitt.hatenablog.com/entry/2018/06/29/200000
-うっかりコンテナを削除しても大丈夫なようにdocker-composeを使うことにした話~
https://ensekitt.hatenablog.com/entry/2018/07/01/200000
>タイトルからでは解り難いが、jupyter/datascience-notebookの~
/home/jovyan/ディレクトリをdocker-composeでマウントする内容。
**CUBE SUGAR CONTAINER [#m977e9b5]
-PySpark: 時刻と文字列を相互に変換する (DataFrame / Spark SQL)~
https://blog.amedama.jp/entry/2018/01/29/212711
-PySpark: Jupyter Notebook からローカルの PySpark ドライバを操作する~
https://blog.amedama.jp/entry/2018/01/29/215458
-PySpark の UDF (User Defined Function) を試す~
https://blog.amedama.jp/entry/2018/01/31/210755
-PySpark の DataFrame を SparkSQL で操作する~
https://blog.amedama.jp/entry/2018/03/03/173257
-PySpark のスクリプトファイルで引数を扱う~
https://blog.amedama.jp/entry/2018/03/17/113516
-Python: PySpark で DataFrame にカラムを追加する~
https://blog.amedama.jp/entry/2019/08/17/125115
**Simple is Beautiful. [#g83b52c2]
-PySparkで追加したカラムにリテラル値を追加する~
https://blog.kozakana.net/2020/11/add-literal-values-to-columns-added-by-pyspark/
-PySparkで日付情報を別カラムに分割する~
https://blog.kozakana.net/2020/11/split-date-information-into-separate-columns-with-pyspark/
-PySparkで配列を展開してそれぞれの行にする~
https://blog.kozakana.net/2020/12/expand-the-array-into-each-row-with-pyspark/
-PySparkで日付データを0パディングして別カラムに分ける~
https://blog.kozakana.net/2021/01/separate-datetime-and-zero-padding-in-pyspark/
-PySparkで行をフィルタリングする~
https://blog.kozakana.net/2020/12/filtering-rows-in-pyspark/
-PySparkで重複行を削除する~
https://blog.kozakana.net/2020/12/remove-duplicate-lines-in-pyspark/
-PySparkでgroupbyで集計したデータを配列にして一行にまとめる~
https://blog.kozakana.net/2020/12/concat-the-aggregated-data-in-an-array-by-groupby-with-pyspark/~
-PySparkのDataFrameをSQLで操作する~
https://blog.kozakana.net/2020/12/query-pyspark-dataframe-in-sql/
**Qiita [#f615d0ec]
-Ubuntu18.04 (WSL)でpyspark+jupyterの環境を手早く作る~
https://qiita.com/gaborotta/items/0d324f58ae3f0149db2a
-Apache Spark を Jupyter Notebook で試す (on ローカル Docker~
https://qiita.com/mangano-ito/items/dac5582a331d40a484ad
-Jupyter NotebookでのpySparkコードサンプル~
https://qiita.com/kazurof/items/0e8e46771cd845b7edbb
-【PySpark】dataframe操作サンプルコード集~
https://qiita.com/YujiHamada3/items/0220bb68efb5c1e6ef62
***masato [#yce576d5]
PythonとSensorTag, Kafka, Spark Streamingのストリーム処理
-Part 1: Raspberry Pi 3~
https://qiita.com/masato/items/c5b4fbd3257c3b8b3b13
-Part 2: KafkaとLandoop~
https://qiita.com/masato/items/12e746ffa112eaaabd39
-Part 3: Apache AvroとSchema Registry~
https://qiita.com/masato/items/9ca08a198fb29457163a
-Part 4: Kafka ConnectでMongoDBに出力する~
https://qiita.com/masato/items/825a38787b07f62a84c2
-Part 5: Apache Toree でJupyterからSparkに接続する~
https://qiita.com/masato/items/55ee77def529f4189560
-Part 6: JupyterからPySpark Streamingのウィンドウ集計をする~
https://qiita.com/masato/items/574b38e45014a6ae7d88
**Stack Overflow [#u043ee8c]
-pyspark - Split Spark Dataframe string column into multiple columns~
https://stackoverflow.com/questions/39235704/split-spark-dataframe-string-column-into-multiple-columns