「[[.NET 開発基盤部会 Wiki>http://dotnetdevelopmentinfrastructure.osscons.jp]]」は、「[[Open棟梁Project>https://github.com/OpenTouryoProject/]]」,「[[OSSコンソーシアム .NET開発基盤部会>https://www.osscons.jp/dotNetDevelopmentInfrastructure/]]」によって運営されています。

-戻る
--[[Python]]
--[[Apache Spark]]
--[[Apache Spark]] > [[Spark SQL]]

*目次 [#k30dfef7]
#contents

*概要 [#r180bc95]
-[[Apache Spark]]の[[Python]]言語バインディング
-[[.NET for Apache Spark>Apache Spark#a2ce8cf5]]のチュートリアル用環境で動作したので。

*詳細 [#i5025fa2]

**Notebook系 [#xa89d7e4]
**環境:Notebook系 [#xa89d7e4]
[[Apache Spark]]の特性上、インタラクティブな環境の方が習得が捗る。

***[[Jupyter Notebook]] [#y2664b01]
→ [[チュートリアル>#ffe40eef]]

***[[JupyterLab]] [#vc859f33]
→ [[チュートリアル>#ffe40eef]]

***[[Azure Databricks>#p629aafe]] [#f0ad6acf]

**チュートリアル [#n212eaf0]

***on Docker [#ffe40eef]
-ローカルでもできそうだったが、手軽そうな[[Docker]]をチョイス。
***on [[Jupyter>統合開発環境 (IDE)#x1ebfeb7]] on [[Docker]] [#ffe40eef]
-[[Jupyter>統合開発環境 (IDE)#x1ebfeb7]]で手軽に実行できる。

-以下、[[Qiitaの「Apache Spark を Jupyter Notebook で試す (on ローカル Docker」>#f615d0ec]]を参考にした。
-ローカルでもできそうだったが、手軽そうな[[Docker]]をチョイス。~
ローカルだと、[[Jupyter>統合開発環境 (IDE)#x1ebfeb7]]に、別途、PySparkを入れる必要がある。

-[[Qiitaの「Apache Spark を Jupyter Notebook で試す (on ローカル Docker」>#f615d0ec]]を参考にした。

-[[Docker]]前提で(ここでは、[[Docker for Windows>https://techinfoofmicrosofttech.osscons.jp/index.php?Docker%20for%20Windows]]を使用した)環境のコンテナを

--docker run で起動する場合
 docker run -it -p 8888:8888 jupyter/pyspark-notebook

--docker-compose で起動する場合

---docker-compose.yaml
 version: '3'
 services:
   notebook:
     image: jupyter/pyspark-notebook
     ports:
       - '8888:8888'
     environment:
       - GRANT_SUDO=yes
       - TZ=Asia/Tokyo
     volumes:
       - ./work:/home/jovyan/work
       - ./path:/home/jovyan/path

---docker-compose up
 >docker-compose up -d
 >docker-compose up
※ /jovyanをマウントすると、Kernel errorになる。~
  多分、jovyan直下に必要なファイルが存在している。

-最後に表示されたURLにブラウザからアクセスする。
 [C 03:39:12.997 NotebookApp]
 To access the notebook, open this file in a browser:
     file:///home/jovyan/.local/share/jupyter/runtime/nbserver-7-open.html
 Or copy and paste one of these URLs:
     http://xxxxx:8888/?token=xxxxx
     or http://127.0.0.1:8888/?token=xxxxx

-[[Jupyter Notebook>#y2664b01]]が表示されるので、~
New から Notebook: Python3 を選択し Notebook を開く。
New から Notebook: Python3 を選択し Notebook を開く。~
(URLをちょっと変えると[[JupyterLab>#vc859f33]]が起動する)

-ハロー・ワールド風

--「In []:」に以下を貼り付けて RUN [▶] を押下する。
 from pyspark.sql import SparkSession
 spark: SparkSession = SparkSession.builder.appName("SimpleApp").getOrCreate()
 # do something to prove it works
 spark.sql('SELECT "Test" as c1').show()
 spark.sql('SELECT "Test1" as c1').show()

--以下のような結果が表示され、動作が確認できる。
 +----+
 |  c1|
 +----+
 |Test|
 +----+

--Jupyter Notebookでは、Session?が維持されるので、~
--[[Jupyter Notebook>#y2664b01]]では、Session?が維持されるので、~
以降の処理で、SparkSessionを再度定義する必要は無い。

-Tuple&Listの二次元表からDataFrameを生成する。

--「In []:」に以下を貼り付けて RUN [▶] を押下する。
 from typing import List, Tuple
 from pyspark.sql import SparkSession
 from pyspark.sql import DataFrame
 from pyspark.sql.types import StructField, StructType, StringType, IntegerType
 
 Trainer = Tuple[int, str, str, int]
 trainers: List[Trainer] = [
     (1, 'サトシ', 'male',   10),
     (2, 'シゲル', 'male',   10),
     (3, 'カスミ', 'female', 12),
 ]
 
 trainers_schema = StructType([
     StructField('id',      IntegerType(), True),
     StructField('name',    StringType(),  True),
     StructField('gender',  StringType(),  True),
     StructField('age',     IntegerType(), True),
 ])
 
 trainers_df: DataFrame = spark.createDataFrame(
     spark.sparkContext.parallelize(trainers),
     trainers_schema
 )
 
 trainers_df.show()

--以下のような結果が表示され、動作が確認できる。
 +---+------+------+---+
 | id|  name|gender|age|
 +---+------+------+---+
 |  1|サトシ|  male| 10|
 |  2|シゲル|  male| 10|
 |  3|カスミ|female| 12|
 +---+------+------+---+

-DataFrame を Row の list で返す。

--前述のtrainers_df.show()以降に以下を追記して RUN [▶] を押下する。
 result = trainers_df.collect()
 print(result)

--以下のような結果が表示され、動作が確認できる。
 [Row(id=1, name='サトシ', gender='male', age=10), Row(id=2, name='シゲル', gender='male', age=10), Row(id=3, name='カスミ', gender='female', age=12)]

-入出力
-CSV入出力~
WSL2に乗り換えたら、出力ができなくなったので、~
複雑な検証を行う場合は、潔くVMでやったほうが良いかも。

--CSV出力
--出力

---前述のtrainers_df.show()以降に以下を追記して RUN [▶] を押下する。
 trainers_df.coalesce(1).write.mode('overwrite').csv("path/to/output.csv")

---CSV出力の確認~
・コンテナに入って確認する。
 >docker ps
 CONTAINER ID   IMAGE                        COMMAND CREATED STATUS PORTS NAMES
 xxxxxxxxxxxx   jupyter/pyspark-notebook...
 ...
 
 >docker exec -i -t xxxxxxxxxxxx /bin/bash
 ... /home/jovyan/path/to を探してみると、出力を確認できる。
~
・docker-compose.yamlのvolumesに指定した~
 フォルダを探してみると、出力を確認できる。~
~
・[[Jupyter Notebook>#y2664b01]](Files)からも確認できる。~
 http://127.0.0.1:8888/tree#notebooks

--CSV入力~
上記ファイルを入力する方法は...、

---「In []:」に以下を貼り付けて RUN [▶] を押下する。
 from pyspark.sql import SparkSession
 from pyspark.sql import DataFrame
 from pyspark.sql.types import StructField, StructType, StringType, IntegerType
 
 trainers_schema = StructType([
     StructField('id',      IntegerType(), True),
     StructField('name',    StringType(),  True),
     StructField('gender',  StringType(),  True),
     StructField('age',     IntegerType(), True),
 ])
 
 trainers_df: DataFrame = spark.read.format("csv")\
     .options(header="false")\
     .load("path/to/output.csv", schema=trainers_schema)
 
 trainers_df.show()

---以下のような結果が表示され、動作が確認できる。
 +---+------+------+---+
 | id|  name|gender|age|
 +---+------+------+---+
 |  1|サトシ|  male| 10|
 |  2|シゲル|  male| 10|
 |  3|カスミ|female| 12|
 +---+------+------+---+

-以下は、[[Spark SQLとDataFrame API>Spark SQL]]を使用した操作。

--前述のtrainers_df.show()以降に以下を追記して RUN [▶] を押下する。

---Spark SQLを使用する。
---[[Spark SQL]]を使用する。
 trainers_df.createOrReplaceTempView('trainers');
 male_trainers_df = spark.sql('''
   SELECT *
   FROM   trainers
   WHERE  gender = 'male'
 ''')
 male_trainers_df.show()
※ トリプル・クォーテーションは改行を扱える。

---DataFrame APIを使用する。
 male_trainers_df = trainers_df.filter(trainers_df['gender'] == 'male')
 male_trainers_df.show()

--以下のような結果が表示され、動作が確認できる。
 +---+------+------+---+
 | id|  name|gender|age|
 +---+------+------+---+
 |  1|サトシ|  male| 10|
 |  2|シゲル|  male| 10|
 |  3|カスミ|female| 12|
 +---+------+------+---+
 
 +---+------+------+---+
 | id|  name|gender|age|
 +---+------+------+---+
 |  1|サトシ|  male| 10|
 |  2|シゲル|  male| 10|
 +---+------+------+---+

-以下は、ユーザー定義関数(UDF)を使用した操作。

--前述に

---ユーザー定義関数(UDF)のimportを追加し、
 from pyspark.sql.functions import udf

---trainers_df.show()以降に以下を追記して RUN [▶] を押下する。
 @udf(StringType())
 def name_with_suffix(name: str, gender: str) -> str:
   return name + {'male': 'くん', 'female': 'さん'}.get(gender, '氏')
 
 spark.udf.register('name_with_suffix', name_with_suffix)
 
 dearest_trainers = spark.sql('''
   SELECT name_with_suffix(name, gender) as hoge
   FROM   trainers
 ''')
 
 dearest_trainers.show()

--以下のような結果が表示され、動作が確認できる。
 +---+------+------+---+
 | id|  name|gender|age|
 +---+------+------+---+
 |  1|サトシ|  male| 10|
 |  2|シゲル|  male| 10|
 |  3|カスミ|female| 12|
 +---+------+------+---+
 
 +----------+
 |      hoge|
 +----------+
 |サトシくん|
 |シゲルくん|
 |カスミさん|
 +----------+

***[[on Azure Databricks>https://techinfoofmicrosofttech.osscons.jp/index.php?Azure%20Databricks%E3%83%81%E3%83%A5%E3%83%BC%E3%83%88%E3%83%AA%E3%82%A2%E3%83%AB#r15035b9]] [#p629aafe]
リンク先で、Databricksで動かしてみる。
***DataFrameに対する様々な操作 [#w64c6c2e]
[[Spark SQLとDataFrame API>Spark SQL]]が使用できる。

***構造化ストリーミングでウィンドウ操作 [#x623bccd]
[[コチラ>https://techinfoofmicrosofttech.osscons.jp/index.php?.NET%20for%20Apache%20Spark%E3%83%81%E3%83%A5%E3%83%BC%E3%83%88%E3%83%AA%E3%82%A2%E3%83%AB#w3d503ab]]で上手く行かなかったモノのリベンジ。
-DataFrameの生成
 +-----+-----+
 |   c1|   c2|
 +-----+-----+
 |Test1|Test2|
 +-----+-----+

>https://github.com/apache/spark/blob/master/examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py
--DataFrame API~
[[前述>#ffe40eef]]のTuple&Listの二次元表からDataFrameを生成する例等。

--Spark SQL
 from pyspark.sql import SparkSession
 spark: SparkSession = SparkSession.builder.appName("SimpleApp").getOrCreate()
 # do something to prove it works
 df = spark.sql('SELECT "Test1" as c1, "Test2" as c2 ')
 df.show()

-カラムの追加~
DataFrame や、それを支える内部的な RDD はイミュータブル (不変) なオブジェクトになっている。~
そのため、カラムを追加するときは既存のオブジェクトを変更するのではなく、新たなオブジェクトを作ることになる。
 +-----+-----+-----+
 |   c1|   c2|   c3|
 +-----+-----+-----+
 |Test1|Test2|XXXXX|
 +-----+-----+-----+

--DataFrame API

---指定列の値
 df = df.withColumn('c3', df.c2)
 df.show()
 +-----+-----+-----+
 |   c1|   c2|   c3|
 +-----+-----+-----+
 |Test1|Test2|Test2|
 +-----+-----+-----+

---リテラル値
 import pyspark.sql.functions as f
 df = df.withColumn('c3', f.lit('hoge'))
 df.show()
 +-----+-----+-----+
 |   c1|   c2|   c3|
 +-----+-----+-----+
 |Test1|Test2| hoge|
 +-----+-----+-----+

--Spark SQL
 df.createOrReplaceTempView('table')
 df = spark.sql('select c1, c2, c2 as c3 from table')
 df.show()

-カラムのリネーム
 +-----+-----+-----+
 |   c1|   c2| hoge|
 +-----+-----+-----+
 |Test1|Test2|XXXXX|
 +-----+-----+-----+

--DataFrame API
 df = df.withColumnRenamed('c3', 'hoge')
 df.show()

--Spark SQL
 df.createOrReplaceTempView('table')
 df = spark.sql('select c1, c2, c3 as hoge from table')
 df.show()

-カラムの削除
 +-----+-----+
 |   c1|   c2|
 +-----+-----+
 |Test1|Test2|
 +-----+-----+

--DataFrame API
 df = df.drop('hoge')
 df.show()

--Spark SQL 
 df.createOrReplaceTempView('table')
 df = spark.sql('select c1, c2 from table')
 df.show()

-テーブルの結合
 +---+----+      +---+----+   +---+---+---+
 | c1| c21|      | c1| c22|   | c1|c21|c22|
 +---+----+ join +---+----+ = +---+---+---+
 |  1| aaa|      |  1| bbb|   |  1|aaa|bbb|
 +---+----+      +---+----+   +---+---+---+

--DataFrameの生成
 from pyspark.sql import SparkSession
 spark: SparkSession = SparkSession.builder.appName("SimpleApp").getOrCreate()
 # do something to prove it works
 df1 = spark.sql('SELECT "1" as c1, "aaa" as c21')
 df2 = spark.sql('SELECT "1" as c1, "bbb" as c22')
 df1.show()
 df2.show() 

--DataFrame API

---カラム名が同じ
 df1.join(df2, "c1", "inner").show()

---カラム名が異なる
 df1.join(df2, df1.c1 == df2.c1, "inner").show()
※ &color(red){c1がダブるが...。};
--Spark SQL
 df1.createOrReplaceTempView('t1')
 df2.createOrReplaceTempView('t2')
 df = spark.sql('select t1.c1, t1.c21, t2.c22 from t1 inner join t2 on t1.c1 = t2.c1')
 df.show()

-セルの分割

--DataFrameの生成
 from pyspark.sql import SparkSession
 spark: SparkSession = SparkSession.builder.appName("SimpleApp").getOrCreate()
 # do something to prove it works
 df = spark.sql('SELECT "aaa-bbb-ccc" as c1, "XXX" as c2')
 df.show()
 
 +-----------+---+
 |         c1| c2|
 +-----------+---+
 |aaa-bbb-ccc|XXX|
 +-----------+---+

--列に分割~
分割したデータを列に追加し、~
必要に応じて、c1カラムを削除する。
 import pyspark.sql.functions as f
 split_col = f.split(df['c1'], '-')
 df = df.withColumn('c2', split_col.getItem(0))
 df = df.withColumn('c3', split_col.getItem(1))
 df = df.withColumn('c4', split_col.getItem(2))
 df.show()
 
 +-----------+---+---+---+---+
 |         c1| c2| d1| d2| d3|
 +-----------+---+---+---+---+
 |aaa-bbb-ccc|XXX|aaa|bbb|ccc|
 +-----------+---+---+---+---+

--行に分割~
分割したデータを列に追加し、~
他のカラムには同じ値を入れる。

---explode
 import pyspark.sql.functions as f
 df = df.withColumn('c1', f.split(f.col('c1'), '-'))
 df = df.select(f.explode(f.col('c1')), 'c2')
 df = df.withColumnRenamed('col', 'c1')
 df.show()
 
 +---+---+
 | c1| c2|
 +---+---+
 |aaa|XXX|
 |bbb|XXX|
 |ccc|XXX|
 +---+---+

---posexplode
 import pyspark.sql.functions as f
 df = df.withColumn('c1', f.split(f.col('c1'), '-'))
 df = df.select(f.posexplode(f.col('c1')), 'c2')
 df = df.withColumnRenamed('pos', 'index')
 df = df.withColumnRenamed('col', 'c1')
 df.show()
 
 +-----+---+---+
 |index| c1| c2|
 +-----+---+---+
 |    0|aaa|XXX|
 |    1|bbb|XXX|
 |    2|ccc|XXX|
 +-----+---+---+

***[[構造化ストリーミング>#m0435b58]]でウィンドウ操作 [#x623bccd]
(Sliding Window Operations in Spark Structured Streaming)

-[[コチラ>https://techinfoofmicrosofttech.osscons.jp/index.php?.NET%20for%20Apache%20Spark%E3%83%81%E3%83%A5%E3%83%BC%E3%83%88%E3%83%AA%E3%82%A2%E3%83%AB#w3d503ab]]で上手く行かなかったモノのリベンジ。~
https://github.com/apache/spark/blob/master/examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py
 import sys
 
 from pyspark.sql import SparkSession
 from pyspark.sql.functions import explode
 from pyspark.sql.functions import split
 from pyspark.sql.functions import window
 
 host = "xxx.xxx.xxx.xxx"
 port = 9999
 windowSize = 10
 slideSize  = 10
 if slideSize > windowSize:
     print("<slideSize> must be less than or equal to <windowSize>", file=sys.stderr)
 windowDuration = '{} seconds'.format(windowSize)
 slideDuration = '{} seconds'.format(slideSize)
 
 spark = SparkSession\
     .builder\
     .appName("StructuredNetworkWordCountWindowed")\
     .getOrCreate()
 
 # Create DataFrame representing the stream of input lines from connection to host:port
 lines = spark\
     .readStream\
     .format('socket')\
     .option('host', host)\
     .option('port', port)\
     .option('includeTimestamp', 'true')\
     .load()
 
 # Split the lines into words, retaining timestamps
 # split() splits each line into an array, and explode() turns the array into multiple rows
 words = lines.select(
     explode(split(lines.value, ' ')).alias('word'),
     lines.timestamp
 )
 
 # Group the data by window and word and compute the count of each group
 windowedCounts = words.groupBy(
     window(words.timestamp, windowDuration, slideDuration),
     words.word
 ).count().orderBy('window')
 
 # Start running the query that prints the windowed word counts to the console
 query = windowedCounts\
     .writeStream\
     .outputMode('complete')\
     .format('console')\
     .option('truncate', 'false')\
     .start()
 
 query.awaitTermination()

-結果
-接続~
java.net.ConnectException: Connection refused (Connection refused)が発生。

--java.net.ConnectException: Connection refused (Connection refused)が発生。
--使用するIPアドレス、≒ [[ホストアクセスのIPアドレスに注意が必要>Dockerのネットワーク設定#af36791e]]。

--[[ホストへのアクセスのIPアドレスに注意が必要>Dockerのネットワーク設定#af36791e]](127.0.0.1ではダメらしい
---WSL / WSL2で[[ncコマンド>https://dotnetdevelopmentinfrastructure.osscons.jp/index.php?nc%20%28netcat%29%20%E3%82%B3%E3%83%9E%E3%83%B3%E3%83%89]]は使えるが、~
・WSLでは、Windows上で動作しており、~
・WSL2では、LinuxのVM上で動作している。

--[[ncコマンド>https://dotnetdevelopmentinfrastructure.osscons.jp/index.php?nc%20%28netcat%29%20%E3%82%B3%E3%83%9E%E3%83%B3%E3%83%89]]は使えるが、WSL2同士のncはlocalhostでないと繋がらない。~
>...と言うのも[[WSL2のホストアクセスのIPアドレス>https://techinfoofmicrosofttech.osscons.jp/index.php?WSL%20%E2%86%92%20WSL2#yf2b317a]]は、~
WindowsのIPなのでWSL2同士の通信をブリッジしない)
---WSL2同士の場合、localhost(WSL2のVM)であれば通信をブリッジできる。

--と言う事で、WSL2だとncが使えない問題が発生してしまう。
---[[Docker Desktop for WSL2のホストアクセスのIPアドレス>https://techinfoofmicrosofttech.osscons.jp/index.php?WSL%20%E2%86%92%20WSL2#yf2b317a]]はWindowsのIPなので、~
このIPだとWSL2のncとDocker Desktop for WSL2の通信をブリッジしない。

--WSL2で上記のサンプルを実行する場合、~
以下の様にする事で、繋がるようになる。
---Docker Desktop for WSL2は使用しない。
---WSL2のターミナルを起動し、WSL2の中からコンテナを起動する。
---もう一枚、WSL2のターミナルを起動しncする。
---[[起動したコンテナの中に入りホストアクセスのIPアドレスを取得する。>Dockerのネットワーク設定#bb6baf03]]
---取得したIPアドレスを使用してコンテナからWSL2のncへアクセスする。
---疎通確認後、ココで使用したIPアドレスを上記のコード中に埋め込む。

-結果~
以下のように動作した。
 -------------------------------------------
 Batch: 4
 -------------------------------------------
 +------------------------------------------+----+-----+
 |window                                    |word|count|
 +------------------------------------------+----+-----+
 |{2021-09-29 10:27:20, 2021-09-29 10:27:30}|444 |6    |
 |{2021-09-29 10:27:20, 2021-09-29 10:27:30}|111 |6    |
 |{2021-09-29 10:27:20, 2021-09-29 10:27:30}|222 |6    |
 |{2021-09-29 10:27:20, 2021-09-29 10:27:30}|333 |6    |
 |{2021-09-29 10:27:50, 2021-09-29 10:28:00}|ccc |4    |
 |{2021-09-29 10:27:50, 2021-09-29 10:28:00}|bbb |4    |
 |{2021-09-29 10:27:50, 2021-09-29 10:28:00}|aaa |4    |
 |{2021-09-29 10:27:50, 2021-09-29 10:28:00}|ddd |4    |
 +------------------------------------------+----+-----+
古いウィンドウが蓄積していくので、
--ソレをどうやって削除するか?とか、
--どうやってウィンドウ毎にファイル出力するか?

>とか、そう言うのがまだ良く解らない(→ [[outputMode>Spark Structured Streaming#cf63f37d]]で制御するらしい)。

-補足

--一定時間で停止
 from time import sleep
 
 ...
 
 # query.awaitTermination()
 
 sleep(600) # 単位は秒らしい
 query.stop()

--追加モード~
追加モードでは、withWatermarkが必要。~
ただし、orderBy('window')は使えなくなる。
 # Split the lines into words, retaining timestamps
 # split() splits each line into an array, and explode() turns the array into multiple rows
 words = lines.select(
     explode(split(lines.value, ' ')).alias('word'),
     lines.timestamp
 ).withWatermark("timestamp", "1 minute")
 
 # Group the data by window and word and compute the count of each group
 windowedCounts = words.groupBy(
     window(words.timestamp, windowDuration, slideDuration),
     words.word
 ).count()
 
 # Start running the query that prints the windowed word counts to the console
 query = windowedCounts\
     .writeStream\
     .outputMode('append')\
     .format('console')\
     .option('truncate', 'false')\
     .start()

--CSV出力~
以下で行けるはずだが、WSL2にしたら出力できなくなった。
 query = windowedCounts\
     .writeStream\
     .outputMode('append')\
     .format("parquet")\
     .option("checkpointLocation", "path/to/")\
     .option("path", "path/to/")\
     .option('truncate', 'false')\
     .start()

***様々な入力元・出力先の例 [#yd135245]

-バッチの場合

--CSVファイル~
[[コチラ>#ffe40eef]]で説明済み。

--Parquetファイル~
[[コチラ>#p629aafe]]で説明済み(入力のみ)。

-[[構造化ストリーミング>#m0435b58]]の場合
--入力:[[Scalaの入力サンプル>Spark Structured Streaming#cf63f37d]]をPySpark的に書く。
---[[kafka>Apache Kafka]]
 kafka_stream_df = spark \
   .readStream \
   .format("kafka") \
   .option("kafka.bootstrap.servers", 'hostname:10090,...,hostname:10099') \
   .option("subscribe", "test") \
   .option("startingOffsets", "earliest") \
   .option("groupId", "streamtest") \
   .load()
※ [[Kafkaチュートリアル]]で作成した環境と(WSL2上のコンテナ同士を)接続できなかった(何故?)。

--出力:[[Scalaの出力サンプル>Spark Structured Streaming#cf63f37d]]をPySpark的に書く。

***[[on Azure Databricks>https://techinfoofmicrosofttech.osscons.jp/index.php?Azure%20Databricks%E3%83%81%E3%83%A5%E3%83%BC%E3%83%88%E3%83%AA%E3%82%A2%E3%83%AB#r15035b9]] [#p629aafe]
リンク先で、[[上記のサンプル>#ffe40eef]]をDatabricksで動かしてみる。

*参考 [#z4b7048e]

**環境構築 [#vcf9cacd]
-PySpark環境構築メモ - Smile Engineering Blog~
https://smile-jsp.hateblo.jp/entry/2020/05/07/012432

-WindowsでSpark(PySpark)環境をつくる - goodbyegangsterのブログ~
https://goodbyegangster.hatenablog.com/entry/2018/06/27/022915

-Jupyter Notebookの使い方 | Python入門~
https://www.javadrive.jp/python/jupyter-notebook/

**構造化ストリーミング [#m0435b58]
-[[Spark Structured Streaming]]

-PySparkとKafkaでStreaming処理を行う | 私の備忘録~
https://www.my-memorandum.tk/pyspark-structuedstreaming-docker

-[[Qiita > masato>#yce576d5]]

**EnsekiTT Blog [#z408f56e]
-Dockerでデータ分析環境を作るためにJupyter Docker Stacksを使ってみた話~
https://ensekitt.hatenablog.com/entry/2018/06/29/200000
-うっかりコンテナを削除しても大丈夫なようにdocker-composeを使うことにした話~
https://ensekitt.hatenablog.com/entry/2018/07/01/200000
>タイトルからでは解り難いが、jupyter/datascience-notebookの~
/home/jovyan/ディレクトリをdocker-composeでマウントする内容。

**CUBE SUGAR CONTAINER [#m977e9b5]
-PySpark: 時刻と文字列を相互に変換する (DataFrame / Spark SQL)~
https://blog.amedama.jp/entry/2018/01/29/212711
-PySpark: Jupyter Notebook からローカルの PySpark ドライバを操作する~
https://blog.amedama.jp/entry/2018/01/29/215458
-PySpark の UDF (User Defined Function) を試す~
https://blog.amedama.jp/entry/2018/01/31/210755
-PySpark の DataFrame を SparkSQL で操作する~
https://blog.amedama.jp/entry/2018/03/03/173257
-PySpark のスクリプトファイルで引数を扱う~
https://blog.amedama.jp/entry/2018/03/17/113516

-Python: PySpark で DataFrame にカラムを追加する~
https://blog.amedama.jp/entry/2019/08/17/125115

**Simple is Beautiful. [#g83b52c2]
-PySparkで追加したカラムにリテラル値を追加する~
https://blog.kozakana.net/2020/11/add-literal-values-to-columns-added-by-pyspark/
-PySparkで日付情報を別カラムに分割する~
https://blog.kozakana.net/2020/11/split-date-information-into-separate-columns-with-pyspark/
-PySparkで配列を展開してそれぞれの行にする~
https://blog.kozakana.net/2020/12/expand-the-array-into-each-row-with-pyspark/
-PySparkで日付データを0パディングして別カラムに分ける~
https://blog.kozakana.net/2021/01/separate-datetime-and-zero-padding-in-pyspark/

-PySparkで行をフィルタリングする~
https://blog.kozakana.net/2020/12/filtering-rows-in-pyspark/
-PySparkで重複行を削除する~
https://blog.kozakana.net/2020/12/remove-duplicate-lines-in-pyspark/
-PySparkでgroupbyで集計したデータを配列にして一行にまとめる~
https://blog.kozakana.net/2020/12/concat-the-aggregated-data-in-an-array-by-groupby-with-pyspark/~

-PySparkのDataFrameをSQLで操作する~
https://blog.kozakana.net/2020/12/query-pyspark-dataframe-in-sql/

**Qiita [#f615d0ec]
-Ubuntu18.04 (WSL)でpyspark+jupyterの環境を手早く作る~
https://qiita.com/gaborotta/items/0d324f58ae3f0149db2a

-Apache Spark を Jupyter Notebook で試す (on ローカル Docker~
https://qiita.com/mangano-ito/items/dac5582a331d40a484ad

-Jupyter NotebookでのpySparkコードサンプル~
https://qiita.com/kazurof/items/0e8e46771cd845b7edbb

-【PySpark】dataframe操作サンプルコード集~
https://qiita.com/YujiHamada3/items/0220bb68efb5c1e6ef62

***masato [#yce576d5]
PythonとSensorTag, Kafka, Spark Streamingのストリーム処理

-Part 1: Raspberry Pi 3~
https://qiita.com/masato/items/c5b4fbd3257c3b8b3b13
-Part 2: KafkaとLandoop~
https://qiita.com/masato/items/12e746ffa112eaaabd39
-Part 3: Apache AvroとSchema Registry~
https://qiita.com/masato/items/9ca08a198fb29457163a
-Part 4: Kafka ConnectでMongoDBに出力する~
https://qiita.com/masato/items/825a38787b07f62a84c2
-Part 5: Apache Toree でJupyterからSparkに接続する~
https://qiita.com/masato/items/55ee77def529f4189560
-Part 6: JupyterからPySpark Streamingのウィンドウ集計をする~
https://qiita.com/masato/items/574b38e45014a6ae7d88

**Stack Overflow [#u043ee8c]
-pyspark - Split Spark Dataframe string column into multiple columns~
https://stackoverflow.com/questions/39235704/split-spark-dataframe-string-column-into-multiple-columns


トップ   新規 一覧 単語検索 最終更新   ヘルプ   最終更新のRSS