Spark Dataframe commands Flashcards
Describe a dataframe in your own words
Dataframe is like a table with rows and columns
How do you read a table from Hive into a Spark dataframe with a select statement - Spark 2.6
spark.sql("Select * from db.mytable")
How do you read a table from Hive into a Spark dataframe without a select statement - Spark 2.6
spark.table("db.myTable")
How to display a dataframe
df.show()
How to display exactly 100 rows of a dataframe
df.show(100)
Why do we pass True/False in show
The second argument of show is truncate. With truncate=True (the default) long column values are cut off at 20 characters; with truncate=False the full column contents are displayed.
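For example, a minimal sketch (assuming a dataframe df already exists):
df.show(100, truncate=False)  # 100 rows, full column contents
df.show(100, True)            # 100 rows, values longer than 20 characters are cut off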
Select specific columns
df.select('col1', 'col2', 'col3')
Can I pass a list of columns within the select statement
Yes. df.select(['col1', 'col2', 'col3'])
How do I change the column name without using withColumnRenamed?
df.selectExpr("col1", "col2 as test_column")
How do I pull specific rows from a dataframe - For example where a certain column in my dataframe is true
df.filter("col1 = True")
import statement to import functions
from pyspark.sql import functions as func
Get the total number of records in a dataframe
df.count()
How do I get the count of distinct values in a column?
df.dropDuplicates(["col1"]).count()
What is the difference between df.dropDuplicates() and df.dropDuplicates(“col1”)
dropDuplicates() drops rows that are duplicates across all columns, while dropDuplicates(["col1"]) drops rows that have duplicate values in col1 only, keeping the first row for each distinct value.
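For example, a minimal sketch (assuming df has columns col1 and col2):
df.dropDuplicates()           # keep rows that are unique across all columns
df.dropDuplicates(["col1"])   # keep one row per distinct col1 value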
How do I see the schema of a dataframe
df.printSchema()
How do I see the column names along with the datatypes
df.printSchema(), or df.dtypes for a list of (column name, datatype) tuples
How do I retrieve the columns to a python list
df.columns
df.columns() - Is this correct and what will be the output
No. columns is an attribute, not a method, so the parentheses should not be present; df.columns() throws a TypeError.
How do I drop a column from a dataframe?
df.drop("col1")
df.drop(["col1", "col2"]) - Is this correct and why
No. drop() takes column names as separate arguments, not a list, so this throws an error. Use df.drop("col1", "col2"), or unpack a list with df.drop(*cols).
Groupby syntax with count
df.groupBy("col1").agg(func.count("col2"))
Order the rows in a dataframe on a certain column.
df.orderBy(func.asc("col1"))
Groupby on multiple columns syntax with count
df.groupBy("col1", "col2").agg(func.count("col2"))
Order the rows in a dataframe on multiple columns.
df.orderBy(func.asc("col1"), func.desc("col2"))
case expression general syntax
case when col1 = 'Y' then 'True' when col1 = 'N' then 'False' else 'NA' end
drop multiple columns
df.drop("col1", "col2")
drop duplicate values in multiple columns
df.dropDuplicates(["col1", "col2"])
Create a new column in the dataframe. The new column is a flag that has true or false. If a column value is > 100 then True else false
df.withColumn("flag", func.expr("case when col1 > 100 then True else False end"))
I have a dataframe with some records. I need to flag all the records as ‘True’ before I proceed further. How do I do that
df.withColumn("flag", func.lit(True))
Rename a column
df.withColumnRenamed("old_col_name", "new_col_name")
Two ways to rename a column
withColumnRenamed, selectExpr
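For example, a minimal sketch (assuming df has columns old_col and other_col):
df.withColumnRenamed("old_col", "new_col")          # keeps all other columns as-is
df.selectExpr("old_col as new_col", "other_col")    # returns only the listed expressions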
Ways to create dataframe
- Reading hive tables
- Reading CSV or JSON files
- Create dataframe from list
- Create dataframe from rdd
How to read csv files into dataframe?
- df = spark.read.csv("file.csv")
- df = spark.read.format("csv").load("file.csv")
Column names for this dataframe - df = spark.read.format("csv").load("file.csv")
_c0, _c1, _c2…
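If needed, the default names can be replaced right after the read; a minimal sketch, assuming the file has exactly three columns:
df = spark.read.format("csv").load("file.csv").toDF("col1", "col2", "col3")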
How to load header for csv read command
df2 = spark.read.option("header", True).csv("file.csv")
df2 = spark.read.options(header='True').csv("file.csv")
PySpark reads all columns as a ________ data type by default
string
Read multiple csv files into a single dataframe
df = spark.read.csv(["path1", "path2", "path3"])
Read all CSV files from a directory into DataFrame
df = spark.read.csv("folder path")
Specify a specific delimiter while reading csv
df3 = spark.read.option("delimiter", ",").csv("test.csv")
df3 = spark.read.options(delimiter=',').csv("test.csv")
How to have Spark infer the column datatypes instead of the default string when reading a csv
df3 = spark.read.option("inferSchema", True).csv("test.csv")
df3 = spark.read.options(inferSchema='True').csv("test.csv")
Set both delimiter and inferschema
df3 = spark.read.option("delimiter", ",").option("inferSchema", True).csv("test.csv")
df3 = spark.read.options(inferSchema='True', delimiter='|').csv("test.csv")
how to import datatypes
from pyspark.sql.types import *
Read with a custom schema - I don't want the default string schema and I don't want inferSchema; I want to specify my own datatypes
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([
StructField('firstname', StringType(), True),
StructField('middlename', StringType(), True),
StructField('id', IntegerType(), True)
])
df = spark.read.format("csv").option("header", True).schema(schema).load("file.csv")
df = spark.read.option("header", True).schema(schema).csv("file.csv")
Write a dataframe to csv file with no header
df.write.format("csv").option("header", False).save("demo.csv")
df.write.option("header", False).csv("demo.csv")
(header is False by default for writes, so the option can also be omitted)
Modes while saving a dataframe as a file
- overwrite - overwrites the existing file.
- append - adds the data to the existing file.
- ignore - ignores the write operation when the file already exists.
- error - the default; returns an error when the file already exists.
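For example, a minimal sketch of setting a mode explicitly (overwrite shown here):
df.write.mode("overwrite").option("header", True).csv("demo.csv")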
append mode
df.write.mode("append").option("header", True).csv("demo.csv")
create spark session
spark = SparkSession \
    .builder \
    .appName("App1") \
    .getOrCreate()
Check the type of variable
type(df)
Read a json file
- df = spark.read.json("file.json")
- df = spark.read.format("json").load("file.json")
import statement to import SparkSession
from pyspark.sql import SparkSession
[{ "RecordNumber": 2, "Zipcode": 704, "ZipCodeType": "STANDARD", "City": "PASEO COSTA DEL SUR", "State": "PR" }, { "RecordNumber": 10, "Zipcode": 709, "ZipCodeType": "STANDARD", "City": "BDA SAN LUIS", "State": "PR" }]
Read the multiline json records
df = spark.read.options(multiLine="True").json("file.json")
Read multiple json files
df = spark.read.json(["json path1", "json path2", "json path3"])
Read all json files in a directory
df = spark.read.json("files/*.json")
Pass custom schema for each of the columns for json
from pyspark.sql.types import StructType, StructField, StringType
schema = StructType([
StructField('firstname', StringType(), True),
StructField('middlename', StringType(), True),
StructField('lastname', StringType(), True)
])
df = spark.read.schema(schema).json("file.json")
Write a dataframe as json file
df.write.json("file.json")
Write a dataframe as json file - append mode
df.write.mode("append").json("file.json")
Create a dataframe using parallelize
from pyspark.sql import Row
dept = [Row("A", 10),
Row("B", 20),
Row("C", 30)]
rdd = spark.sparkContext.parallelize(dept)
df = rdd.toDF(["col1", "col2"])
Create dataframe from list without using parallelize
dept = [("A", 10), ("B", 20), ("C", 30)]
col_names = ("col_1_name", "col_2_name")
df = spark.createDataFrame(data=dept, schema=col_names)
I have two tables. One table has id and location. The second table has all the ids who are assigned a parking space. I need an output report of the id, location and whether a parking space is allocated - is_parking_allocated (Y/N)
join
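A minimal sketch, assuming hypothetical dataframes emp (id, location) and parking (id), and the functions import from earlier (func):
report = emp.alias("e").join(parking.alias("p"), emp.id == parking.id, "left_outer") \
    .withColumn("is_parking_allocated", func.when(func.col("p.id").isNotNull(), "Y").otherwise("N")) \
    .select("e.id", "e.location", "is_parking_allocated")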
I have two tables. Table 1 - ids, location; Table 2 - ids, salary. Output: ids, location and salary
join
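A minimal sketch, assuming hypothetical dataframes loc_df (id, location) and sal_df (id, salary):
report = loc_df.alias("a").join(sal_df.alias("b"), loc_df.id == sal_df.id, "inner") \
    .select("a.id", "a.location", "b.salary")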
I have two tables. Table 1 - parts and price; Table 2 - only the parts purchased in the last 2 months. Output: parts, price and a flag for whether the part was purchased in the last 2 months - flag_2_months
join
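A minimal sketch, assuming hypothetical dataframes parts_df (part, price) and recent_df (part), and the functions import from earlier (func):
report = parts_df.alias("a").join(recent_df.alias("b"), parts_df.part == recent_df.part, "left_outer") \
    .withColumn("flag_2_months", func.when(func.col("b.part").isNotNull(), True).otherwise(False)) \
    .select("a.part", "a.price", "flag_2_months")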
Declare udf
from pyspark.sql import functions as func
def split_str(s):
    return s.split("_")[1]
split_str_udf = func.udf(split_str)  # udf registration
df1 = df.withColumn("last_name", split_str_udf("full_name"))
Read avro
df = spark.read.format("avro").load("avro_file_path")
save avro
df.write.format("avro").save("avro_file_path")
left outer join df1 with df2 with alias; join on id present in both tables and select two columns, one from each table
df1.alias("a").join(df2.alias("b"), df1.id == df2.id, "left_outer").select("a.col1", "b.col2")
inner join df1 with df2 with alias; join on id present in both tables and select two columns, one from each table
df1.alias("a").join(df2.alias("b"), df1.id == df2.id, "inner").select("a.col1", "b.col2")
right outer join df2 with df1 with alias; join on id present in both tables and select two columns, one from each table
df2.alias("a").join(df1.alias("b"), df2.id == df1.id, "right_outer").select("b.col1", "a.col2")