「[[.NET 開発基盤部会 Wiki>http://dotnetdevelopmentinfrastructure.osscons.jp]]」は、「[[Open棟梁Project>https://github.com/OpenTouryoProject/]]」,「[[OSSコンソーシアム .NET開発基盤部会>https://www.osscons.jp/dotNetDevelopmentInfrastructure/]]」によって運営されています。 -戻る --[[Python]] --[[Apache Spark]] *目次 [#k30dfef7] #contents *概要 [#r180bc95] -[[Apache Spark]]の[[Python]]言語バインディング -[[.NET for Apache Spark>Apache Spark#a2ce8cf5]]のチュートリアル用環境で動作したので。 *詳細 [#i5025fa2] **Notebook系 [#xa89d7e4] [[Apache Spark]]の特性上、インタラクティブな環境の方が習得が捗る。 ***[[Jupyter Notebook]] [#y2664b01] ***[[Azure Databricks>#p629aafe]] [#f0ad6acf] **チュートリアル [#n212eaf0] ***on Docker [#ffe40eef] -ローカルでもできそうだったが、手軽そうな[[Docker]]をチョイス。 -以下、[[Qiitaの「Apache Spark を Jupyter Notebook で試す (on ローカル Docker」>#f615d0ec]]を参考にした。 -[[Docker]]前提で(ここでは、[[Docker for Windows>https://techinfoofmicrosofttech.osscons.jp/index.php?Docker%20for%20Windows]]を使用した)環境のコンテナを --docker run で起動する場合 docker run -it -p 8888:8888 jupyter/pyspark-notebook --docker-compose で起動する場合 ---docker-compose.yaml version: '3' services: notebook: image: jupyter/pyspark-notebook ports: - '8888:8888' environment: - GRANT_SUDO=yes - TZ=Asia/Tokyo volumes: - ./work:/home/jovyan/work - ./path:/home/jovyan/path ---docker-compose up >docker-compose up -d ※ /jovyanをマウントすると、Kernel errorになる。~ 多分、jovyan直下に必要なファイルが存在している。 -最後に表示されたURLにブラウザからアクセスする。 [C 03:39:12.997 NotebookApp] To access the notebook, open this file in a browser: file:///home/jovyan/.local/share/jupyter/runtime/nbserver-7-open.html Or copy and paste one of these URLs: http://xxxxx:8888/?token=xxxxx or http://127.0.0.1:8888/?token=xxxxx -[[Jupyter Notebook>#y2664b01]]が表示されるので、~ New から Notebook: Python3 を選択し Notebook を開く。 -ハロー・ワールド風 --「In []:」に以下を貼り付けて RUN [▶] を押下する。 from pyspark.sql import SparkSession spark: SparkSession = SparkSession.builder.appName("SimpleApp").getOrCreate() # do something to prove it works spark.sql('SELECT "Test" as c1').show() --以下のような結果が表示され、動作が確認できる。 +----+ | c1| +----+ |Test| +----+ --Jupyter Notebookでは、Session?が維持されるので、~ 以降の処理で、SparkSessionを再度定義する必要は無い。 -Tuple&Listの二次元表からDataFrameを生成する。 --「In []:」に以下を貼り付けて RUN [▶] を押下する。 from typing import List, Tuple from pyspark.sql import SparkSession from pyspark.sql import DataFrame from pyspark.sql.types import StructField, StructType, StringType, IntegerType Trainer = Tuple[int, str, str, int] trainers: List[Trainer] = [ (1, 'サトシ', 'male', 10), (2, 'シゲル', 'male', 10), (3, 'カスミ', 'female', 12), ] trainers_schema = StructType([ StructField('id', IntegerType(), True), StructField('name', StringType(), True), StructField('gender', StringType(), True), StructField('age', IntegerType(), True), ]) trainers_df: DataFrame = spark.createDataFrame( spark.sparkContext.parallelize(trainers), trainers_schema ) trainers_df.show() --以下のような結果が表示され、動作が確認できる。 +---+------+------+---+ | id| name|gender|age| +---+------+------+---+ | 1|サトシ| male| 10| | 2|シゲル| male| 10| | 3|カスミ|female| 12| +---+------+------+---+ -DataFrame を Row の list で返す。 --前述のtrainers_df.show()以降に以下を追記して RUN [▶] を押下する。 result = trainers_df.collect() print(result) --以下のような結果が表示され、動作が確認できる。 [Row(id=1, name='サトシ', gender='male', age=10), Row(id=2, name='シゲル', gender='male', age=10), Row(id=3, name='カスミ', gender='female', age=12)] -入出力 --CSV出力 ---前述のtrainers_df.show()以降に以下を追記して RUN [▶] を押下する。 trainers_df.coalesce(1).write.mode('overwrite').csv("path/to/output.csv") ---CSV出力の確認~ ・コンテナに入って確認する。 >docker ps CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES xxxxxxxxxxxx jupyter/pyspark-notebook... ... >docker exec -i -t xxxxxxxxxxxx /bin/bash ... /home/jovyan/path/to を探してみると、出力を確認できる。 ~ ・docker-compose.yamlのvolumesに指定した~ フォルダを探してみると、出力を確認できる。~ ~ ・[[Jupyter Notebook>#y2664b01]](Files)からも確認できる。~ http://127.0.0.1:8888/tree#notebooks --CSV入力~ 上記ファイルを入力する方法は...、 ---「In []:」に以下を貼り付けて RUN [▶] を押下する。 from pyspark.sql import SparkSession from pyspark.sql import DataFrame from pyspark.sql.types import StructField, StructType, StringType, IntegerType trainers_schema = StructType([ StructField('id', IntegerType(), True), StructField('name', StringType(), True), StructField('gender', StringType(), True), StructField('age', IntegerType(), True), ]) trainers_df: DataFrame = spark.read.format("csv")\ .options(header="false")\ .load("path/to/output.csv", schema=trainers_schema) trainers_df.show() ---以下のような結果が表示され、動作が確認できる。 +---+------+------+---+ | id| name|gender|age| +---+------+------+---+ | 1|サトシ| male| 10| | 2|シゲル| male| 10| | 3|カスミ|female| 12| +---+------+------+---+ -以下は、[[Spark SQLとDataFrame API>Spark SQL]]を使用した操作。 --前述のtrainers_df.show()以降に以下を追記して RUN [▶] を押下する。 ---Spark SQLを使用する。 trainers_df.createOrReplaceTempView('trainers'); male_trainers_df = spark.sql(''' SELECT * FROM trainers WHERE gender = 'male' ''') male_trainers_df.show() ※ トリプル・クォーテーションは改行を扱える。 ---DataFrame APIを使用する。 male_trainers_df = trainers_df.filter(trainers_df['gender'] == 'male') male_trainers_df.show() --以下のような結果が表示され、動作が確認できる。 +---+------+------+---+ | id| name|gender|age| +---+------+------+---+ | 1|サトシ| male| 10| | 2|シゲル| male| 10| | 3|カスミ|female| 12| +---+------+------+---+ +---+------+------+---+ | id| name|gender|age| +---+------+------+---+ | 1|サトシ| male| 10| | 2|シゲル| male| 10| +---+------+------+---+ -以下は、ユーザー定義関数(UDF)を使用した操作。 --前述に ---ユーザー定義関数(UDF)のimportを追加し、 from pyspark.sql.functions import udf ---trainers_df.show()以降に以下を追記して RUN [▶] を押下する。 @udf(StringType()) def name_with_suffix(name: str, gender: str) -> str: return name + {'male': 'くん', 'female': 'さん'}.get(gender, '氏') spark.udf.register('name_with_suffix', name_with_suffix) dearest_trainers = spark.sql(''' SELECT name_with_suffix(name, gender) as hoge FROM trainers ''') dearest_trainers.show() --以下のような結果が表示され、動作が確認できる。 +---+------+------+---+ | id| name|gender|age| +---+------+------+---+ | 1|サトシ| male| 10| | 2|シゲル| male| 10| | 3|カスミ|female| 12| +---+------+------+---+ +----------+ | hoge| +----------+ |サトシくん| |シゲルくん| |カスミさん| +----------+ ***[[on Azure Databricks>https://techinfoofmicrosofttech.osscons.jp/index.php?Azure%20Databricks%E3%83%81%E3%83%A5%E3%83%BC%E3%83%88%E3%83%AA%E3%82%A2%E3%83%AB#r15035b9]] [#p629aafe] リンク先で、Databricksで動かしてみる。 ***構造化ストリーミングでウィンドウ操作 [#x623bccd] [[コチラ>https://techinfoofmicrosofttech.osscons.jp/index.php?.NET%20for%20Apache%20Spark%E3%83%81%E3%83%A5%E3%83%BC%E3%83%88%E3%83%AA%E3%82%A2%E3%83%AB#w3d503ab]]で上手く行かなかったモノのリベンジ。 >https://github.com/apache/spark/blob/master/examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py import sys from pyspark.sql import SparkSession from pyspark.sql.functions import explode from pyspark.sql.functions import split from pyspark.sql.functions import window host = "localhost" port = 9999 windowSize = 10 slideSize = 10 if slideSize > windowSize: print("<slideSize> must be less than or equal to <windowSize>", file=sys.stderr) windowDuration = '{} seconds'.format(windowSize) slideDuration = '{} seconds'.format(slideSize) spark = SparkSession\ .builder\ .appName("StructuredNetworkWordCountWindowed")\ .getOrCreate() # Create DataFrame representing the stream of input lines from connection to host:port lines = spark\ .readStream\ .format('socket')\ .option('host', host)\ .option('port', port)\ .option('includeTimestamp', 'true')\ .load() # Split the lines into words, retaining timestamps # split() splits each line into an array, and explode() turns the array into multiple rows words = lines.select( explode(split(lines.value, ' ')).alias('word'), lines.timestamp ) # Group the data by window and word and compute the count of each group windowedCounts = words.groupBy( window(words.timestamp, windowDuration, slideDuration), words.word ).count().orderBy('window') # Start running the query that prints the windowed word counts to the console query = windowedCounts\ .writeStream\ .outputMode('complete')\ .format('console')\ .option('truncate', 'false')\ .start() query.awaitTermination() -結果 --java.net.ConnectException: Connection refused (Connection refused)が発生。 --jupyter/pyspark-notebookは、[[ホストへのアクセスが上手く出来ない可能性がある。>Dockerのネットワーク設定#af36791e]] *参考 [#z4b7048e] -PySpark環境構築メモ - Smile Engineering Blog~ https://smile-jsp.hateblo.jp/entry/2020/05/07/012432 -WindowsでSpark(PySpark)環境をつくる - goodbyegangsterのブログ~ https://goodbyegangster.hatenablog.com/entry/2018/06/27/022915 -Jupyter Notebookの使い方 | Python入門~ https://www.javadrive.jp/python/jupyter-notebook/ **EnsekiTT Blog [#z408f56e] -Dockerでデータ分析環境を作るためにJupyter Docker Stacksを使ってみた話~ https://ensekitt.hatenablog.com/entry/2018/06/29/200000 -うっかりコンテナを削除しても大丈夫なようにdocker-composeを使うことにした話~ https://ensekitt.hatenablog.com/entry/2018/07/01/200000 >タイトルからでは解り難いが、jupyter/datascience-notebookの~ /home/jovyan/ディレクトリをdocker-composeでマウントする内容。 **CUBE SUGAR CONTAINER [#m977e9b5] -PySpark: 時刻と文字列を相互に変換する (DataFrame / Spark SQL)~ https://blog.amedama.jp/entry/2018/01/29/212711 -PySpark: Jupyter Notebook からローカルの PySpark ドライバを操作する~ https://blog.amedama.jp/entry/2018/01/29/215458 -PySpark の UDF (User Defined Function) を試す~ https://blog.amedama.jp/entry/2018/01/31/210755 -PySpark の DataFrame を SparkSQL で操作する~ https://blog.amedama.jp/entry/2018/03/03/173257 -PySpark のスクリプトファイルで引数を扱う~ https://blog.amedama.jp/entry/2018/03/17/113516 **Qiita [#f615d0ec] -Ubuntu18.04 (WSL)でpyspark+jupyterの環境を手早く作る~ https://qiita.com/gaborotta/items/0d324f58ae3f0149db2a -Apache Spark を Jupyter Notebook で試す (on ローカル Docker~ https://qiita.com/mangano-ito/items/dac5582a331d40a484ad -Jupyter NotebookでのpySparkコードサンプル~ https://qiita.com/kazurof/items/0e8e46771cd845b7edbb -【PySpark】dataframe操作サンプルコード集~ https://qiita.com/YujiHamada3/items/0220bb68efb5c1e6ef62