Transformation Functions Flashcards
Access the File System
%fs ls
%fs ls /some_folder_name
dbutils.fs.ls("")
Create Table/DataFrame
- from %sql
- from spark.sql()
- if you have an RDD, list, or pandas dataframe
- return a table as a dataframe
- create temporary view from Dataframe
- CREATE TABLE IF NOT EXISTS
- df = spark.sql("SELECT * FROM table_name")
- spark.createDataFrame()
- spark.table()
- df.createOrReplaceTempView("")
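Example sketch combining these (the data and the table name "people" are hypothetical):
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# create a DataFrame from a Python list of tuples
df = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["id", "name"])

# register it as a temporary view so SQL can see it
df.createOrReplaceTempView("people")

# return a SQL query result as a DataFrame
result = spark.sql("SELECT * FROM people WHERE id > 1")

# return an existing table/view as a DataFrame
same_df = spark.table("people")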
Select/Filter
sql & python/pyspark
SELECT * FROM table WHERE column = value
df.filter(df.column == value)
df.filter("column = 'value'")
Sql Group By in Pyspark
df.groupBy()
DataFrame Reader Function
a simple parquet reading example
spark.read.parquet("filepath")
DataFrame Writer Function
example
df.write.option() \
.format() \
.mode() \
.parquet(outPath)
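Concrete sketch of the writer pattern above, assuming an existing DataFrame df and an output path variable outPath:
(df.write
    .format("parquet")                 # output file format
    .mode("overwrite")                 # behavior if data already exists at the path
    .option("compression", "snappy")   # a common parquet writer option
    .save(outPath))                    # write to the output path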
Accessing Column in PySpark
3 different ways
df["column_name"]
df.column_name
col("column_name")   # col() comes from pyspark.sql.functions
Column Operators
and/or, math, equality
&, | : and, or
*, +, <, >= : math and comparison operators
==, != : equal, not equal
Column Methods
(4 examples)
(think about sorting, changing data types, changing column names, NULL)
.alias(): column alias
.cast(), .astype(): cast column to a different data type
.isNull(), .isNotNull(): is null, not null
.asc(), .desc(): ascending/descending
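Sketch combining these column methods, assuming a DataFrame df with hypothetical columns price and email:
from pyspark.sql.functions import col

result = (df
    .select(
        col("price").cast("double").alias("price_usd"),   # cast + rename
        col("email").isNull().alias("missing_email"))     # null check
    .sort(col("price_usd").desc()))                       # sort descending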
read in a file
what are the 5 components of the method chain?
df = (spark.read
    .option("sep", ",")           # the separator
    .option("header", True)       # first row = header
    .option("inferSchema", True)  # infer the schema
    .csv(productsCsvPath))        # path to the csv file
define the schema
(function used and components)
(read with user defined schema, what changes?)
StructType([
    StructField("col_name", data_type, True),
    StructField("full_name", LongType(), True),
    StructField("email", StringType(), True)
])
——————————————————————-
.option("inferSchema", True) changes to
.schema(userDefinedSchema)
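Putting the two halves together, a sketch of reading a CSV with a user-defined schema (the column names, types, and the path variable usersCsvPath are hypothetical):
from pyspark.sql.types import StructType, StructField, StringType, LongType

userDefinedSchema = StructType([
    StructField("user_id", StringType(), True),   # column name, data type, nullable
    StructField("age", LongType(), True),
    StructField("email", StringType(), True)
])

df = (spark.read
    .option("sep", ",")
    .option("header", True)
    .schema(userDefinedSchema)   # replaces .option("inferSchema", True)
    .csv(usersCsvPath))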
DataFrame Action Methods
(4)
(remember viewing data, calculations, summaries, collections)
.show(), .head(), .take(): display n rows
.count()
.describe(), .summary(): basic stats.
.collect(): array of all rows in the DataFrame
Python Row Methods
(5)
(think about indexing, accessing fields, returning values in another format, and count)
.index(): return the first index of a value
.count(): return the number of occurrences of a value
.asDict(): return the Row as a dictionary
row.key: access fields like attributes
row["key"]: access fields like dictionary values
Aggregation methods (5)
(what method usually precedes them?)
(think about common calculations)
groupBy() usually precedes
.agg()
.sum(), .avg(), .mean()
.count()
.max(), .min()
.pivot()
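Sketch of the groupBy-then-aggregate pattern, assuming a DataFrame df with hypothetical columns state, city, and revenue:
from pyspark.sql import functions as F

summary = (df
    .groupBy("state")
    .agg(F.sum("revenue").alias("total_revenue"),
         F.avg("revenue").alias("avg_revenue")))

# pivot: one output column per city, counting rows per (state, city) pair
pivoted = df.groupBy("state").pivot("city").count()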
Some of the Pyspark Built In Aggregate Functions
(9)
(remember sum. stats. & summaries of data subsets )
Remember these are functions, not methods, so you embed them within a method or another function rather than chaining them.
approx_count_distinct(): approx. num. of distinct items
collect_list(): return list obj. w/ dups.
stddev_samp(): sample standard deviation
var_pop(): pop. variance
collect_set(col): returns a set of objects with duplicate elements eliminated # important to remember: despite the name, this is an aggregate function, not the .collect() action method
sumDistinct()
avg(), mean()
corr()
max()
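Sketch embedding a few of these functions inside the .agg() method, assuming a DataFrame df with hypothetical columns user_id, item, and price:
from pyspark.sql import functions as F

stats = df.agg(
    F.approx_count_distinct("user_id").alias("approx_users"),  # approximate distinct count
    F.collect_set("item").alias("unique_items"),               # set of items, duplicates removed
    F.avg("price").alias("avg_price"),
    F.max("price").alias("max_price"))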
Pyspark Built in Math Functions (4)
think about the ceiling of a value, log transformations, rounding, square root
ceil(): ceiling of a given column
log()
round()
sqrt()
Date Time Functions (6)
date_format: convert date/time to string
add_months: returns date numMonths after startDate
dayofweek: day of the week as integer
from_unixtime: convert num. of sec. from unix epoch to string
minute: extracts minutes as integer
unix_timestamp: converts time string to Unix timestamp
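Sketch of a few date/time functions, assuming a DataFrame df with a hypothetical timestamp column event_ts:
from pyspark.sql import functions as F

dates = df.select(
    F.date_format("event_ts", "yyyy-MM-dd").alias("event_date"),  # timestamp -> string
    F.dayofweek("event_ts").alias("dow"),                          # 1 = Sunday ... 7 = Saturday
    F.minute("event_ts").alias("event_minute"),
    F.add_months("event_ts", 3).alias("plus_3_months"))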
String Functions (5)
remember regular expressions, SQL functions
translate: replaces each character in the matching string with the corresponding replacement character specified in the function
regexp_replace: replaces substrings that match a regular expression
regexp_extract: extracts groups from a string that match a Java regular expression; ltrim: removes leading spaces from a string
lower, upper: converts a string to lower/upper case
split: splits string based on a pattern
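Sketch of a few string functions, assuming a DataFrame df with a hypothetical string column email:
from pyspark.sql import functions as F

cleaned = df.select(
    F.lower("email").alias("email_lower"),                           # lower-case
    F.regexp_replace("email", r"\s+", "").alias("email_no_spaces"),  # remove whitespace
    F.split("email", "@").alias("email_parts"),                      # array: [user, domain]
    F.regexp_extract("email", r"@(.+)$", 1).alias("domain"))         # capture group 1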
Collection Functions (4)
functions that deal with arrays
- array_contains(col, value): returns true if the array column contains the value, false otherwise
- explode(col): creates a new row for each element in a given array or map column
- slice(x, start, length): returns an array containing all the elements in x from index start with the specified length. Creates a subset of an array given a start position and a number of elements.
x - the array column to be sliced
- element_at(col, extraction): returns the element of the array at the given index (extraction) if col is an array. Returns a single element, not a subset.
extraction - the index (number) of the element to return
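Sketch of the collection functions, assuming a DataFrame df with a hypothetical array column items:
from pyspark.sql import functions as F

exploded = df.select(
    F.array_contains("items", "apple").alias("has_apple"),   # true/false per row
    F.explode("items").alias("item"))                        # one row per array element

subsets = df.select(
    F.slice(F.col("items"), 1, 2).alias("first_two"),        # start position 1, length 2
    F.element_at(F.col("items"), 1).alias("first_item"))     # single element at index 1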
Properties of UDFs(3)
- can't be optimized by the Catalyst optimizer
- functions must be serialized and sent to executors
- additional overhead from python interpreter on executors running Python UDF
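Minimal UDF sketch illustrating the points above (the function and column names are hypothetical):
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

# plain Python function; it is serialized and sent to the executors
def first_letter(s):
    return s[0] if s else None

first_letter_udf = F.udf(first_letter, StringType())   # not optimized by the Catalyst optimizer

df2 = df.withColumn("initial", first_letter_udf(F.col("name")))   # assumes df has a 'name' column

# optional: register the UDF for use in SQL
spark.udf.register("first_letter_sql", first_letter, StringType())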
coalesce function
what is it used for, when is it used
repartitions a dataframe
a narrow transformation (no shuffle); can only be used to reduce the number of partitions
repartition function
what is it used for, when is it used
repartitions a dataframe
a wide transformation (requires a shuffle); can be used to reduce or increase the number of partitions
stream query function example
5 components
spark.readStream
    .filter(col("col_name") == value)
    .groupBy("col_name").aggregate_function()
    .writeStream
    .start()
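Fuller sketch of the five components, assuming a hypothetical streaming table events with columns device and value (a memory sink is used here just for illustration):
from pyspark.sql import functions as F

streamingQuery = (spark.readStream
    .table("events")                   # 1) read a stream (or .format(...).load(path))
    .filter(F.col("value") > 0)        # 2) filter
    .groupBy("device").count()         # 3) group by and aggregate
    .writeStream                       # 4) write the stream
    .format("memory")
    .queryName("device_counts")
    .outputMode("complete")
    .start())                          # 5) start the query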
Other Functions (2)
column function and literals
col(): returns a Column based on the given column name. Used often with other functions to identify the column on which the function will be used.
lit(): creates a Column of literal values (values defined by the function). lit(5).alias('height') returns a column named height with cell values of five.
Method to cache a DataFrame
df.cache().count()
cache() does not immediately materialize the data in cache. An action using the DataFrame must be executed for Spark to actually cache the data
Method to remove a cached DataFrame
what should you do when you are no longer using the cache?
df.unpersist()
As a best practice, you should always evict your DataFrames from cache when you no longer need them.
.explain() method
what does it do?
prints the logical & physical plans of queries
Get number of DataFrame partitions?
Get number of cores or slots?
df.rdd.getNumPartitions()
print(spark.sparkContext.defaultParallelism)
Repartition DataFrame?
Coalesce DataFrame?
repartitionedDF = df.repartition(num_of_partitions)
coalesceDF = df.coalesce(num_of_partitions)
Get number of shuffle partitions?
Set number of shuffle partitions?
Set default shuffle partitions to number of cores in cluster?
spark.conf.get("spark.sql.shuffle.partitions")
spark.conf.set("spark.sql.shuffle.partitions", "8")
spark.conf.set("spark.sql.shuffle.partitions", sc.defaultParallelism)
Determine if AQE is enabled?
spark.conf.get("spark.sql.adaptive.enabled")
Check to see if your data is streaming?
Get status of streaming query?
Stop streaming query?
df.isStreaming
streamingQuery.status   # streamingQuery = the object returned by writeStream.start()
streamingQuery.stop()
Write streaming query results to parquet:
- Configure streaming query to write out to parquet in “append” mode
- Set query name to “xxxxx”
- Set a trigger interval of x second
- Set checkpoint location to xxxxPath
- Set output filepath to xxxOutputPath
(df.writeStream
    .outputMode("append")
    .format("parquet")
    .queryName("xxxxx")
    .trigger(processingTime="x second")
    .option("checkpointLocation", xxxxPath)
    .start(xxOutputPath))
Call SQL query within Spark?
spark.sql(""" multi-line SQL statement """)
%sql
List of all active streaming queries?
Then stop those queries.
for i in spark.streams.active:
    print(i.name)
    i.stop()
Variant of select that accepts SQL expression?
What can you add to the select statement?
df.selectExpr("column_name as new_column_name")
you can add any valid non-aggregating SQL statements
What are backtick ( ` ) characters used for?
used for referencing a column within an expression, e.g. when the column name contains spaces or reserved characters
df.selectExpr("`column name` as new_column_name")
Get unique rows
Get unique rows then count
df.distinct()
df.distinct().count()
Get sample of data frame?
Get random split?
df.sample(withReplacement, fraction, seed)
df.randomSplit([0.25, 0.75], seed)
Appending Rows
(by position and by name)
(one function does not take into account the position of the columns in the second dataframe)
df.union(secondDF): union by position
df.unionByName(secondDF): union by name
Sorting Rows
df.sort()
df.orderBy()
.desc(), .asc(): column methods used inside sort()/orderBy()
What is the map() function?
The map() SQL function (create_map() in PySpark) builds a map column from key-value pairs: map(key1, value1, key2, value2, ...).
Exploding a map turns its keys and values into columns.
Window Functions
What are the 3 types of window functions?
Performs a calculation over a specified "window" (subset) of rows related to the current row.
3 types of window functions:
- ranking
- analytic
- aggregation
example:
windowSpec = Window.partitionBy("cust_id", "date")   # optionally add .orderBy(...).rowsBetween(start, end)
maxPurchase = max(col("quantity")).over(windowSpec)
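Self-contained sketch of the example above, with the imports it needs (a running max per customer, ordered by date; df is assumed to have cust_id, date, and quantity columns):
from pyspark.sql import functions as F
from pyspark.sql.window import Window

windowSpec = (Window
    .partitionBy("cust_id")
    .orderBy("date")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow))

df2 = df.withColumn("max_purchase", F.max(F.col("quantity")).over(windowSpec))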
.mode()
definition
(usually follows what)
(4 parameter options, there is only one parameter)
specifies the behavior that should be applied to data or a table that already exists
usually preceded by a .write method
parameter options:
- 'append'
- 'overwrite'
- 'error': throws an exception if data already exists
- 'ignore': ignores the operation if data already exists
Define what each method does:
1) .format()
2) .parquet()
3) .csv()
4) .json()
5) .save()
6) .saveAsTable()
1) specifies the read, write format of the file
2) loads or saves to a parquet file, include path to file
3) load file in csv format, include path to file
4) load file in json format, include path to file
5) save file, include path to file; combine with .format() to specify the format to save the file to.
6) save as a global table, include the name
Common key value pairs used with .option()
6
.option("header", True)
.option("sep", "\t")
.option("inferSchema", True)
.option("checkpointLocation", path)
.option("maxFilesPerTrigger", num_of_files)
.option("numPartitions", num_of_partitions) # maximum number of partitions
.withColumn()
.withColumnRenamed()
creates a new column based on the conditions in the column expression
.withColumn("new_col", col("old_col").cast("int"))
renames the column (existing_col, new_col)
.join() parameters (3)
joins supported 8
(5 optional)
.join(other = , on = , how = )
.join() is a method called on the first DataFrame; other is the second DataFrame to join with, on is a string (or list) of the column name(s) to join on, and how is the join type
joins supported:
inner, cross, outer, full, left, right, semi, anti
optional wording additions: full [outer], left [outer], right [outer], [left] semi, [left] anti
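Sketch of the three parameters, assuming hypothetical DataFrames ordersDF and customersDF that share a cust_id column:
joinedDF = ordersDF.join(other=customersDF, on="cust_id", how="inner")

# same call with a left outer join
leftDF = ordersDF.join(customersDF, "cust_id", "left")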
- Grouping Sets
- Rollups
- Cubes
- grouping sets: allow a groupBy over multiple subsets of the grouping columns; only available in SQL statements
- rollups: the DataFrame equivalent of grouping sets (hierarchical subtotals across the grouping columns)
- cubes: apply aggregate expressions to all possible combinations of the grouping columns in a DataFrame
Semi Join
Anti Join
Cross
(def.)
semi join: like a filter; from the left table, checks whether the same values exist in the right table and keeps only the matching records
anti join: like a NOT IN statement; from the left table, checks whether the same values exist in the right table and keeps only the NON-matching records
cross join: creates a Cartesian product, every combination of left and right table rows
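Sketch of semi, anti, and cross joins on the same hypothetical DataFrames (ordersDF, customersDF, shared column cust_id):
# customers that have at least one order (filter-like; keeps only customersDF columns)
withOrders = customersDF.join(ordersDF, "cust_id", "left_semi")

# customers with no orders at all (NOT IN-like)
withoutOrders = customersDF.join(ordersDF, "cust_id", "left_anti")

# every combination of rows from both DataFrames (Cartesian product)
allPairs = customersDF.crossJoin(ordersDF)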
add a watermark
.withWatermark(eventTime = , delayThreshold = )
both parameters accept strings
method to specify how long the system waits for late events
.fillna() parameters
fillna(value, subset=[])