ELT with Apache Spark Flashcards

1
Q

Extract data from a single file and a directory of files (sql)

A

SELECT * FROM file_format.`/path/to/file`

SELECT * FROM json.`${DA.paths.kafka_events}`

2
Q

Create a view and/or temporary view from a reference to a file. What are temporary views? (sql)

A

CREATE OR REPLACE [TEMP] VIEW event_view
AS SELECT * FROM json.`${DA.paths.kafka_events}`

Temporary views exist only for the current SparkSession, so on Databricks they are isolated to the current notebook, job, or DBSQL query.

3
Q

Create a Common Table Expression (CTE) from a reference to a file. What are CTEs? (sql)

A

WITH cte_json
AS (SELECT * FROM json.`${DA.paths.kafka_events}`)
SELECT * FROM cte_json

CTEs only alias the results of a query while that query is being planned and executed. They cannot be used after the query has finished.

4
Q

Register a Table on External Data with read options (sql)

A

CREATE TABLE table_identifier (col_name1 col_type1, …)
USING data_source
OPTIONS (key1 = val1, key2 = val2, …)
LOCATION path
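
A minimal usage sketch of the template above, written in PySpark for a pipe-delimited CSV source; the table name, columns, and path below are hypothetical:

%python
# Hypothetical path to a directory of pipe-delimited CSV files
sales_csv_path = "/mnt/demo/sales_csv"

spark.sql(f"""
    CREATE TABLE IF NOT EXISTS sales_csv
      (order_id LONG, email STRING, purchase_revenue_in_usd DOUBLE)
    USING CSV
    OPTIONS (header = "true", delimiter = "|")
    LOCATION "{sales_csv_path}"
""")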

5
Q

Limits of Tables with External Data Sources

A
  • When querying external data sources, we cannot expect the performance guarantees associated with Delta Lake and the Lakehouse.
  • For example, Delta Lake tables guarantee that you always query the latest version of your data, whereas tables backed by external sources may return an older, cached version (see the sketch below).
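
A minimal sketch of the usual workaround, assuming the external CSV-backed table sales_csv registered in the previous card; REFRESH TABLE invalidates Spark's cache so the next query re-reads the underlying files:

%python
# Files behind the external table may change outside of Spark,
# so cached results can be stale. REFRESH TABLE invalidates the cache
# and forces the next query to re-scan the underlying data.
spark.sql("REFRESH TABLE sales_csv")

display(spark.table("sales_csv"))
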
6
Q

Extract Data from a SQL Database (sql)

A

CREATE TABLE table_identifier
USING JDBC
OPTIONS (
  url = "jdbc:{databaseServerType}://{jdbcHostname}:{jdbcPort}",
  dbtable = "{jdbcDatabase}.table",
  user = "{jdbcUsername}",
  password = "{jdbcPassword}"
)

7
Q

What are the two ways we can count null values in a specific field? (sql)

A

SELECT count_if(field_x IS NULL) FROM table1;
SELECT count(*) FROM table1 WHERE field_x IS NULL;

8
Q

What is the difference between count(col) and count(*)? (sql)

A
  • count(col) skips NULL values when counting specific columns or expressions
  • count(*) is a special case that counts the total number of rows, including rows that are entirely NULL (see the example below)
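
A small PySpark illustration of the difference, using a hypothetical DataFrame with a nullable email column:

%python
from pyspark.sql.functions import count

# Hypothetical data: two rows with an email, one row that is NULL
df = spark.createDataFrame([("a@x.com",), ("b@y.com",), (None,)], ["email"])

display(df.select(
    count("email").alias("non_null_emails"),  # skips NULLs -> 2
    count("*").alias("total_rows")            # counts every row -> 3
))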
9
Q

Return the count of rows where a field is null (python)

A

from pyspark.sql.functions import col
usersDF = spark.read.table("users_dirty")

usersDF.selectExpr("count_if(email IS NULL)")
usersDF.where(col("email").isNull()).count()

10
Q

Dedupe an entire table (sql & python)

A

%sql
SELECT DISTINCT(*) FROM table1

%python
usersDF.distinct().display()

11
Q

Dedupe rows based on a specific column (sql)

A

CREATE OR REPLACE TEMP VIEW deduped_users AS
SELECT user_id, user_first_touch_timestamp, max(email) AS email, max(updated) AS updated
FROM users_dirty
WHERE user_id IS NOT NULL
GROUP BY user_id, user_first_touch_timestamp;

SELECT count(*) FROM deduped_users

In the query above, the aggregate function max() is used as a hack to keep the email and updated fields in the output while capturing non-null values when multiple records are present for the same user.

12
Q

Dedupe rows based on a specific column (python)

A

from pyspark.sql.functions import col, max

dedupedDF = (usersDF
    .where(col("user_id").isNotNull())
    .groupBy("user_id", "user_first_touch_timestamp")
    .agg(max("email").alias("email"),
         max("updated").alias("updated"))
)

dedupedDF.count()

13
Q

Validate that the primary key is unique across all rows (sql)

A

SELECT max(row_count) <= 1 AS no_duplicate_ids FROM (
  SELECT user_id, count(*) AS row_count
  FROM deduped_users
  GROUP BY user_id)

(response should be one column “no_duplicate_ids” with one value “true”)

14
Q

Validate that the primary key is unique across all rows (python)

A

from pyspark.sql.functions import count, max

display(dedupedDF
    .groupBy("user_id")
    .agg(count("*").alias("row_count"))
    .select((max("row_count") <= 1).alias("no_duplicate_ids")))

15
Q

Validate that each “email” field is associated with at most one “user_id” (sql)

A

SELECT max(user_id_count) <= 1 AS at_most_one_id FROM (
  SELECT email, count(user_id) AS user_id_count
  FROM deduped_users
  WHERE email IS NOT NULL
  GROUP BY email)

16
Q

Validate that each “email” field is associated with at most one “user_id” (python)

A

from pyspark.sql.functions import col, count, max

display(dedupedDF
    .where(col("email").isNotNull())
    .groupBy("email")
    .agg(count("user_id").alias("user_id_count"))
    .select((max("user_id_count") <= 1).alias("at_most_one_id")))

17
Q

Cast a column to a timestamp, and extract calendar data from the new field, as well as email domain from the “email” field (sql)

A

The functions “date_format()” and “regexp_extract()” can be used:

SELECT *,
  date_format(first_touch, "MMM d, yyyy") AS first_touch_date,
  date_format(first_touch, "HH:mm:ss") AS first_touch_time,
  regexp_extract(email, "(?<=@).+", 0) AS email_domain
FROM (
  SELECT *,
    CAST(user_first_touch_timestamp / 1e6 AS timestamp) AS first_touch
  FROM deduped_users
)

18
Q

Cast a column to a timestamp, and extract calendar data from the new field, as well as email domain from the “email” field (python)

A

from pyspark.sql.functions import col, date_format, regexp_extract

display(dedupedDF
    .withColumn("first_touch", (col("user_first_touch_timestamp") / 1e6).cast("timestamp"))
    .withColumn("first_touch_date", date_format("first_touch", "MMM d, yyyy"))
    .withColumn("first_touch_time", date_format("first_touch", "HH:mm:ss"))
    .withColumn("email_domain", regexp_extract("email", "(?<=@).+", 0))
)

19
Q

How can we reference nested fields in tables?

A

Use : syntax in queries to access subfields in JSON strings
Use . syntax in queries to access subfields in struct types
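
A short sketch of both access patterns, assuming the events_strings table (with a JSON string column value) used elsewhere in this deck; the subfield names are hypothetical:

%python
# ':' extracts subfields directly from a JSON *string* column
display(spark.sql("SELECT value:device, value:geo.city FROM events_strings"))

# '.' navigates subfields of a *struct* column, e.g. after from_json()
display(spark.sql("""
    SELECT json.device, json.geo.city
    FROM (SELECT from_json(value, 'device STRING, geo STRUCT<city: STRING>') AS json
          FROM events_strings)
"""))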

20
Q

Describe 5 common array functions

A
  1. explode() - separates the elements of an array into multiple rows, creating a new row for each element
  2. size() - provides a count for the number of elements in an array for each row
  3. collect_set() - collects unique values for a field, including fields within arrays
  4. flatten() - combines multiple arrays into a single array
  5. array_distinct() - removes duplicate elements from an array (see the combined sketch below)
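
A combined PySpark sketch of these functions, using a hypothetical DataFrame with an array column items:

%python
from pyspark.sql.functions import explode, size, collect_set, flatten, array_distinct

# Hypothetical orders, each carrying an array of item ids
df = spark.createDataFrame(
    [(1, ["a", "b", "b"]), (2, ["b", "c"])],
    ["order_id", "items"])

display(df.select("order_id", explode("items").alias("item")))    # one row per array element
display(df.select(size("items").alias("n_items")))                 # element count per row
display(df.agg(collect_set("order_id").alias("ids")))              # unique values across rows
display(df.agg(flatten(collect_set("items")).alias("all_items")))  # merge collected arrays into one
display(df.select(array_distinct("items").alias("deduped")))       # drop duplicates within each array
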
21
Q

How do we derive a schema from a JSON string? How do we parse a column containing a JSON string into a struct type using a specified schema? (sql)

A

SELECT schema_of_json('{sample_json}') AS schema

CREATE OR REPLACE TEMP VIEW parsed_events AS SELECT json.* FROM (
  SELECT from_json(value, 'sample_struct') AS json
  FROM events_strings);

22
Q

How to use Pivot to convert from long format to wide format (sql)

A

SELECT *
FROM item_purchases
PIVOT (
  sum(item.quantity) FOR item_id IN (
    'P_FOAM_K',
    'M_STAN_Q',
    'P_FOAM_S',
    'M_PREM_Q',
    'M_STAN_F')
)

The above creates a new column for each listed item_id value, containing the aggregated (summed) quantity for that item, or NULL when the purchase contains no such item.

23
Q

How to use Pivot to convert from long format to wide format (python)

A

transactionsDF = (item_purchasesDF
    .groupBy("order_id",
             "email",
             "transaction_timestamp",
             "total_item_quantity",
             "purchase_revenue_in_usd",
             "unique_items",
             "items",
             "item",
             "name",
             "price")
    .pivot("item_id")
    .sum("item.quantity")
)
display(transactionsDF)

24
Q

Define a SQL UDF

A

CREATE OR REPLACE FUNCTION sale_announcement(item_name STRING, item_price INT)
RETURNS STRING
RETURN concat("The ", item_name, " is on sale for $", round(item_price * 0.8, 0));

SELECT *, sale_announcement(name, price) AS message FROM item_lookup

25
Q

Identify the location (and other details) of a function

A

DESCRIBE FUNCTION EXTENDED sale_announcement

26
Q

Security model for SQL UDFs

A

SQL UDFs:
- persist between execution environments
- exist as objects in the metastore and are governed by the same Table ACLs as databases, tables and views

Permissions needed
- to create a SQL UDF, you need USE CATALOG on the catalog, and USE SCHEMA and CREATE FUNCTION on the schema
- to use a SQL UDF, you need USE CATALOG on the catalog, USE SCHEMA on the schema, and EXECUTE on the function (see the sketch below)
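
A hedged sketch of granting those privileges, assuming Unity Catalog; the catalog, schema, function, and group names below are placeholders:

%python
# Placeholder names: catalog "main", schema "default", group "analysts"
spark.sql("GRANT USE CATALOG ON CATALOG main TO `analysts`")
spark.sql("GRANT USE SCHEMA ON SCHEMA main.default TO `analysts`")
spark.sql("GRANT EXECUTE ON FUNCTION main.default.sale_announcement TO `analysts`")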

27
Q

Using CASE/WHEN when defining a SQL UDF

A

CREATE OR REPLACE FUNCTION item_preference(name STRING, price INT)
RETURNS STRING
RETURN CASE
  WHEN name = "Standard Queen Mattress" THEN "This is my default mattress"
  WHEN name = "Premium Queen Mattress" THEN "This is my favorite mattress"
  WHEN price > 100 THEN concat("I'd wait until the ", name, " is on sale for $", round(price * 0.8, 0))
  ELSE concat("I don't need a ", name)
END;

SELECT *, item_preference(name, price) FROM item_lookup

28
Q

What is needed to create Python UDFs?

A

Import udf (and/or pandas_udf) from pyspark.sql.functions and apply it to a Python function, either as a regular function call or via the @udf() / @pandas_udf() decorators, to register it as a UDF.
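
A minimal sketch of the decorator form; the function name, column, and sample data are hypothetical:

%python
from pyspark.sql.functions import udf, col

# Register a Python function as a Spark UDF via the decorator,
# declaring the return type as a DDL string
@udf("string")
def first_letter(email):
    return email[0] if email else None

# Hypothetical DataFrame with an email column
df = spark.createDataFrame([("a@x.com",), ("b@y.com",)], ["email"])
display(df.select(first_letter(col("email")).alias("first_letter")))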