ELT with Apache Spark Flashcards
Extract data from a single file and a directory of files (sql)
SELECT * FROM file_format.`/path/to/file`
SELECT * FROM json.`${DA.paths.kafka_events}`
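The same direct-file read works from Python via the DataFrame reader; a minimal sketch, assuming DA.paths.kafka_events is provided by the course setup as in the SQL example:
# Read the JSON files directly; no table registration needed
events_df = spark.read.format("json").load(DA.paths.kafka_events)
events_df.display()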
Create a view and/or temporary view from a reference to a file. What are temporary views? (sql)
CREATE OR REPLACE [TEMP] VIEW event_view
AS SELECT * FROM json.`${DA.paths.kafka_events}`
Temporary views exist only for the current SparkSession, so on Databricks they are isolated to the current notebook, job, or DBSQL query.
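The Python equivalent registers the view against the current SparkSession; a minimal sketch, again assuming the course path variable:
# The temp view is visible only within this SparkSession (i.e. this notebook)
spark.read.json(DA.paths.kafka_events).createOrReplaceTempView("event_view")
spark.sql("SELECT * FROM event_view").display()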
Create a Common Table Expression (CTE) from a reference to a file. What are CTEs? (sql)
WITH cte_json
AS (SELECT * FROM json.`${DA.paths.kafka_events}`
)
SELECT * FROM cte_json
CTEs only alias the results of a query while that query is being planned and executed; they cannot be referenced after the query has finished.
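To see the scoping rule from Python, run the CTE and then try to reference its alias afterwards; a sketch assuming the same JSON path:
# Works: cte_json is valid while this single query is planned and executed
spark.sql(f"""
    WITH cte_json AS (SELECT * FROM json.`{DA.paths.kafka_events}`)
    SELECT count(*) FROM cte_json
""").display()
# Raises AnalysisException: the alias does not outlive the query
# spark.sql("SELECT * FROM cte_json")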
Register a Table on External Data with read options (sql)
CREATE TABLE table_identifier (col_name1 col_type1, …)
USING data_source
OPTIONS (key1 = val1, key2 = val2, …)
LOCATION path
Limits of Tables with External Data Sources
- When querying external data sources, we cannot expect the performance guarantees associated with Delta Lake and Lakehouse
- E.g. Delta Lake tables guarantee you’re always querying the latest version of your data, but external sources may be referencing an older cached version.
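When staleness is a concern, you can manually invalidate any cached copy before querying; a minimal sketch, assuming a table named table_identifier registered as above:
# Equivalent to SQL's REFRESH TABLE: drops cached entries so the next
# query re-reads the underlying external source
spark.catalog.refreshTable("table_identifier")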
Extract Data from a SQL Database (sql)
CREATE TABLE table_identifier
USING JDBC
OPTIONS (
  url = "jdbc:{databaseServerType}://{jdbcHostname}:{jdbcPort}",
  dbtable = "{jdbcDatabase}.table",
  user = "{jdbcUsername}",
  password = "{jdbcPassword}"
)
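The same options can be supplied through the Python DataFrame reader; a minimal sketch with hypothetical connection values:
# All connection values below are hypothetical placeholders
jdbc_df = (spark.read
    .format("jdbc")
    .option("url", "jdbc:postgresql://dbserver.example.com:5432/mydb")
    .option("dbtable", "mydb.users")
    .option("user", "my_user")
    .option("password", "my_password")
    .load())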
What are the two ways we can count null values in a specific field? (sql)
SELECT count_if(field_x IS NULL) FROM table1;
SELECT count(*) FROM table1 WHERE field_x IS NULL;
What is the difference between count(col) and count(*)? (sql)
- count(col) skips NULL values when counting specific columns or expressions
- count(*) is a special case that counts the total number of rows, including rows that contain only NULL values
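A quick way to verify the distinction; a sketch using a small hypothetical DataFrame with one NULL email:
# Two rows, one NULL email
df = spark.createDataFrame([("a@example.com",), (None,)], "email string")
df.selectExpr("count(email) AS non_null_emails", "count(*) AS total_rows").display()
# non_null_emails = 1, total_rows = 2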
Return the count of rows where a field is NULL (python)
from pyspark.sql.functions import col
usersDF = spark.read.table("users_dirty")
# Option 1: SQL expression
usersDF.selectExpr("count_if(email IS NULL)")
# Option 2: DataFrame filter + count
usersDF.where(col("email").isNull()).count()
Dedupe an entire table (sql & python)
%sql
SELECT DISTINCT * FROM table1
%python
usersDF.distinct().display()
Dedupe rows based on a specific column (sql)
CREATE OR REPLACE TEMP VIEW deduped_users AS
SELECT user_id, user_first_touch_timestamp, max(email) AS email, max(updated) AS updated
FROM users_dirty
WHERE user_id IS NOT NULL
GROUP BY user_id, user_first_touch_timestamp;
SELECT count(*) FROM deduped_users
In the above, we use the aggregate function max as a hack to retain the "email" and "updated" fields while capturing non-null values when multiple records are present for the same user.
Dedupe rows based on a specific column (python)
from pyspark.sql.functions import max  # note: shadows the built-in max
dedupedDF = (usersDF
    .where(col("user_id").isNotNull())
    .groupBy("user_id", "user_first_touch_timestamp")
    .agg(max("email").alias("email"),
         max("updated").alias("updated"))
)
dedupedDF.count()
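An alternative worth knowing: PySpark's built-in dropDuplicates keeps one arbitrary row per key instead of merging non-null values with max, so results can differ; a sketch against the same DataFrame:
# Keeps the first row encountered per key; unlike the max() approach,
# it does not prefer non-null email/updated values
altDF = (usersDF
    .where(col("user_id").isNotNull())
    .dropDuplicates(["user_id", "user_first_touch_timestamp"]))
altDF.count()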
Validate that the primary key is unique across all rows (sql)
SELECT max(row_count) <= 1 AS no_duplicate_ids FROM (
SELECT user_id, count(*) AS row_count
FROM deduped_users
GROUP BY user_id)
(the result should be a single column "no_duplicate_ids" with one value, "true")
Validate that the primary key is unique across all rows (python)
from pyspark.sql.functions import count, max
display(dedupedDF
    .groupBy("user_id")
    .agg(count("*").alias("row_count"))
    .select((max("row_count") <= 1).alias("no_duplicate_ids")))
Validate that each "email" field is associated with at most one "user_id" (sql)
SELECT max(user_id_count) <= 1 AS at_most_one_id FROM (
SELECT email, count(user_id) AS user_id_count
FROM deduped_users
WHERE email IS NOT NULL
GROUP BY email)
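For symmetry with the earlier cards, a Python sketch of the same check, reusing dedupedDF and the count/max imports from above:
display(dedupedDF
    .where(col("email").isNotNull())
    .groupBy("email")
    .agg(count("user_id").alias("user_id_count"))
    .select((max("user_id_count") <= 1).alias("at_most_one_id")))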