ELT with Apache Spark Flashcards
Extract data from a single file and a directory of files (sql)
SELECT * FROM file_format.`/path/to/file`
SELECT * FROM json.`${DA.paths.kafka_events}`
Create a view and/or temporary view from a reference to a file. What are temporary views? (sql)
CREATE OR REPLACE (TEMP) VIEW event_view
AS SELECT * FROM json.`${DA.paths.kafka_events}`
Temporary views exist only for the current SparkSession, so on Databricks they are isolated to the current notebook, job, or DBSQL query.
Create a Common Table Expression (CTE) from a reference to a file. What are CTEs? (sql)
WITH cte_json
AS (SELECT * FROM json.`${DA.paths.kafka_events}`
)
SELECT * FROM cte_json
CTEs only alias the results of a query while that query is being planned and executed. They cannot be used after the query has finished.
Register a Table on External Data with read options (sql)
CREATE TABLE table_identifier (col_name1 col_type1, …)
USING data_source
OPTIONS (key1 = val1, key2 = val2, …)
LOCATION = path
Limits of Tables with External Data Sources
- When querying external data sources, we cannot expect the performance guarantees associated with Delta Lake and Lakehouse
- E.g. Delta Lake tables guarantee you’re always querying the latest version of your data, but external sources may be referencing an older cached version.
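When freshness matters, the cache for such a table can be invalidated manually; a minimal sketch, assuming a previously registered external table (the name sales_csv is hypothetical):
# Force the next query to re-read the external files instead of serving cached data
spark.sql("REFRESH TABLE sales_csv")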
Extract Data from a SQL Database (sql)
CREATE TABLE table_identifier
USING JDBC
OPTIONS (
url = "jdbc:{databaseServerType}://{jdbcHostname}:{jdbcPort}",
dbtable = "{jdbcDatabase}.table",
user = "{jdbcUsername}",
password = "{jdbcPassword}"
)
What are the two ways we can count null values in a specific field? (sql)
SELECT count_if(field_x IS NULL) FROM table1;
SELECT count(*) FROM table1 WHERE field_x IS NULL;
What is the difference between count(col) and count(*)? (sql)
- count(col) skips NULL values when counting specific columns or expressions
- count(*) is a special case that counts the total number of rows (including rows that are only NULL)
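A minimal sketch of the difference, reusing the users_dirty table from the neighbouring cards:
from pyspark.sql.functions import count, col
df = spark.read.table("users_dirty")
# count(col) skips NULLs; count(*) counts every row
df.select(count(col("email")).alias("non_null_emails"),
count("*").alias("total_rows")).show()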
Return the count of null rows for a field (python)
from pyspark.sql.functions import col
usersDF = spark.read.table("users_dirty")
usersDF.selectExpr("count_if(email IS NULL)")
usersDF.where(col("email").isNull()).count()
Dedupe an entire table (sql & python)
%sql
SELECT DISTINCT(*) FROM table1
%python
usersDF.distinct().display()
Dedupe rows based on a specific column (sql)
CREATE OR REPLACE TEMP VIEW deduped_users AS
SELECT user_id, user_first_touch_timestamp, max(email) AS email, max(updated) AS updated
FROM users_dirty
WHERE user_id IS NOT NULL
GROUP BY user_id, user_first_touch_timestamp;
SELECT count(*) FROM deduped_users
In the above, we use the aggregate function max as a hack to retain the “email” and “updated” columns while capturing a non-null value when multiple records are present for a user_id.
Dedupe rows based on a specific column (python)
from pyspark.sql.functions import col, max
dedupedDF = (usersDF
.where(col("user_id").isNotNull())
.groupBy("user_id", "user_first_touch_timestamp")
.agg(max("email").alias("email"),
max("updated").alias("updated"))
)
dedupedDF.count()
Validate that the primary key is unique across all rows (sql)
SELECT max(row_count) <= 1 no_duplicate_ids FROM (
SELECT user_id, count(*) AS row_count
FROM deduped_users
GROUP BY user_id)
(response should be one column “no_duplicate_ids” with one value “true”)
Validate that the primary key is unique across all rows (python)
from pyspark.sql.functions import count, max
display(dedupedDF
.groupBy("user_id")
.agg(count("*").alias("row_count"))
.select((max("row_count") <= 1).alias("no_duplicate_ids")))
Validate that each “email” field is associated with at most one “user_id” (sql)
SELECT max(user_id_count) <= 1 at_most_one_id FROM (
SELECT email, count(user_id) AS user_id_count
FROM deduped_users
WHERE email IS NOT NULL
GROUP BY email)
Validate that each “email” field is associated with at most one “user_id” (python)
display(dedupedDF
.where(col("email").isNotNull())
.groupBy("email")
.agg(count("user_id").alias("user_id_count"))
.select((max("user_id_count") <= 1).alias("at_most_one_id")))
Cast a column to a timestamp, and extract calendar data from the new field, as well as email domain from the “email” field (sql)
The functions “date_format()” and “regexp_extract()” can be used:
SELECT *,
date_format(first_touch, "MMM d, yyyy") AS first_touch_date,
date_format(first_touch, "HH:mm:ss") AS first_touch_time,
regexp_extract(email, "(?<=@).+", 0) AS email_domain
FROM (
SELECT *,
CAST(user_first_touch_timestamp / 1e6 AS timestamp) AS first_touch
FROM deduped_users
)
Cast a column to a timestamp, and extract calendar data from the new field, as well as email domain from the “email” field (python)
from pyspark.sql.functions import date_format, regexp_extract
display(dedupedDF
.withColumn("first_touch", (col("user_first_touch_timestamp") / 1e6).cast("timestamp"))
.withColumn("first_touch_date", date_format("first_touch", "MMM d, yyyy"))
.withColumn("first_touch_time", date_format("first_touch", "HH:mm:ss"))
.withColumn("email_domain", regexp_extract("email", "(?<=@).+", 0))
)
How can we reference nested fields in tables?
Use : syntax in queries to access subfields in JSON strings
Use . syntax in queries to access subfields in struct types
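A minimal sketch of both access patterns, reusing the events_strings view and its value column from the from_json card below; the subfield name device is a hypothetical example:
# ":" drills into a raw JSON string column (Databricks SQL syntax)
display(spark.sql("SELECT value:device FROM events_strings"))
# "." drills into a struct column, here the struct produced by from_json
display(spark.sql("SELECT json.device FROM (SELECT from_json(value, 'device STRING') AS json FROM events_strings)"))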
Describe 5 common array functions
- explode() - separates the elements of an array into multiple rows, creating a new row for each element
- size() - provides a count for the number of elements in an array for each row
- collect_set() - collects unique values for a field, including fields within arrays
- flatten() - combines multiple arrays into a single array
- array_distinct() - removes duplicate elements from an array
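A minimal PySpark sketch of the five functions above on a toy DataFrame (column and value names are illustrative):
from pyspark.sql.functions import explode, size, collect_set, flatten, array_distinct, col
df = spark.createDataFrame([(1, [["a", "b"], ["b", "c"]]), (2, [["c"]])], ["id", "nested"])
df.select("id", explode(col("nested")).alias("inner_array")).show()    # one row per inner array
df.select(size(col("nested")).alias("n_inner_arrays")).show()          # element count per row
df.select(flatten(col("nested")).alias("flat")).show()                 # merge nested arrays into one
df.select(array_distinct(flatten(col("nested"))).alias("uniq")).show() # drop duplicate elements
df.select(collect_set("id").alias("unique_ids")).show()                # unique values across rows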
How do we derive a schema from a JSON string? How do we parse a column containing a JSON string into a struct type using a specified schema? (sql)
SELECT schema_of_json('{sample_json}') AS schema
CREATE OR REPLACE TEMP VIEW parsed_events AS SELECT json.* FROM (
SELECT from_json(value, 'sample_struct') AS json
FROM events_strings);
How to use Pivot to convert from long format to wide format (sql)
SELECT *
FROM item_purchases
PIVOT (
sum(item.quantity) FOR item_id IN (
'P_FOAM_K',
'M_STAN_Q',
'P_FOAM_S',
'M_PREM_Q',
'M_STAN_F')
)
The above creates one new column per listed item_id value, containing the summed quantity for that item in each row, or NULL if the row has no quantity for that item.
How to use Pivot to convert from long format to wide format (python)
transactionsDF = (item_purchasesDF
.groupBy("order_id",
"email",
"transaction_timestamp",
"total_item_quantity",
"purchase_revenue_in_usd",
"unique_items",
"items",
"item",
"name",
"price")
.pivot("item_id")
.sum("item.quantity")
)
display(transactionsDF)
Define a SQL UDF
CREATE OR REPLACE FUNCTION sale_announcement(item_name STRING, item_price INT)
RETURNS STRING
RETURN concat("The ", item_name, " is on sale for $", round(item_price * 0.8, 0));
SELECT *, sale_announcement(name, price) AS message FROM item_lookup
Identify the location (and other details) of a function
DESCRIBE FUNCTION EXTENDED sale_announcement
Security model for SQL UDFs
SQL UDFs:
- persist between execution environments
- exist as objects in the metastore and are governed by the same Table ACLs as databases, tables and views
Permissions needed
- to create a SQL UDF, you need USE CATALOG on the catalog, and USE SCHEMA and CREATE FUNCTION on the schema
- to use a SQL UDF, you need USE CATALOG on the catalog, USE SCHEMA on the schema, and EXECUTE on the function
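A hedged sketch of granting those privileges with Unity Catalog SQL (the catalog main, schema default, and group analysts are hypothetical; sale_announcement is the UDF from the earlier card):
# Privileges a group needs to run an existing SQL UDF
spark.sql("GRANT USE CATALOG ON CATALOG main TO `analysts`")
spark.sql("GRANT USE SCHEMA ON SCHEMA main.default TO `analysts`")
spark.sql("GRANT EXECUTE ON FUNCTION main.default.sale_announcement TO `analysts`")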
Using CASE/WHEN when defining a SQL UDF
CREATE OR REPLACE FUNCTION item_preference(name STRING, price INT)
RETURNS STRING
RETURN CASE
WHEN name = "Standard Queen Mattress" THEN "This is my default mattress"
WHEN name = "Premium Queen Mattress" THEN "This is my favorite mattress"
WHEN price > 100 THEN concat("I'd wait until the ", name, " is on sale for $", round(price * 0.8, 0))
ELSE concat("I don't need a ", name)
END;
SELECT *, item_preference(name, price) FROM item_lookup
What is needed to create Python UDFs?
Define a Python function and register it as a UDF, typically with the @udf() decorator (row-at-a-time) or the @pandas_udf() decorator (vectorized).
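A minimal sketch of both decorators (the function names and logic are illustrative; usersDF comes from the earlier cards):
from pyspark.sql.functions import udf, pandas_udf, col
from pyspark.sql.types import StringType
import pandas as pd

@udf(StringType())                 # row-at-a-time Python UDF
def first_letter(email):
    return email[0] if email else None

@pandas_udf(StringType())          # vectorized (Pandas) UDF
def to_upper(s: pd.Series) -> pd.Series:
    return s.str.upper()

display(usersDF.select(first_letter(col("email")), to_upper(col("email"))))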