.NET 開発基盤部会 Wiki」は、「Open棟梁Project」,「OSSコンソーシアム .NET開発基盤部会」によって運営されています。

目次

概要

詳細

Notebook系

Apache Sparkの特性上、インタラクティブな環境の方が習得が捗る。

Jupyter Notebook

Azure Databricks

チュートリアル

on Jupyter Notebook on Docker

  • ローカルでもできそうだったが、手軽そうなDockerをチョイス。
    ローカルだと、Jupyter Notebookに、別途、PySparkを入れる必要がある。
  • docker run で起動する場合
    docker run -it -p 8888:8888 jupyter/pyspark-notebook
  • docker-compose で起動する場合
  • docker-compose.yaml
    version: '3'
    services:
      notebook:
        image: jupyter/pyspark-notebook
        ports:
          - '8888:8888'
        environment:
          - GRANT_SUDO=yes
          - TZ=Asia/Tokyo
        volumes:
          - ./work:/home/jovyan/work
          - ./path:/home/jovyan/path
  • docker-compose up
    >docker-compose up -d
    ※ /jovyanをマウントすると、Kernel errorになる。
      多分、jovyan直下に必要なファイルが存在している。
  • 最後に表示されたURLにブラウザからアクセスする。
    [C 03:39:12.997 NotebookApp]
    To access the notebook, open this file in a browser:
        file:///home/jovyan/.local/share/jupyter/runtime/nbserver-7-open.html
    Or copy and paste one of these URLs:
        http://xxxxx:8888/?token=xxxxx
        or http://127.0.0.1:8888/?token=xxxxx
  • Jupyter Notebookが表示されるので、
    New から Notebook: Python3 を選択し Notebook を開く。
  • ハロー・ワールド風
  • 「In []:」に以下を貼り付けて RUN [▶] を押下する。
    from pyspark.sql import SparkSession
    spark: SparkSession = SparkSession.builder.appName("SimpleApp").getOrCreate()
    # do something to prove it works
    spark.sql('SELECT "Test1" as c1').show()
  • 以下のような結果が表示され、動作が確認できる。
    +----+
    |  c1|
    +----+
    |Test|
    +----+
  • Jupyter Notebookでは、Session?が維持されるので、
    以降の処理で、SparkSession?を再度定義する必要は無い。
  • Tuple&Listの二次元表からDataFrame?を生成する。
  • 「In []:」に以下を貼り付けて RUN [▶] を押下する。
    from typing import List, Tuple
    from pyspark.sql import SparkSession
    from pyspark.sql import DataFrame
    from pyspark.sql.types import StructField, StructType, StringType, IntegerType
    
    Trainer = Tuple[int, str, str, int]
    trainers: List[Trainer] = [
        (1, 'サトシ', 'male',   10),
        (2, 'シゲル', 'male',   10),
        (3, 'カスミ', 'female', 12),
    ]
    
    trainers_schema = StructType([
        StructField('id',      IntegerType(), True),
        StructField('name',    StringType(),  True),
        StructField('gender',  StringType(),  True),
        StructField('age',     IntegerType(), True),
    ])
    
    trainers_df: DataFrame = spark.createDataFrame(
        spark.sparkContext.parallelize(trainers),
        trainers_schema
    )
    
    trainers_df.show()
  • 以下のような結果が表示され、動作が確認できる。
    +---+------+------+---+
    | id|  name|gender|age|
    +---+------+------+---+
    |  1|サトシ|  male| 10|
    |  2|シゲル|  male| 10|
    |  3|カスミ|female| 12|
    +---+------+------+---+
  • DataFrame? を Row の list で返す。
  • 前述のtrainers_df.show()以降に以下を追記して RUN [▶] を押下する。
    result = trainers_df.collect()
    print(result)
  • 以下のような結果が表示され、動作が確認できる。
    [Row(id=1, name='サトシ', gender='male', age=10), Row(id=2, name='シゲル', gender='male', age=10), Row(id=3, name='カスミ', gender='female', age=12)]
  • CSV入出力
    WSL2に乗り換えたら、出力ができなくなったので、
    複雑な検証を行う場合は、潔くVMでやったほうが良いかも。
  • 出力
  • 前述のtrainers_df.show()以降に以下を追記して RUN [▶] を押下する。
    trainers_df.coalesce(1).write.mode('overwrite').csv("path/to/output.csv")
  • CSV出力の確認
    ・コンテナに入って確認する。
    >docker ps
    CONTAINER ID   IMAGE                        COMMAND CREATED STATUS PORTS NAMES
    xxxxxxxxxxxx   jupyter/pyspark-notebook...
    ...
    
    >docker exec -i -t xxxxxxxxxxxx /bin/bash
    ... /home/jovyan/path/to を探してみると、出力を確認できる。

    ・docker-compose.yamlのvolumesに指定した
     フォルダを探してみると、出力を確認できる。

    Jupyter Notebook(Files)からも確認できる。
     http://127.0.0.1:8888/tree#notebooks
  • CSV入力
    上記ファイルを入力する方法は...、
  • 「In []:」に以下を貼り付けて RUN [▶] を押下する。
    from pyspark.sql import SparkSession
    from pyspark.sql import DataFrame
    from pyspark.sql.types import StructField, StructType, StringType, IntegerType
    
    trainers_schema = StructType([
        StructField('id',      IntegerType(), True),
        StructField('name',    StringType(),  True),
        StructField('gender',  StringType(),  True),
        StructField('age',     IntegerType(), True),
    ])
    
    trainers_df: DataFrame = spark.read.format("csv")\
        .options(header="false")\
        .load("path/to/output.csv", schema=trainers_schema)
    
    trainers_df.show()
  • 以下のような結果が表示され、動作が確認できる。
    +---+------+------+---+
    | id|  name|gender|age|
    +---+------+------+---+
    |  1|サトシ|  male| 10|
    |  2|シゲル|  male| 10|
    |  3|カスミ|female| 12|
    +---+------+------+---+
  • 前述のtrainers_df.show()以降に以下を追記して RUN [▶] を押下する。
  • Spark SQLを使用する。
    trainers_df.createOrReplaceTempView('trainers');
    male_trainers_df = spark.sql('''
      SELECT *
      FROM   trainers
      WHERE  gender = 'male'
    ''')
    male_trainers_df.show()
    ※ トリプル・クォーテーションは改行を扱える。
  • DataFrame? APIを使用する。
    male_trainers_df = trainers_df.filter(trainers_df['gender'] == 'male')
    male_trainers_df.show()
  • 以下のような結果が表示され、動作が確認できる。
    +---+------+------+---+
    | id|  name|gender|age|
    +---+------+------+---+
    |  1|サトシ|  male| 10|
    |  2|シゲル|  male| 10|
    |  3|カスミ|female| 12|
    +---+------+------+---+
    
    +---+------+------+---+
    | id|  name|gender|age|
    +---+------+------+---+
    |  1|サトシ|  male| 10|
    |  2|シゲル|  male| 10|
    +---+------+------+---+
  • 以下は、ユーザー定義関数(UDF)を使用した操作。
  • 前述に
  • ユーザー定義関数(UDF)のimportを追加し、
    from pyspark.sql.functions import udf
  • trainers_df.show()以降に以下を追記して RUN [▶] を押下する。
    @udf(StringType())
    def name_with_suffix(name: str, gender: str) -> str:
      return name + {'male': 'くん', 'female': 'さん'}.get(gender, '氏')
    
    spark.udf.register('name_with_suffix', name_with_suffix)
    
    dearest_trainers = spark.sql('''
      SELECT name_with_suffix(name, gender) as hoge
      FROM   trainers
    ''')
    
    dearest_trainers.show()
  • 以下のような結果が表示され、動作が確認できる。
    +---+------+------+---+
    | id|  name|gender|age|
    +---+------+------+---+
    |  1|サトシ|  male| 10|
    |  2|シゲル|  male| 10|
    |  3|カスミ|female| 12|
    +---+------+------+---+
    
    +----------+
    |      hoge|
    +----------+
    |サトシくん|
    |シゲルくん|
    |カスミさん|
    +----------+

DataFrame?に対する様々な操作

Spark SQLとDataFrame APIが使用できる。

  • DataFrame?の生成
    +-----+-----+
    |   c1|   c2|
    +-----+-----+
    |Test1|Test2|
    +-----+-----+
  • DataFrame? API
    前述のTuple&Listの二次元表からDataFrame?を生成する例等。
  • Spark SQL
    from pyspark.sql import SparkSession
    spark: SparkSession = SparkSession.builder.appName("SimpleApp").getOrCreate()
    # do something to prove it works
    df = spark.sql('SELECT "Test1" as c1, "Test2" as c2 ')
    df.show()
  • カラムの追加
    DataFrame? や、それを支える内部的な RDD はイミュータブル (不変) なオブジェクトになっている。
    そのため、カラムを追加するときは既存のオブジェクトを変更するのではなく、新たなオブジェクトを作ることになる。
    +-----+-----+-----+
    |   c1|   c2|   c3|
    +-----+-----+-----+
    |Test1|Test2|XXXXX|
    +-----+-----+-----+
  • DataFrame? API
  • 指定列の値
    df = df.withColumn('c3', df.c2)
    df.show()
    +-----+-----+-----+
    |   c1|   c2|   c3|
    +-----+-----+-----+
    |Test1|Test2|Test2|
    +-----+-----+-----+
  • リテラル値
    import pyspark.sql.functions as f
    df = df.withColumn('c3', f.lit('hoge'))
    df.show()
    +-----+-----+-----+
    |   c1|   c2|   c3|
    +-----+-----+-----+
    |Test1|Test2| hoge|
    +-----+-----+-----+
  • Spark SQL
    df.createOrReplaceTempView('table')
    df = spark.sql('select c1, c2, c2 as c3 from table')
    df.show()
  • カラムのリネーム
    +-----+-----+-----+
    |   c1|   c2| hoge|
    +-----+-----+-----+
    |Test1|Test2|XXXXX|
    +-----+-----+-----+
  • DataFrame? API
    df = df.withColumnRenamed('c3', 'hoge')
    df.show()
  • Spark SQL
    df.createOrReplaceTempView('table')
    df = spark.sql('select c1, c2, c3 as hoge from table')
    df.show()
  • カラムの削除
    +-----+-----+
    |   c1|   c2|
    +-----+-----+
    |Test1|Test2|
    +-----+-----+
  • DataFrame? API
    df = df.drop('hoge')
    df.show()
  • Spark SQL
    df.createOrReplaceTempView('table')
    df = spark.sql('select c1, c2 from table')
    df.show()
  • テーブルの結合
    +---+----+      +---+----+   +---+---+---+
    | c1| c21|      | c1| c22|   | c1|c21|c22|
    +---+----+ join +---+----+ = +---+---+---+
    |  1| aaa|      |  1| bbb|   |  1|aaa|bbb|
    +---+----+      +---+----+   +---+---+---+
  • DataFrame?の生成
    from pyspark.sql import SparkSession
    spark: SparkSession = SparkSession.builder.appName("SimpleApp").getOrCreate()
    # do something to prove it works
    df1 = spark.sql('SELECT "1" as c1, "aaa" as c21')
    df2 = spark.sql('SELECT "1" as c1, "bbb" as c22')
    df1.show()
    df2.show() 
  • DataFrame? API
  • カラム名が同じ
    df1.join(df2, "c1", "inner").show()
  • カラム名が異なる
    df1.join(df2, df1.c1 == df2.c1, "inner").show()
    c1がダブるが...。
  • Spark SQL
    df1.createOrReplaceTempView('t1')
    df2.createOrReplaceTempView('t2')
    df = spark.sql('select t1.c1, t1.c21, t2.c22 from t1 inner join t2 on t1.c1 = t2.c1')
    df.show()
  • セルの分割
  • DataFrame?の生成
    from pyspark.sql import SparkSession
    spark: SparkSession = SparkSession.builder.appName("SimpleApp").getOrCreate()
    # do something to prove it works
    df = spark.sql('SELECT "aaa-bbb-ccc" as c1, "XXX" as c2')
    df.show()
    
    +-----------+---+
    |         c1| c2|
    +-----------+---+
    |aaa-bbb-ccc|XXX|
    +-----------+---+
  • 列に分割
    分割したデータを列に追加し、
    必要に応じて、c1カラムを削除する。
    import pyspark.sql.functions as f
    split_col = f.split(df['c1'], '-')
    df = df.withColumn('c2', split_col.getItem(0))
    df = df.withColumn('c3', split_col.getItem(1))
    df = df.withColumn('c4', split_col.getItem(2))
    df.show()
    
    +-----------+---+---+---+---+
    |         c1| c2| d1| d2| d3|
    +-----------+---+---+---+---+
    |aaa-bbb-ccc|XXX|aaa|bbb|ccc|
    +-----------+---+---+---+---+
  • 行に分割
    分割したデータを列に追加し、
    他のカラムには同じ値を入れる。
  • explode
    import pyspark.sql.functions as f
    df = df.withColumn('c1', f.split(f.col('c1'), '-'))
    df = df.select(f.explode(f.col('c1')), 'c2')
    df = df.withColumnRenamed('col', 'c1')
    df.show()
    
    +---+---+
    | c1| c2|
    +---+---+
    |aaa|XXX|
    |bbb|XXX|
    |ccc|XXX|
    +---+---+
  • posexplode
    import pyspark.sql.functions as f
    df = df.withColumn('c1', f.split(f.col('c1'), '-'))
    df = df.select(f.posexplode(f.col('c1')), 'c2')
    df = df.withColumnRenamed('pos', 'index')
    df = df.withColumnRenamed('col', 'c1')
    df.show()
    
    +-----+---+---+
    |index| c1| c2|
    +-----+---+---+
    |    0|aaa|XXX|
    |    1|bbb|XXX|
    |    2|ccc|XXX|
    +-----+---+---+

構造化ストリーミングでウィンドウ操作

(Sliding Window Operations in Spark Structured Streaming)

  • コチラで上手く行かなかったモノのリベンジ。
    https://github.com/apache/spark/blob/master/examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py
    import sys
    
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import explode
    from pyspark.sql.functions import split
    from pyspark.sql.functions import window
    
    host = "xxx.xxx.xxx.xxx"
    port = 9999
    windowSize = 10
    slideSize  = 10
    if slideSize > windowSize:
        print("<slideSize> must be less than or equal to <windowSize>", file=sys.stderr)
    windowDuration = '{} seconds'.format(windowSize)
    slideDuration = '{} seconds'.format(slideSize)
    
    spark = SparkSession\
        .builder\
        .appName("StructuredNetworkWordCountWindowed")\
        .getOrCreate()
    
    # Create DataFrame representing the stream of input lines from connection to host:port
    lines = spark\
        .readStream\
        .format('socket')\
        .option('host', host)\
        .option('port', port)\
        .option('includeTimestamp', 'true')\
        .load()
    
    # Split the lines into words, retaining timestamps
    # split() splits each line into an array, and explode() turns the array into multiple rows
    words = lines.select(
        explode(split(lines.value, ' ')).alias('word'),
        lines.timestamp
    )
    
    # Group the data by window and word and compute the count of each group
    windowedCounts = words.groupBy(
        window(words.timestamp, windowDuration, slideDuration),
        words.word
    ).count().orderBy('window')
    
    # Start running the query that prints the windowed word counts to the console
    query = windowedCounts\
        .writeStream\
        .outputMode('complete')\
        .format('console')\
        .option('truncate', 'false')\
        .start()
    
    query.awaitTermination()
  • 接続
    java.net.ConnectException?: Connection refused (Connection refused)が発生。
  • WSL / WSL2でncコマンドは使えるが、
    ・WSLでは、Windows上で動作しており、
    ・WSL2では、LinuxのVM上で動作している。
  • WSL2同士の場合、localhost(WSL2のVM)であれば通信をブリッジできる。
  • WSL2で上記のサンプルを実行する場合、
    以下の様にする事で、繋がるようになる。
  • 結果
    以下のように動作した。
    -------------------------------------------
    Batch: 4
    -------------------------------------------
    +------------------------------------------+----+-----+
    |window                                    |word|count|
    +------------------------------------------+----+-----+
    |{2021-09-29 10:27:20, 2021-09-29 10:27:30}|444 |6    |
    |{2021-09-29 10:27:20, 2021-09-29 10:27:30}|111 |6    |
    |{2021-09-29 10:27:20, 2021-09-29 10:27:30}|222 |6    |
    |{2021-09-29 10:27:20, 2021-09-29 10:27:30}|333 |6    |
    |{2021-09-29 10:27:50, 2021-09-29 10:28:00}|ccc |4    |
    |{2021-09-29 10:27:50, 2021-09-29 10:28:00}|bbb |4    |
    |{2021-09-29 10:27:50, 2021-09-29 10:28:00}|aaa |4    |
    |{2021-09-29 10:27:50, 2021-09-29 10:28:00}|ddd |4    |
    +------------------------------------------+----+-----+
    古いウィンドウが蓄積していくので、
    • ソレをどうやって削除するか?とか、
    • どうやってウィンドウ毎にファイル出力するか?

とか、そう言うのがまだ良く解らない(→ outputModeで制御するらしい)。

  • 補足
  • 一定時間で停止
    from time import sleep
    
    ...
    
    # query.awaitTermination()
    
    sleep(600) # 単位は秒らしい
    query.stop()
  • 追加モード
    追加モードでは、withWatermarkが必要。
    ただし、orderBy('window')は使えなくなる。
    # Split the lines into words, retaining timestamps
    # split() splits each line into an array, and explode() turns the array into multiple rows
    words = lines.select(
        explode(split(lines.value, ' ')).alias('word'),
        lines.timestamp
    ).withWatermark("timestamp", "1 minute")
    
    # Group the data by window and word and compute the count of each group
    windowedCounts = words.groupBy(
        window(words.timestamp, windowDuration, slideDuration),
        words.word
    ).count()
    
    # Start running the query that prints the windowed word counts to the console
    query = windowedCounts\
        .writeStream\
        .outputMode('append')\
        .format('console')\
        .option('truncate', 'false')\
        .start()
  • CSV出力
    以下で行けるはずだが、WSL2にしたら出力できなくなった。
    query = windowedCounts\
        .writeStream\
        .outputMode('append')\
        .format("parquet")\
        .option("checkpointLocation", "path/to/")\
        .option("path", "path/to/")\
        .option('truncate', 'false')\
        .start()

様々な入力元・出力先の例

  • バッチの場合
  • Parquetファイル
    コチラで説明済み(入力のみ)。
  • 構造化ストリーミングの場合
    • 入力:Scalaの入力サンプルPySpark的に書く。
      • kafka
        kafka_stream_df = spark \
          .readStream \
          .format("kafka") \
          .option("kafka.bootstrap.servers", 'hostname:10090,...,hostname:10099') \
          .option("subscribe", "test") \
          .option("startingOffsets", "earliest") \
          .option("groupId", "streamtest") \
          .load()
        Kafkaチュートリアルで作成した環境と(WSL2上のコンテナ同士を)接続できなかった(何故?)。

on Azure Databricks

リンク先で、上記のサンプルをDatabricksで動かしてみる。

参考

環境構築

構造化ストリーミング

EnsekiTT Blog

CUBE SUGAR CONTAINER

Simple is Beautiful.

Qiita

masato

PythonとSensorTag?, Kafka, Spark Streamingのストリーム処理

Stack Overflow


トップ   編集 凍結 差分 バックアップ 添付 複製 名前変更 リロード   新規 一覧 単語検索 最終更新   ヘルプ   最終更新のRSS
Last-modified: 2021-10-19 (火) 13:43:14 (4d)