「[[.NET 開発基盤部会 Wiki>http://dotnetdevelopmentinfrastructure.osscons.jp]]」は、「[[Open棟梁Project>https://github.com/OpenTouryoProject/]]」,「[[OSSコンソーシアム .NET開発基盤部会>https://www.osscons.jp/dotNetDevelopmentInfrastructure/]]」によって運営されています。 -戻る --[[Python]] --[[Apache Spark]] > [[Spark SQL]] *目次 [#k30dfef7] #contents *概要 [#r180bc95] -[[Apache Spark]]の[[Python]]言語バインディング -[[.NET for Apache Spark>Apache Spark#a2ce8cf5]]のチュートリアル用環境で動作したので。 *詳細 [#i5025fa2] **Notebook系 [#xa89d7e4] [[Apache Spark]]の特性上、インタラクティブな環境の方が習得が捗る。 ***[[Jupyter Notebook>#ffe40eef]] [#y2664b01] ***[[Azure Databricks>#p629aafe]] [#f0ad6acf] **チュートリアル [#n212eaf0] ***on [[Jupyter Notebook]] on [[Docker]] [#ffe40eef] -[[Jupyter Notebook]]で手軽に実行できる。 -ローカルでもできそうだったが、手軽そうな[[Docker]]をチョイス。~ ローカルだと、[[Jupyter Notebook]]に、別途、PySparkを入れる必要がある。 -[[Qiitaの「Apache Spark を Jupyter Notebook で試す (on ローカル Docker」>#f615d0ec]]を参考にした。 -[[Docker]]前提で(ここでは、[[Docker for Windows>https://techinfoofmicrosofttech.osscons.jp/index.php?Docker%20for%20Windows]]を使用した)環境のコンテナを --docker run で起動する場合 docker run -it -p 8888:8888 jupyter/pyspark-notebook --docker-compose で起動する場合 ---docker-compose.yaml version: '3' services: notebook: image: jupyter/pyspark-notebook ports: - '8888:8888' environment: - GRANT_SUDO=yes - TZ=Asia/Tokyo volumes: - ./work:/home/jovyan/work - ./path:/home/jovyan/path ---docker-compose up >docker-compose up -d ※ /jovyanをマウントすると、Kernel errorになる。~ 多分、jovyan直下に必要なファイルが存在している。 -最後に表示されたURLにブラウザからアクセスする。 [C 03:39:12.997 NotebookApp] To access the notebook, open this file in a browser: file:///home/jovyan/.local/share/jupyter/runtime/nbserver-7-open.html Or copy and paste one of these URLs: http://xxxxx:8888/?token=xxxxx or http://127.0.0.1:8888/?token=xxxxx -[[Jupyter Notebook>#y2664b01]]が表示されるので、~ New から Notebook: Python3 を選択し Notebook を開く。 -ハロー・ワールド風 --「In []:」に以下を貼り付けて RUN [▶] を押下する。 from pyspark.sql import SparkSession spark: SparkSession = SparkSession.builder.appName("SimpleApp").getOrCreate() # do something to prove it works spark.sql('SELECT "Test1" as c1').show() --以下のような結果が表示され、動作が確認できる。 +----+ | c1| +----+ |Test| +----+ --Jupyter Notebookでは、Session?が維持されるので、~ 以降の処理で、SparkSessionを再度定義する必要は無い。 -Tuple&Listの二次元表からDataFrameを生成する。 --「In []:」に以下を貼り付けて RUN [▶] を押下する。 from typing import List, Tuple from pyspark.sql import SparkSession from pyspark.sql import DataFrame from pyspark.sql.types import StructField, StructType, StringType, IntegerType Trainer = Tuple[int, str, str, int] trainers: List[Trainer] = [ (1, 'サトシ', 'male', 10), (2, 'シゲル', 'male', 10), (3, 'カスミ', 'female', 12), ] trainers_schema = StructType([ StructField('id', IntegerType(), True), StructField('name', StringType(), True), StructField('gender', StringType(), True), StructField('age', IntegerType(), True), ]) trainers_df: DataFrame = spark.createDataFrame( spark.sparkContext.parallelize(trainers), trainers_schema ) trainers_df.show() --以下のような結果が表示され、動作が確認できる。 +---+------+------+---+ | id| name|gender|age| +---+------+------+---+ | 1|サトシ| male| 10| | 2|シゲル| male| 10| | 3|カスミ|female| 12| +---+------+------+---+ -DataFrame を Row の list で返す。 --前述のtrainers_df.show()以降に以下を追記して RUN [▶] を押下する。 result = trainers_df.collect() print(result) --以下のような結果が表示され、動作が確認できる。 [Row(id=1, name='サトシ', gender='male', age=10), Row(id=2, name='シゲル', gender='male', age=10), Row(id=3, name='カスミ', gender='female', age=12)] -CSV入出力~ WSL2に乗り換えたら、出力ができなくなったので、~ 複雑な検証を行う場合は、潔くVMでやったほうが良いかも。 --出力 ---前述のtrainers_df.show()以降に以下を追記して RUN [▶] を押下する。 trainers_df.coalesce(1).write.mode('overwrite').csv("path/to/output.csv") ---CSV出力の確認~ ・コンテナに入って確認する。 >docker ps CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES xxxxxxxxxxxx jupyter/pyspark-notebook... ... >docker exec -i -t xxxxxxxxxxxx /bin/bash ... /home/jovyan/path/to を探してみると、出力を確認できる。 ~ ・docker-compose.yamlのvolumesに指定した~ フォルダを探してみると、出力を確認できる。~ ~ ・[[Jupyter Notebook>#y2664b01]](Files)からも確認できる。~ http://127.0.0.1:8888/tree#notebooks --CSV入力~ 上記ファイルを入力する方法は...、 ---「In []:」に以下を貼り付けて RUN [▶] を押下する。 from pyspark.sql import SparkSession from pyspark.sql import DataFrame from pyspark.sql.types import StructField, StructType, StringType, IntegerType trainers_schema = StructType([ StructField('id', IntegerType(), True), StructField('name', StringType(), True), StructField('gender', StringType(), True), StructField('age', IntegerType(), True), ]) trainers_df: DataFrame = spark.read.format("csv")\ .options(header="false")\ .load("path/to/output.csv", schema=trainers_schema) trainers_df.show() ---以下のような結果が表示され、動作が確認できる。 +---+------+------+---+ | id| name|gender|age| +---+------+------+---+ | 1|サトシ| male| 10| | 2|シゲル| male| 10| | 3|カスミ|female| 12| +---+------+------+---+ -以下は、[[Spark SQLとDataFrame API>Spark SQL]]を使用した操作。 --前述のtrainers_df.show()以降に以下を追記して RUN [▶] を押下する。 ---[[Spark SQL]]を使用する。 trainers_df.createOrReplaceTempView('trainers'); male_trainers_df = spark.sql(''' SELECT * FROM trainers WHERE gender = 'male' ''') male_trainers_df.show() ※ トリプル・クォーテーションは改行を扱える。 ---DataFrame APIを使用する。 male_trainers_df = trainers_df.filter(trainers_df['gender'] == 'male') male_trainers_df.show() --以下のような結果が表示され、動作が確認できる。 +---+------+------+---+ | id| name|gender|age| +---+------+------+---+ | 1|サトシ| male| 10| | 2|シゲル| male| 10| | 3|カスミ|female| 12| +---+------+------+---+ +---+------+------+---+ | id| name|gender|age| +---+------+------+---+ | 1|サトシ| male| 10| | 2|シゲル| male| 10| +---+------+------+---+ -以下は、ユーザー定義関数(UDF)を使用した操作。 --前述に ---ユーザー定義関数(UDF)のimportを追加し、 from pyspark.sql.functions import udf ---trainers_df.show()以降に以下を追記して RUN [▶] を押下する。 @udf(StringType()) def name_with_suffix(name: str, gender: str) -> str: return name + {'male': 'くん', 'female': 'さん'}.get(gender, '氏') spark.udf.register('name_with_suffix', name_with_suffix) dearest_trainers = spark.sql(''' SELECT name_with_suffix(name, gender) as hoge FROM trainers ''') dearest_trainers.show() --以下のような結果が表示され、動作が確認できる。 +---+------+------+---+ | id| name|gender|age| +---+------+------+---+ | 1|サトシ| male| 10| | 2|シゲル| male| 10| | 3|カスミ|female| 12| +---+------+------+---+ +----------+ | hoge| +----------+ |サトシくん| |シゲルくん| |カスミさん| +----------+ ***DataFrameに対する様々な操作 [#w64c6c2e] [[Spark SQLとDataFrame API>Spark SQL]]が使用できる。 -DataFrameの生成 +-----+-----+ | c1| c2| +-----+-----+ |Test1|Test2| +-----+-----+ --DataFrame API~ [[前述>#ffe40eef]]のTuple&Listの二次元表からDataFrameを生成する例等。 --Spark SQL from pyspark.sql import SparkSession spark: SparkSession = SparkSession.builder.appName("SimpleApp").getOrCreate() # do something to prove it works df = spark.sql('SELECT "Test1" as c1, "Test2" as c2 ') df.show() -カラムの追加~ DataFrame や、それを支える内部的な RDD はイミュータブル (不変) なオブジェクトになっている。~ そのため、カラムを追加するときは既存のオブジェクトを変更するのではなく、新たなオブジェクトを作ることになる。 +-----+-----+-----+ | c1| c2| c3| +-----+-----+-----+ |Test1|Test2|XXXXX| +-----+-----+-----+ --DataFrame API ---指定列の値 df = df.withColumn('c3', df.c2) df.show() +-----+-----+-----+ | c1| c2| c3| +-----+-----+-----+ |Test1|Test2|Test2| +-----+-----+-----+ ---リテラル値 import pyspark.sql.functions as f df = df.withColumn('c3', f.lit('hoge')) df.show() +-----+-----+-----+ | c1| c2| c3| +-----+-----+-----+ |Test1|Test2| hoge| +-----+-----+-----+ --Spark SQL df.createOrReplaceTempView('table') df = spark.sql('select c1, c2, c2 as c3 from table') df.show() -カラムのリネーム +-----+-----+-----+ | c1| c2| hoge| +-----+-----+-----+ |Test1|Test2|XXXXX| +-----+-----+-----+ --DataFrame API df = df.withColumnRenamed('c3', 'hoge') df.show() --Spark SQL df.createOrReplaceTempView('table') df = spark.sql('select c1, c2, c3 as hoge from table') df.show() -カラムの削除 +-----+-----+ | c1| c2| +-----+-----+ |Test1|Test2| +-----+-----+ --DataFrame API df = df.drop('hoge') df.show() --Spark SQL df.createOrReplaceTempView('table') df = spark.sql('select c1, c2 from table') df.show() -テーブルの結合 +---+----+ +---+----+ +---+---+---+ | c1| c21| | c1| c22| | c1|c21|c22| +---+----+ join +---+----+ = +---+---+---+ | 1| aaa| | 1| bbb| | 1|aaa|bbb| +---+----+ +---+----+ +---+---+---+ --DataFrameの生成 from pyspark.sql import SparkSession spark: SparkSession = SparkSession.builder.appName("SimpleApp").getOrCreate() # do something to prove it works df1 = spark.sql('SELECT "1" as c1, "aaa" as c21') df2 = spark.sql('SELECT "1" as c1, "bbb" as c22') df1.show() df2.show() --DataFrame API ---カラム名が同じ df1.join(df2, "c1", "inner").show() ---カラム名が異なる df1.join(df2, df1.c1 == df2.c1, "inner").show() ※ &color(red){c1がダブるが...。}; --Spark SQL df1.createOrReplaceTempView('t1') df2.createOrReplaceTempView('t2') df = spark.sql('select t1.c1, t1.c21, t2.c22 from t1 inner join t2 on t1.c1 = t2.c1') df.show() -セルの分割 --DataFrameの生成 from pyspark.sql import SparkSession spark: SparkSession = SparkSession.builder.appName("SimpleApp").getOrCreate() # do something to prove it works df = spark.sql('SELECT "aaa-bbb-ccc" as c1, "XXX" as c2') df.show() +-----------+---+ | c1| c2| +-----------+---+ |aaa-bbb-ccc|XXX| +-----------+---+ --列に分割~ 分割したデータを列に追加し、~ 必要に応じて、c1カラムを削除する。 import pyspark.sql.functions as f split_col = f.split(df['c1'], '-') df = df.withColumn('c2', split_col.getItem(0)) df = df.withColumn('c3', split_col.getItem(1)) df = df.withColumn('c4', split_col.getItem(2)) df.show() +-----------+---+---+---+---+ | c1| c2| d1| d2| d3| +-----------+---+---+---+---+ |aaa-bbb-ccc|XXX|aaa|bbb|ccc| +-----------+---+---+---+---+ --行に分割~ 分割したデータを列に追加し、~ 他のカラムには同じ値を入れる。 ---explode import pyspark.sql.functions as f df = df.withColumn('c1', f.split(f.col('c1'), '-')) df = df.select(f.explode(f.col('c1')), 'c2') df = df.withColumnRenamed('col', 'c1') df.show() +---+---+ | c1| c2| +---+---+ |aaa|XXX| |bbb|XXX| |ccc|XXX| +---+---+ ---posexplode import pyspark.sql.functions as f df = df.withColumn('c1', f.split(f.col('c1'), '-')) df = df.select(f.posexplode(f.col('c1')), 'c2') df = df.withColumnRenamed('pos', 'index') df = df.withColumnRenamed('col', 'c1') df.show() +-----+---+---+ |index| c1| c2| +-----+---+---+ | 0|aaa|XXX| | 1|bbb|XXX| | 2|ccc|XXX| +-----+---+---+ ***構造化ストリーミングでウィンドウ操作 [#x623bccd] (Sliding Window Operations in Spark Structured Streaming) -[[コチラ>https://techinfoofmicrosofttech.osscons.jp/index.php?.NET%20for%20Apache%20Spark%E3%83%81%E3%83%A5%E3%83%BC%E3%83%88%E3%83%AA%E3%82%A2%E3%83%AB#w3d503ab]]で上手く行かなかったモノのリベンジ。~ https://github.com/apache/spark/blob/master/examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py import sys from pyspark.sql import SparkSession from pyspark.sql.functions import explode from pyspark.sql.functions import split from pyspark.sql.functions import window host = "xxx.xxx.xxx.xxx" port = 9999 windowSize = 10 slideSize = 10 if slideSize > windowSize: print("<slideSize> must be less than or equal to <windowSize>", file=sys.stderr) windowDuration = '{} seconds'.format(windowSize) slideDuration = '{} seconds'.format(slideSize) spark = SparkSession\ .builder\ .appName("StructuredNetworkWordCountWindowed")\ .getOrCreate() # Create DataFrame representing the stream of input lines from connection to host:port lines = spark\ .readStream\ .format('socket')\ .option('host', host)\ .option('port', port)\ .option('includeTimestamp', 'true')\ .load() # Split the lines into words, retaining timestamps # split() splits each line into an array, and explode() turns the array into multiple rows words = lines.select( explode(split(lines.value, ' ')).alias('word'), lines.timestamp ) # Group the data by window and word and compute the count of each group windowedCounts = words.groupBy( window(words.timestamp, windowDuration, slideDuration), words.word ).count().orderBy('window') # Start running the query that prints the windowed word counts to the console query = windowedCounts\ .writeStream\ .outputMode('complete')\ .format('console')\ .option('truncate', 'false')\ .start() query.awaitTermination() -接続~ java.net.ConnectException: Connection refused (Connection refused)が発生。 --使用するIPアドレス、≒ [[ホストアクセスのIPアドレスに注意が必要>Dockerのネットワーク設定#af36791e]]。 ---WSL / WSL2で[[ncコマンド>https://dotnetdevelopmentinfrastructure.osscons.jp/index.php?nc%20%28netcat%29%20%E3%82%B3%E3%83%9E%E3%83%B3%E3%83%89]]は使えるが、~ ・WSLでは、Windows上で動作しており、~ ・WSL2では、LinuxのVM上で動作している。 ---WSL2同士の場合、localhost(WSL2のVM)であれば通信をブリッジできる。 ---[[Docker Desktop for WSL2のホストアクセスのIPアドレス>https://techinfoofmicrosofttech.osscons.jp/index.php?WSL%20%E2%86%92%20WSL2#yf2b317a]]はWindowsのIPなので、~ このIPだとWSL2のncとDocker Desktop for WSL2の通信をブリッジしない。 --WSL2で上記のサンプルを実行する場合、~ 以下の様にする事で、繋がるようになる。 ---Docker Desktop for WSL2は使用しない。 ---WSL2のターミナルを起動し、WSL2の中からコンテナを起動する。 ---もう一枚、WSL2のターミナルを起動しncする。 ---[[起動したコンテナの中に入りホストアクセスのIPアドレスを取得する。>Dockerのネットワーク設定#bb6baf03]] ---取得したIPアドレスを使用してコンテナからWSL2のncへアクセスする。 ---疎通確認後、ココで使用したIPアドレスを上記のコード中に埋め込む。 -結果~ 以下のように動作した。 ------------------------------------------- Batch: 4 ------------------------------------------- +------------------------------------------+----+-----+ |window |word|count| +------------------------------------------+----+-----+ |{2021-09-29 10:27:20, 2021-09-29 10:27:30}|444 |6 | |{2021-09-29 10:27:20, 2021-09-29 10:27:30}|111 |6 | |{2021-09-29 10:27:20, 2021-09-29 10:27:30}|222 |6 | |{2021-09-29 10:27:20, 2021-09-29 10:27:30}|333 |6 | |{2021-09-29 10:27:50, 2021-09-29 10:28:00}|ccc |4 | |{2021-09-29 10:27:50, 2021-09-29 10:28:00}|bbb |4 | |{2021-09-29 10:27:50, 2021-09-29 10:28:00}|aaa |4 | |{2021-09-29 10:27:50, 2021-09-29 10:28:00}|ddd |4 | +------------------------------------------+----+-----+ 古いウィンドウが蓄積していくので、 --ソレをどうやって削除するか?とか、 --どうやってウィンドウ毎にファイル出力するか? >とか、そう言うのがまだ良く解らない(→ [[outputMode>Spark Structured Streaming#cf63f37d]]で制御するらしい)。 -補足 --一定時間で停止 from time import sleep ... # query.awaitTermination() sleep(600) # 単位は秒らしい query.stop() --追加モード~ 追加モードでは、withWatermarkが必要。~ ただし、orderBy('window')は使えなくなる。 # Split the lines into words, retaining timestamps # split() splits each line into an array, and explode() turns the array into multiple rows words = lines.select( explode(split(lines.value, ' ')).alias('word'), lines.timestamp ).withWatermark("timestamp", "1 minute") # Group the data by window and word and compute the count of each group windowedCounts = words.groupBy( window(words.timestamp, windowDuration, slideDuration), words.word ).count() # Start running the query that prints the windowed word counts to the console query = windowedCounts\ .writeStream\ .outputMode('append')\ .format('console')\ .option('truncate', 'false')\ .start() --CSV出力~ 以下で行けるはずだが、WSL2にしたら出力できなくなった。 query = windowedCounts\ .writeStream\ .outputMode('append')\ .format("parquet")\ .option("checkpointLocation", "path/to/")\ .option("path", "path/to/")\ .option('truncate', 'false')\ .start() ***様々な入力元・出力先の例 [#yd135245] -バッチの場合 --CSVファイル~ [[コチラ>#ffe40eef]]で説明済み。 --Parquetファイル~ [[コチラ>#p629aafe]]で説明済み(入力のみ)。 --[[kafka>Apache Kafka]] ---入力 kafka_stream_df = spark \ .readStream \ .format("kafka") \ .option("kafka.bootstrap.servers", 'hostname:10090,...,hostname:10099') \ .option("subscribe", "test") \ .option("startingOffsets", "earliest") \ .option("groupId", "streamtest") \ .load() ---出力~ ... -構造化ストリーミングの場合 --[[入力>Spark Structured Streaming#cf63f37d]]をPySpark的に書く。 --[[出力>Spark Structured Streaming#cf63f37d]]をPySpark的に書く。 ***[[on Azure Databricks>https://techinfoofmicrosofttech.osscons.jp/index.php?Azure%20Databricks%E3%83%81%E3%83%A5%E3%83%BC%E3%83%88%E3%83%AA%E3%82%A2%E3%83%AB#r15035b9]] [#p629aafe] リンク先で、[[上記のサンプル>#ffe40eef]]をDatabricksで動かしてみる。 *参考 [#z4b7048e] **環境構築 [#vcf9cacd] -PySpark環境構築メモ - Smile Engineering Blog~ https://smile-jsp.hateblo.jp/entry/2020/05/07/012432 -WindowsでSpark(PySpark)環境をつくる - goodbyegangsterのブログ~ https://goodbyegangster.hatenablog.com/entry/2018/06/27/022915 -Jupyter Notebookの使い方 | Python入門~ https://www.javadrive.jp/python/jupyter-notebook/ **構造化ストリーミング [#m0435b58] -PySparkとKafkaでStreaming処理を行う | 私の備忘録~ https://www.my-memorandum.tk/pyspark-structuedstreaming-docker -[[Qiita > masato>#yce576d5]] **EnsekiTT Blog [#z408f56e] -Dockerでデータ分析環境を作るためにJupyter Docker Stacksを使ってみた話~ https://ensekitt.hatenablog.com/entry/2018/06/29/200000 -うっかりコンテナを削除しても大丈夫なようにdocker-composeを使うことにした話~ https://ensekitt.hatenablog.com/entry/2018/07/01/200000 >タイトルからでは解り難いが、jupyter/datascience-notebookの~ /home/jovyan/ディレクトリをdocker-composeでマウントする内容。 **CUBE SUGAR CONTAINER [#m977e9b5] -PySpark: 時刻と文字列を相互に変換する (DataFrame / Spark SQL)~ https://blog.amedama.jp/entry/2018/01/29/212711 -PySpark: Jupyter Notebook からローカルの PySpark ドライバを操作する~ https://blog.amedama.jp/entry/2018/01/29/215458 -PySpark の UDF (User Defined Function) を試す~ https://blog.amedama.jp/entry/2018/01/31/210755 -PySpark の DataFrame を SparkSQL で操作する~ https://blog.amedama.jp/entry/2018/03/03/173257 -PySpark のスクリプトファイルで引数を扱う~ https://blog.amedama.jp/entry/2018/03/17/113516 -Python: PySpark で DataFrame にカラムを追加する~ https://blog.amedama.jp/entry/2019/08/17/125115 **Simple is Beautiful. [#g83b52c2] -PySparkで追加したカラムにリテラル値を追加する~ https://blog.kozakana.net/2020/11/add-literal-values-to-columns-added-by-pyspark/ -PySparkで日付情報を別カラムに分割する~ https://blog.kozakana.net/2020/11/split-date-information-into-separate-columns-with-pyspark/ -PySparkで配列を展開してそれぞれの行にする~ https://blog.kozakana.net/2020/12/expand-the-array-into-each-row-with-pyspark/ -PySparkで日付データを0パディングして別カラムに分ける~ https://blog.kozakana.net/2021/01/separate-datetime-and-zero-padding-in-pyspark/ -PySparkで行をフィルタリングする~ https://blog.kozakana.net/2020/12/filtering-rows-in-pyspark/ -PySparkで重複行を削除する~ https://blog.kozakana.net/2020/12/remove-duplicate-lines-in-pyspark/ -PySparkでgroupbyで集計したデータを配列にして一行にまとめる~ https://blog.kozakana.net/2020/12/concat-the-aggregated-data-in-an-array-by-groupby-with-pyspark/~ -PySparkのDataFrameをSQLで操作する~ https://blog.kozakana.net/2020/12/query-pyspark-dataframe-in-sql/ **Qiita [#f615d0ec] -Ubuntu18.04 (WSL)でpyspark+jupyterの環境を手早く作る~ https://qiita.com/gaborotta/items/0d324f58ae3f0149db2a -Apache Spark を Jupyter Notebook で試す (on ローカル Docker~ https://qiita.com/mangano-ito/items/dac5582a331d40a484ad -Jupyter NotebookでのpySparkコードサンプル~ https://qiita.com/kazurof/items/0e8e46771cd845b7edbb -【PySpark】dataframe操作サンプルコード集~ https://qiita.com/YujiHamada3/items/0220bb68efb5c1e6ef62 ***masato [#yce576d5] PythonとSensorTag, Kafka, Spark Streamingのストリーム処理 -Part 1: Raspberry Pi 3~ https://qiita.com/masato/items/c5b4fbd3257c3b8b3b13 -Part 2: KafkaとLandoop~ https://qiita.com/masato/items/12e746ffa112eaaabd39 -Part 3: Apache AvroとSchema Registry~ https://qiita.com/masato/items/9ca08a198fb29457163a -Part 4: Kafka ConnectでMongoDBに出力する~ https://qiita.com/masato/items/825a38787b07f62a84c2 -Part 5: Apache Toree でJupyterからSparkに接続する~ https://qiita.com/masato/items/55ee77def529f4189560 -Part 6: JupyterからPySpark Streamingのウィンドウ集計をする~ https://qiita.com/masato/items/574b38e45014a6ae7d88 **Stack Overflow [#u043ee8c] -pyspark - Split Spark Dataframe string column into multiple columns~ https://stackoverflow.com/questions/39235704/split-spark-dataframe-string-column-into-multiple-columns