PySpark tutorials Flashcards

https://sparkbyexamples.com/pyspark-tutorial/

1
Q

Creating a DataFrame

A

https://sparkbyexamples.com/pyspark/pyspark-concatenate-columns/

Use createDataFrame()

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

data = [('James', '', 'Smith', '1991-04-01', 'M', 3000),
    ('Michael', 'Rose', '', '2000-05-19', 'M', 4000),
    ('Robert', '', 'Williams', '1978-09-05', 'M', 4000),
    ('Maria', 'Anne', 'Jones', '1967-12-01', 'F', 4000),
    ('Jen', 'Mary', 'Brown', '1980-02-17', 'F', -1)
]

columns = ["firstname", "middlename", "lastname", "dob", "gender", "salary"]

df = spark.createDataFrame(data=data, schema=columns)
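
A plain column-name list leaves Spark to infer each type from the data. A minimal sketch of the same DataFrame built with an explicit StructType schema instead (the schema definition here is an addition for illustration, not part of the original card):

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Explicit schema: fixes each column's name, type, and nullability up front
schema = StructType([
    StructField("firstname", StringType(), True),
    StructField("middlename", StringType(), True),
    StructField("lastname", StringType(), True),
    StructField("dob", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("salary", IntegerType(), True),
])

df = spark.createDataFrame(data=data, schema=schema)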

2
Q

Displaying a DataFrame

A

df.printSchema()
df.show()

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|dob       |gender|salary|
+---------+----------+--------+----------+------+------+
|James    |          |Smith   |1991-04-01|M     |3000  |
|Michael  |Rose      |        |2000-05-19|M     |4000  |
|Robert   |          |Williams|1978-09-05|M     |4000  |
|Maria    |Anne      |Jones   |1967-12-01|F     |4000  |
|Jen      |Mary      |Brown   |1980-02-17|F     |-1    |
+---------+----------+--------+----------+------+------+
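
By default show() displays at most 20 rows and truncates values longer than 20 characters; both can be overridden. A minimal sketch:

# Show the first 5 rows without truncating any values
df.show(n=5, truncate=False)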

3
Q

Creating a DataFrame from an external data source

A

df = spark.read.csv("/tmp/resources/zipcodes.csv")
df.printSchema()
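
By default csv() reads every column as a string and treats the first line as data. A minimal sketch with the commonly used options added (same file path; the options are an addition for illustration):

# header=True takes column names from the first line;
# inferSchema=True samples the file to guess each column's type
df = spark.read \
    .option("header", True) \
    .option("inferSchema", True) \
    .csv("/tmp/resources/zipcodes.csv")
df.printSchema()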

4
Q

Create a temporary view

A

df.createOrReplaceTempView("PERSON_DATA")
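
The view is registered only for the current SparkSession. A minimal sketch of querying it, plus the session-spanning alternative:

# The temp view is now addressable from SQL
spark.sql("SELECT * FROM PERSON_DATA").show()

# A global temp view outlives the session and lives in the global_temp database
df.createGlobalTempView("PERSON_DATA_GLOBAL")
spark.sql("SELECT * FROM global_temp.PERSON_DATA_GLOBAL").show()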

5
Q

Running SQL queries

A

df.createOrReplaceTempView("PERSON_DATA")

groupDF = spark.sql("SELECT gender, count(*) FROM PERSON_DATA GROUP BY gender")

groupDF.show()

+------+--------+
|gender|count(1)|
+------+--------+
|     M|       3|
|     F|       2|
+------+--------+
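
The same aggregation can be expressed with the DataFrame API instead of SQL; a minimal equivalent sketch:

# Equivalent to the SQL above; the count column is named "count" instead of "count(1)"
df.groupBy("gender").count().show()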

6
Q

Streaming from a TCP socket

A

df = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", "9090") \
    .load()

7
Q

Printing the schema of data read from the socket

A

df.printSchema()
root
|-- value: string (nullable = true)

8
Q

Streaming to the console

A

query = count.writeStream \
    .format("console") \
    .outputMode("complete") \
    .start()

query.awaitTermination()
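
The count DataFrame here is assumed to be a streaming aggregation built on the socket source from the earlier card; a minimal word-count sketch of how it might be produced (the explode/split pipeline is an assumption, not part of the original card):

from pyspark.sql.functions import explode, split

# Split each incoming line on spaces, one row per word, then count per word
wordsDF = df.select(explode(split(df.value, " ")).alias("word"))
count = wordsDF.groupBy("word").count()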

9
Q

Streaming from Kafka

A

# startingOffsets = "earliest" reads the topic from the beginning
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "192.168.1.100:9092") \
    .option("subscribe", "json_topic") \
    .option("startingOffsets", "earliest") \
    .load()
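
Kafka delivers key and value as binary columns; a minimal sketch of casting them to strings before parsing the payload:

# key/value arrive as binary; cast to STRING to work with the JSON payload
stringDF = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")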

10
Q

Writing messages to another Kafka topic

A

writeStream

df.selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value") \
    .writeStream \
    .format("kafka") \
    .outputMode("append") \
    .option("kafka.bootstrap.servers", "192.168.1.100:9092") \
    .option("topic", "json_data_topic") \
    .start() \
    .awaitTermination()
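
As written, this query typically fails with an AnalysisException, because Spark requires a checkpoint location for Kafka sinks; the same pipeline with that option added (the checkpoint path is an arbitrary example):

df.selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value") \
    .writeStream \
    .format("kafka") \
    .outputMode("append") \
    .option("kafka.bootstrap.servers", "192.168.1.100:9092") \
    .option("topic", "json_data_topic") \
    .option("checkpointLocation", "/tmp/kafka-checkpoint") \
    .start() \
    .awaitTermination()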

11
Q

Selecting columns

A

select()

df.select(df["gender"]).show()
df.select(df.gender).show()

from pyspark.sql.functions import col
df.select(col("gender")).show()

+------+
|gender|
+------+
|     M|
|     M|
|     M|
|     F|
|     F|
+------+
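
select() also accepts several columns at once, and plain strings work as column references; a minimal sketch:

# Multiple columns selected by name in one call
df.select("firstname", "lastname", "gender").show()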

12
Q

Retrieve all elements of the dataset (from all nodes) to the driver node.

A

collect()

print(groupDF.collect())

[Row(gender='M', count(1)=3), Row(gender='F', count(1)=2)]
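
Since collect() materializes every row on the driver, it can exhaust driver memory on large results; a minimal sketch of limiting what comes back and of reading fields from the returned Rows:

# take(n) ships only the first n rows to the driver
print(groupDF.take(1))

# Fields of a Row are accessible by column name
for row in groupDF.collect():
    print(row["gender"], row["count(1)"])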

13
Q

1) Change a column's DataType
2) Update a column's value
3) Create a column from an existing one
4) Add a new column
5) Rename a column
6) Drop a column

A

withColumn()

from pyspark.sql.functions import col, lit

1) Change a column's DataType
df.withColumn("salary", col("salary").cast("Integer")).show()

2) Update a column's value
df.withColumn("salary", col("salary") * 100).show()

3) Create a column from an existing one
df.withColumn("CopiedColumn", col("salary") * -1).show()
# Creates a new column "CopiedColumn" by multiplying the "salary" column by -1

4) Add a new column
df.withColumn("Country", lit("USA")).show()
df.withColumn("Country", lit("USA")) \
    .withColumn("anotherColumn", lit("anotherValue")) \
    .show()

5) Rename a column
df.withColumnRenamed("gender", "sex") \
    .show(truncate=False)

6) Drop a column
df.drop("salary") \
    .show()
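
DataFrames are immutable: withColumn(), withColumnRenamed(), and drop() each return a new DataFrame, so the result must be captured to keep it; a minimal sketch:

# df itself is unchanged; df2 holds the version with the casted column
df2 = df.withColumn("salary", col("salary").cast("Integer"))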
