PySpark tutorials Flashcards
https://sparkbyexamples.com/pyspark-tutorial/
データフレームの作成
https://sparkbyexamples.com/pyspark/pyspark-concatenate-columns/
createDataFrame() を使用する
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName(‘SparkByExamples.com’).getOrCreate()
data = [(‘James’,’’,’Smith’,’1991-04-01’,’M’,3000),
(‘Michael’,’Rose’,’’,’2000-05-19’,’M’,4000),
(‘Robert’, ‘’, ‘Williams’, ‘1978-09-05’, ‘M’, 4000),
(‘Maria’, ‘Anne’, ‘Jones’, ‘1967-12-01’, ‘F’, 4000),
(‘Jen’, ‘Mary’, ‘Brown’, ‘1980-02-17’, ‘F’, -1)
]
columns = [“firstname”,”middlename”,”lastname”,”dob”,”gender”,”salary”]
df = spark.createDataFrame(data=data, schema = columns)
データフレームの表示
df.printSchema()
df.show()
+———+———-+——–+———-+——+——+
|firstname|middlename|lastname|dob |gender|salary|
+———+———-+——–+———-+——+——+
|James | |Smith |1991-04-01|M |3000 |
|Michael |Rose | |2000-05-19|M |4000 |
|Robert |
+———+———-+——–+———-+——+——+
外部データソースからのデータフレーム
df = spark.read.csv(“/tmp/resources/zipcodes.csv”)
df.printSchema()
一時テーブルを作成
df.createOrReplaceTempView(“PERSON_DATA”)
SQLクエリの実行
df.createOrReplaceTempView(“PERSON_DATA”)
groupDF = spark.sql(“SELECT gender, count(*) from PERSON_DATA group by gender”)
groupDF.show()
+——+——–+
|gender|count(1)|
+——+——–+
| M| 3|
+——+——–+
F| 2|
TCPソケットからのストリーミング
df = spark.readStream
.format(“socket”)
.option(“host”,”localhost”)
.option(“port”,”9090”)
.load()
ソケットから読み取ったデータの出力
df.printSchema()
root
|– value: string (nullable = true)
コンソールにストリーミング
query = count.writeStream
.format(“console”)
.outputMode(“complete”)
.start()
.awaitTermination()
カフカからのストリーミング
df = spark.readStream
.format(“kafka”)
.option(“kafka.bootstrap.servers”, “192.168.1.100:9092”)
.option(“subscribe”, “json_topic”)
.option(“startingOffsets”, “earliest”) // From starting
.load()
Kafka の別のトピックにメッセージを書き込む
writeStream()
df.selectExpr(“CAST(id AS STRING) AS key”, “to_json(struct(*)) AS value”)
.writeStream
.format(“kafka”)
.outputMode(“append”)
.option(“kafka.bootstrap.servers”, “192.168.1.100:9092”)
.option(“topic”, “josn_data_topic”)
.start()
.awaitTermination()
列を選択する
df.select(df[“gender”]).show()
select()
df.select(df.gender).show()
from pyspark.sql.functions import col
# df.select(col(“gender”)).show()
+——+
|gender|
+——+
| M|
| M|
| F|
| F|
+——+
M|
データセットのすべての要素を (すべてのノードから) ドライバー ノードに取得する。
collect()
print(groupDF.collect())
[Row(gender=’M’, count(1)=3), Row(gender=’F’, count(1)=2)]
1) DataType を変更
2) 値を更新
3) 既存の列から列を作成
4) 新しい列を追加
5) 列名の変更
6) 列を削除
withColumn()
1) DataType を変更
df.withColumn(“salary”,col(“salary”).cast(“Integer”)).show()
2) 値を更新
df.withColumn(“salary”,col(“salary”)*100).show()
3) 既存の列から列を作成
df.withColumn(“CopiedColumn”,col(“salary”)* -1).show()
# 「salary」列に値 -1 を乗算して、新しい列「CopiedColumn」を作成
4) 新しい列を追加
df.withColumn(“Country”, lit(“USA”)).show()
df.withColumn(“Country”, lit(“USA”)) \
.withColumn(“anotherColumn”,lit(“anotherValue”)) \
.show()
5) 列名の変更
df.withColumnRenamed(“gender”,”sex”) \
.show(truncate=False)
6) 列を削除
df.drop(“salary”) \
.show()