Transformation Functions Flashcards

1
Q

Access the File System

A

%fs ls
%fs ls /some_folder_name
dbutils.fs.ls("/some_folder_name")

2
Q

Create Table/DataFrame

  1. from %sql
  2. from spark.sql()
  3. if you have an RDD, list, or pandas dataframe
  4. return a table as a dataframe
  5. create temporary view from Dataframe
A
  1. CREATE TABLE IF NOT EXISTS
  2. df = spark.sql("SELECT * FROM table_name")
  3. spark.createDataFrame()
  4. spark.table()
  5. df.createOrReplaceTempView("view_name")
3
Q

Select/Filter

SQL & Python/PySpark

A

SELECT * FROM table WHERE column = value

df.filter(df.column == value)
df.filter("column = 'value'")

4
Q

SQL GROUP BY in PySpark

A

df.groupBy()
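
Minimal sketch of a grouped aggregation, assuming a hypothetical df with "state" and "sales" columns:

from pyspark.sql import functions as F

# GROUP BY state, summing sales within each group
df.groupBy("state").agg(F.sum("sales").alias("total_sales"))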

5
Q

DataFrame Reader Function

just a parquet reading example

A

spark.read.parquet("filepath")

6
Q

DataFrame Writer Function

example

A

# example: write df as snappy-compressed parquet, overwriting any existing data
df.write \
    .option("compression", "snappy") \
    .format("parquet") \
    .mode("overwrite") \
    .save(outPath)

7
Q

Accessing Column in PySpark

3 different ways

A

df["column_name"]

df.column_name
col("column_name")   # from pyspark.sql.functions import col

8
Q

Column Operators

and/or, math, equality

A

&, | : and, or
*, +, <, >= : math and comparison operators
==, != : equal, not equal

9
Q

Column Methods
(4 examples)
(think about sorting, changing data types, changing column names, NULL)

A

.alias() : column alias
.cast(), .astype() : cast column to a different data type
.isNull(), .isNotNull() : is null, is not null
.asc(), .desc() : ascending/descending
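
Minimal sketch combining these column methods, assuming a hypothetical df with "price" and "email" columns:

from pyspark.sql.functions import col

(df.where(col("email").isNotNull())
   .select(col("price").cast("double").alias("price_usd"))
   .orderBy(col("price_usd").desc()))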

10
Q

read in a file

what are the 5 components of the method string?

A

df = (spark.read
      .option("sep", ",")            # the separator
      .option("header", True)        # first row = header
      .option("inferSchema", True)   # infer the schema
      .csv(productsCsvPath))         # path to the CSV file

11
Q

define the schema
(function used and components)
(read with user defined schema, what changes?)

A

from pyspark.sql.types import StructType, StructField, StringType

userDefinedSchema = StructType([
    # pattern: StructField(col_name, data_type, nullable)
    StructField("full_name", StringType(), True),
    StructField("email", StringType(), True)
])

.option("inferSchema", True) changes to
.schema(userDefinedSchema)

12
Q

DataFrame Action Methods
(4)
(remember viewing data, calculations, summaries, collections)

A

.show(), .head(), .take() : display n rows
.count() : number of rows
.describe(), .summary() : basic statistics
.collect() : array of all rows in the DataFrame

13
Q

Python Row Methods
(5)
(think about indexing, accessing fields, returning values in another format, and count)

A
.index() : returns the first index of a value
.count() : number of occurrences of a value
.asDict() : returns the row as a dictionary
row.key : access fields like attributes
row["key"] : access fields like dictionary values
14
Q

Aggregation methods (5)
(what method usually precedes them?)
(think about common calculations)

A

groupBy() usually precedes

.agg(), .sum()
.avg(), .mean()
.count()
.max(), .min()
.pivot()
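
Minimal sketch, assuming a hypothetical df with "state", "city", and "sales" columns:

from pyspark.sql import functions as F

# groupBy precedes the aggregation methods
df.groupBy("state").agg(F.sum("sales"), F.avg("sales"))
df.groupBy("state").pivot("city").count()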
15
Q

Some of the PySpark Built-in Aggregate Functions
(9)
(remember summary stats & summaries of data subsets)

A

Remember these are functions, not methods, so you embed them within a method or another function; do not chain them.

approx_count_distinct(): approx. number of distinct items

collect_list(): returns a list object, duplicates included

stddev_samp(): sample standard deviation

var_pop(): population variance

collect_set(col): returns a set of objects with duplicate elements eliminated (important one to remember because, despite the name, it is not related to the .collect() action method)

sumDistinct()
avg(), mean()
corr()
max()
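
Minimal sketch of embedding these functions inside .agg(), assuming a hypothetical df with "state" and "sales" columns:

from pyspark.sql import functions as F

df.groupBy("state").agg(
    F.approx_count_distinct("sales"),
    F.collect_set("sales"),
    F.stddev_samp("sales"),
    F.avg("sales"),
)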

16
Q

PySpark Built-in Math Functions (4)

think about largest value in a column, log transformations, rounding, square root

A

ceil(): ceiling of a given column
log()
round()
sqrt()
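
Minimal sketch, assuming a hypothetical df with a "price" column:

from pyspark.sql import functions as F

df.select(F.ceil("price"), F.log("price"), F.round("price", 2), F.sqrt("price"))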

17
Q

Date Time Functions (6)

A

date_format: converts a date/timestamp to a string

add_months: returns the date numMonths after startDate

dayofweek: day of the week as an integer

from_unixtime: converts a number of seconds from the Unix epoch to a string

minute: extracts the minutes as an integer

unix_timestamp: converts a time string to a Unix timestamp
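
Minimal sketch, assuming a hypothetical df with a timestamp column "event_ts":

from pyspark.sql import functions as F

df.select(
    F.date_format("event_ts", "yyyy-MM-dd"),
    F.add_months("event_ts", 3),
    F.dayofweek("event_ts"),
    F.minute("event_ts"),
    F.unix_timestamp("event_ts"),
)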

18
Q

String Functions (5)

remember regular expressions, SQL functions

A

translate: converts the characters in the search string to the replacement characters specified in the function

regexp_replace: replaces substrings that match the regular expression

regexp_extract / ltrim: extracts groups from a string that match a Java regular expression / trims leading whitespace

lower, upper: converts a string to lower/upper case

split: splits a string based on a pattern
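
Minimal sketch, assuming a hypothetical df with an "email" column:

from pyspark.sql import functions as F

df.select(
    F.lower("email"),
    F.split("email", "@"),
    F.regexp_extract("email", r"@(.+)$", 1),
    F.regexp_replace("email", r"\.com$", ".org"),
    F.translate("email", "abc", "xyz"),
)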

19
Q

Collection Functions (4)

functions that deal with arrays

A
  1. array_contains(col, value): returns true/false based on whether the array contains the value
  2. explode(col): creates a new row for each element in a given array or map column
  3. slice(x, start, length): returns an array containing all the elements in x from index start with the specified length; creates a subset of an array from a start position and a number of elements (x is the array to be sliced)
  4. element_at(col, extraction): returns the element of the array at the given index if col is an array (extraction is the index to retrieve from the array)
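
Minimal sketch of the list above, assuming a hypothetical df with an array column "items":

from pyspark.sql import functions as F

df.select(
    F.array_contains("items", "coupon"),
    F.slice("items", 1, 2),
    F.element_at("items", 1),
)
df.select(F.explode("items"))   # one row per array element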

20
Q

Properties of UDFs (3)

A
  1. can't be optimized by the Catalyst optimizer
  2. the function must be serialized and sent to the executors
  3. additional overhead from the Python interpreter on executors running the Python UDF
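
Minimal sketch of defining and applying a Python UDF despite those costs (the first_letter function and "email" column are hypothetical):

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

@udf(returnType=StringType())
def first_letter(email):
    # runs in a Python worker on each executor
    return email[0] if email else None

df.select(first_letter("email"))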
21
Q

coalesce function

what is it used for, when is it used

A

repartitions a DataFrame

a narrow transformation (no shuffling); can only be used to reduce the number of partitions

22
Q

repartition function

what is it used for, when is it used

A

repartitions a DataFrame

a wide transformation (requires a full shuffle); can be used to increase or decrease the number of partitions

23
Q

stream query function example

5 components

A

spark.readStream
    .filter(col("col_name") == value)
    .groupBy("col_name").aggregate_function()
    .writeStream
    .start()

24
Q

Other Functions (2)

column function and literals

A

col() : returns a Column based on the given column name. Used often with other functions to identify the column on which the function will be used.

lit() : creates a Column of literal values (values defined by the function). lit(5).alias("height") returns a column named height with cell values of five.
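
Minimal sketch, assuming a hypothetical df with a "height_cm" column:

from pyspark.sql.functions import col, lit

df.select(col("height_cm"), lit(5).alias("height")).filter(col("height_cm") > 100)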

25
Q

Method to cache a DataFrame

A

df.cache().count()

cache() does not immediately materialize the data in cache. An action using the DataFrame must be executed for Spark to actually cache the data

26
Q

Method to remove a cached DataFrame

what should you do when you are no longer using the cache?

A

df.unpersist()

As a best practice, you should always evict your DataFrames from cache when you no longer need them.

27
Q

.explain() method

what does it do?

A

prints the logical & physical plans of queries

28
Q

Get number of DataFrame partitions?

Get number of cores or slots?

A

df.rdd.getNumPartitions()

print(spark.sparkContext.defaultParallelism)

29
Q

Repartition DataFrame?

Coalesce DataFrame?

A
repartitionedDF = df.repartition(num_of_partitions)
coalesceDF = df.coalesce(num_of_partitions)
30
Q

Get number of shuffle partitions?

Set number of shuffle partitions?

Set default shuffle partitions to number of cores in cluster?

A

spark.conf.get("spark.sql.shuffle.partitions")
spark.conf.set("spark.sql.shuffle.partitions", "8")
spark.conf.set("spark.sql.shuffle.partitions", sc.defaultParallelism)

31
Q

Determine if AQE is enabled?

A

spark.conf.get("spark.sql.adaptive.enabled")

32
Q

Check to see if your data is streaming?
Get status of streaming query?
Stop streaming query?

A

df.isStreaming
streamingQuery.status
streamingQuery.stop()

(status and stop() are called on the StreamingQuery returned by writeStream.start())

33
Q

Write streaming query results to parquet:

  1. Configure streaming query to write out to parquet in “append” mode
  2. Set query name to “xxxxx”
  3. Set a trigger interval of x second
  4. Set checkpoint location to xxxxPath
  5. Set output filepath to xxxOutputPath
A
(df.writeStream
    .outputMode("append")
    .format("parquet")
    .queryName("xxxxx")
    .trigger(processingTime="x second")
    .option("checkpointLocation", xxxxPath)
    .start(xxxOutputPath))
34
Q

Call SQL query within Spark?

A

spark.sql(""" multi-line SQL statement """)

%sql

35
Q

List of all active streaming queries?

Then stop those queries.

A

for i in spark.streams.active:
    print(i.name)
    i.stop()

36
Q

Variant of select that accepts SQL expression?

What can you add to the select statement?

A

df.selectExpr("column_name as new_column_name")

you can add any valid non-aggregating SQL statements

37
Q

What are backtick ( ` ) characters used for?

A

used for referencing a column within an expression (for example, when the column name contains spaces or special characters)

df.selectExpr("`column_name` as new_column_name")

38
Q

Get unique rows

Get unique rows then count

A

df.select("col_name").distinct()
df.select("col_name").distinct().count()

39
Q

Get sample of data frame?

Get random split?

A

df.sample(withReplacement, fraction, seed)

df.randomSplit([0.25, 0.75], seed)

40
Q

Appending Rows
(by position and by name)
(one function does not take into account the position of the columns in the second dataframe)

A

df.union(secondDF) : union by position

df.unionByName(secondDF) : union by name

41
Q

Sorting Rows

A

df.sort("col_name")
df.orderBy("col_name")
df.sort(col("col_name").desc())
df.sort(col("col_name").asc())

42
Q

What is the map() function?

A

The SQL map() function (create_map() in the DataFrame API) builds a map column from key-value pairs: map(key1, value1, key2, value2, ...).

Exploding a map turns each entry into its own row with separate key and value columns.
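
Minimal sketch using create_map, assuming hypothetical "city" and "state" columns:

from pyspark.sql import functions as F

mapped = df.withColumn(
    "location",
    F.create_map(F.lit("city"), F.col("city"), F.lit("state"), F.col("state")),
)
mapped.select(F.explode("location"))   # one row per entry, with key and value columns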

43
Q

Window Functions

What are the 3 types of window functions?

A

Performs an aggregation over a specified "window" (subset) of rows.

3 types of window functions:

  1. ranking
  2. analytic
  3. aggregation

example:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, max

windowSpec = Window.partitionBy("cust_id", "date") \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

maxPurchase = max(col("quantity")).over(windowSpec)

44
Q

.mode()

definition
(usually follows what)
(4 parameter options, there is only one parameter)

A

specifies the behavior that should be applied to data or a table that already exists

usually preceded by the .write method

parameter options:

  1. 'append' : appends the data
  2. 'overwrite' : overwrites the existing data
  3. 'error' : throws an exception if data already exists
  4. 'ignore' : ignores the operation if data already exists
45
Q

Define what each method does:

1) .format()
2) .parquet()
3) .csv()
4) .json()
5) .save()
6) .saveAsTable()

A

1) specifies the read/write format of the file
2) loads or saves a parquet file; include the path to the file
3) loads a file in CSV format; include the path to the file
4) loads a file in JSON format; include the path to the file
5) saves the file; include the path to the file, and combine with .format() to specify the format to save the file to
6) saves as a global table; include the table name

46
Q

Common key value pairs used with .option()

6

A

.option("header", True)
.option("sep", "\t")
.option("inferSchema", True)
.option("checkpointLocation", path)
.option("maxFilesPerTrigger", num_of_files)
.option("numPartitions", num_of_partitions) # max partitions

47
Q

.withColumn()

.withColumnRenamed()

A

creates a new column based on the conditions in the column expression
.withColumn("new_col", col("old_col").cast("int"))

renames the column (existing_col, new_col)

48
Q

.join() parameters (3)

joins supported 8
(5 optional)

A

.join(other= , on= , how= )

.join() is a method called on the first DataFrame; other is the second DataFrame to join, on is the column name (as a string) or a join expression, and how is the join type

joins supported:
inner, cross, outer, full, left, right, semi, anti

optional wording additions:
full [outer]
left [outer]
right [outer]
[left] semi
[left] anti
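
Minimal sketch, assuming hypothetical customersDF and ordersDF sharing a "cust_id" column:

joinedDF = customersDF.join(other=ordersDF, on="cust_id", how="left")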
49
Q
  1. Grouping Sets
  2. Rollups
  3. Cubes
A
  1. grouping sets allow a groupBy over multiple subsets of columns in one query; only available in SQL statements
  2. rollups: the DataFrame equivalent of grouping sets; computes hierarchical subtotals across the grouping columns
  3. cubes: applies aggregate expressions to all possible combinations of the grouping columns in a DataFrame
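
Minimal sketch, assuming a hypothetical df with "state", "city", and "sales" columns:

from pyspark.sql import functions as F

df.rollup("state", "city").agg(F.sum("sales"))   # hierarchical subtotals
df.cube("state", "city").agg(F.sum("sales"))     # all grouping combinations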
50
Q

Semi Join
Anti Join
Cross

(def.)

A

semi join: like a filter; from the left table, looks to see if the same values are in the right table and only pulls in matching records

anti join: like a NOT IN statement; from the left table, looks to see if the same values are in the right table and only pulls in NON-matching records

cross join: creates a Cartesian product, every combination of left and right table rows
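
Minimal sketch, assuming hypothetical customersDF and ordersDF sharing a "cust_id" column:

customersDF.join(ordersDF, "cust_id", "semi")   # customers that have at least one order
customersDF.join(ordersDF, "cust_id", "anti")   # customers with no orders
customersDF.crossJoin(ordersDF)                 # every combination of rows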

51
Q

add a watermark

A

.withWatermark(eventTime = , delayThreshold = )

both options accept strings

method to specify how long the system waits for late events
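
Minimal sketch, assuming a hypothetical streaming df with an "event_time" timestamp column:

from pyspark.sql.functions import window

(df.withWatermark("event_time", "10 minutes")
   .groupBy(window("event_time", "5 minutes"))
   .count())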

52
Q

.fillna() parameters

A

fillna(value, subset=[])
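
Minimal sketch, assuming a hypothetical df with nullable "quantity" and "price" columns:

df.fillna(0, subset=["quantity", "price"])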