Data Engineer Professional Flashcards
Which utility can a data engineer use to read passed parameters inside a notebook?
- dbutils.secrets
- dbutils.library
- dbutils.fs
- dbutils.notebook
- dbutils.widgets
- dbutils.widgets
Which of the following describes the minimal permissions a data engineer needs to view the metrics, driver logs and Spark UI of an existing cluster?
- Can Attach To
- Can Restart
- Can Manage
- Cluster creation allowed + Can Attach To privileges
- Cluster creation allowed + Can Restart privileges
- Can Attach To
For production Databricks jobs, which of the following cluster types is recommended?
- All-purpose clusters
- Production clusters
- Job clusters
- On-premises clusters
- Serverless clusters
Job clusters
(Job clusters are dedicated clusters for a job or task run. A job cluster auto-terminates once the job is completed, which saves costs compared to all-purpose clusters.)
If a Delta Lake table is created with the following query:
CREATE TABLE target AS SELECT * FROM source
What will be the result of running DROP TABLE source?
- An error will occur indicating that other tables are based on this source table
- Both the target and source tables will be dropped
- No table will be dropped until CASCADE keyword is added to the command
- Only the source table will be dropped, but the target table will no longer be queryable
- Only the source table will be dropped, while the target table will not be affected
- Only the source table will be dropped, while the target table will not be affected
Which of the following describes the minimal permissions a data engineer needs to start and terminate an existing cluster?
- Can Attach To
- Can Restart
- Can Manage
- Cluster creation allowed + Can Attach To privileges
- Cluster creation allowed + Can Restart privileges
- Can Restart
The data engineering team has a Delta Lake table created with the following query:
CREATE TABLE customers_clone
LOCATION 'dbfs:/mnt/backup'
AS SELECT * FROM customers
A data engineer wants to drop the table with the following query:
DROP TABLE customers_clone
Which statement describes the result of running this drop command?
- An error will occur as the table is deep cloned from the customers table
- An error will occur as the table is shallow cloned from the customers table
- Only the table’s metadata will be deleted from the catalog, while the data files will be kept in the storage
- Both the table’s metadata and the data files will be deleted
- The table will not be dropped until VACUUM command is run
- Only the table’s metadata will be deleted from the catalog, while the data files will be kept in the storage.
(External (unmanaged) tables are tables whose data is stored in an external storage path by using a LOCATION clause.
When you run DROP TABLE on an external table, only the table’s metadata is deleted, while the underlying data files are kept.)
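A minimal sketch of this behavior (the table name and storage path are illustrative):
spark.sql("""
    CREATE TABLE customers_clone
    LOCATION 'dbfs:/mnt/backup'
    AS SELECT * FROM customers
""")
spark.sql("DROP TABLE customers_clone")
# The catalog entry is gone, but the data files under the external path remain:
display(dbutils.fs.ls("dbfs:/mnt/backup"))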
Which of the following describes the minimal permissions a data engineer needs to edit the configurations of an existing cluster?
- Can Restart
- Can Manage
- Cluster creation allowed + Can Restart
- Cluster creation allowed + Can Manage
- Only administrators can edit the configurations on existing clusters
- Can Manage
Given the following code block in a notebook
db_password = dbutils.secrets.get(scope="dev", key="database_password")
print(db_password)
Which statement describes what will happen when the above code is executed?
- An interactive input box will appear in the notebook
- The string “REDACTED” will be printed
- The error message “Secrets can not be printed” will be shown
- The string value of the password will be printed in plain text
- If the user has “Can Read” permission, the string value of the password will be printed in plain text, otherwise “REDACTED”
- The string “REDACTED” will be printed.
A junior data engineer is using the %sh magic command to run some legacy code. A senior data engineer has recommended refactoring the code instead.
Which of the following could explain why a data engineer may need to avoid using the %sh magic command?
- %sh restarts the Python interpreter. This clears all the variables declared in the notebook
- %sh executes shell code only on the local driver machine which leads to significant performance overhead
- %sh can not access storage to persist the output
- All the above reasons
- None of these reasons
- %sh executes shell code only on the local driver machine which leads to significant performance overhead
Given a Delta table ‘products’ with the following schema:
name STRING, category STRING, expiration_date DATE, price FLOAT
When executing the below query:
SELECT * FROM products
WHERE price > 30.5
Which of the following will be leveraged by the query optimizer to identify the data files to load?
- Column statistics in the Hive metastore
- Column statistics in the metadata of Parquet files
- File statistics in the Delta transaction log
- File statistics in the Hive metastore
- None of the above
- File statistics in the Delta transaction log
In the Transaction log, Delta Lake captures statistics for each data file of the table. These statistics indicate per file:
Total number of records
Minimum value in each column of the first 32 columns of the table
Maximum value in each column of the first 32 columns of the table
Null value counts in each column of the first 32 columns of the table
When a query with a selective filter is executed against the table, the query optimizer uses these statistics to generate the query result. It leverages them to identify data files that may contain records matching the conditional filter.
For the SELECT query in the question, the transaction log is scanned for min and max statistics for the price column.
The data engineering team has a table ‘orders_backup’ that was created using Delta Lake’s SHALLOW CLONE functionality from the table ‘orders’. Recently, the team started getting an error when querying the ‘orders_backup’ table indicating that some data files are no longer present.
Which of the following correctly explains this error?
- The VACUUM command was run on the orders table
- The VACUUM command was run on the orders_backup table
- The OPTIMIZE command was run on the orders table
- The OPTIMIZE command was run on the orders_backup table
- The REFRESH command was run on the orders_backup table
- The VACUUM command was run on the orders table
With Shallow Clone, you create a copy of a table by copying only the Delta transaction logs.
That means no data files are moved during shallow cloning.
Running the VACUUM command on the source table may purge data files referenced in the transaction log of the clone. In this case, you will get an error when querying the clone indicating that some data files are no longer present.
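A minimal sketch of how this failure can arise (the retention settings are forced low here purely for illustration):
spark.sql("CREATE TABLE orders_backup SHALLOW CLONE orders")   # copies only the transaction log, not the data files
spark.sql("OPTIMIZE orders")                                   # rewrites the source's data files into new compacted files
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
spark.sql("VACUUM orders RETAIN 0 HOURS")                      # purges the old files, which the clone still references
spark.sql("SELECT * FROM orders_backup").show()                # fails: some data files are no longer present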
A data engineer has a Delta Lake table named ‘orders_archive’ created using the following command:
CREATE TABLE orders_archive
DEEP CLONE orders
They want to sync up the new changes in the orders table to the clone.
Which of the following commands can be run to achieve this task?
- REFRESH orders_archive
- SYNC orders_archive
- INSERT OVERWRITE orders_archive SELECT * FROM orders
- CREATE OR REPLACE TABLE orders_archive DEEP CLONE orders
- DROP TABLE orders_archive, then run CREATE TABLE orders_archive DEEP CLONE orders
- CREATE OR REPLACE TABLE orders_archive DEEP CLONE orders
Cloning can occur incrementally. Executing the CREATE OR REPLACE TABLE command can sync changes from the source to the target location.
If you now run DESCRIBE HISTORY orders_archive, you will see that a new version with a CLONE operation has occurred on the table.
The data engineering team has a Delta Lake table named ‘daily_activities’ that is completely overwritten each night with new data received from the source system.
For auditing purposes, the team wants to set up a post-processing task that uses Delta Lake Time Travel functionality to determine the difference between the new version and the previous version of the table. They start by getting the current table version via this code:
current_version = spark.sql("SELECT max(version) FROM (DESCRIBE HISTORY daily_activities)").collect()[0][0]
Which of the following queries can be used by the team to complete this task?
- SELECT * FROM daily_activities UNION SELECT * FROM daily_activities AS VERSION = {current_version-1}
- SELECT * FROM daily_activities UNION ALL SELECT * FROM daily_activities@v{current_version-1}
- SELECT * FROM daily_activities INTERSECT SELECT * FROM daily_activities AS VERSION = {current_version-1}
- SELECT * FROM daily_activities EXCEPT SELECT * FROM daily_activities@v{current_version-1}
- SELECT * FROM daily_activities MINUS SELECT * FROM daily_activities AS VERSION = {current_version-1}
- SELECT * FROM daily_activities EXCEPT SELECT * FROM daily_activities@v{current_version-1}
Each operation that modifies a Delta Lake table creates a new table version. You can use history information to audit operations or query a table at a specific point in time using:
Version number
SELECT * FROM my_table@v36
SELECT * FROM my_table VERSION AS OF 36
Timestamp
SELECT * FROM my_table TIMESTAMP AS OF '2019-01-01'
Using the EXCEPT set operator, you can get the difference between the new version and the previous version of the table
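A hedged sketch of how the team's post-processing step might look in Python, reusing the current_version value from the question:
current_version = spark.sql(
    "SELECT max(version) FROM (DESCRIBE HISTORY daily_activities)").collect()[0][0]

new_rows_df = spark.sql(f"""
    SELECT * FROM daily_activities
    EXCEPT
    SELECT * FROM daily_activities@v{current_version - 1}
""")
display(new_rows_df)   # rows present in the current version but not in the previous one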
The data engineering team wants to build a pipeline that receives customers data as a change data capture (CDC) feed from a source system. The CDC events logged at the source contain the data of the records along with metadata information. This metadata indicates whether the specified record was inserted, updated, or deleted, in addition to a timestamp column, identified by the field update_time, indicating the order in which the changes happened. Each record has a primary key identified by the field customer_id.
In the same batch, multiple changes for the same customer could be received with different update_time. The team wants to store only the most recent information for each customer in the target Delta Lake table.
Which of the following solutions meets these requirements?
- Enable Delta Lake’s Change Data Feed (CDF) on the target table to automatically merge the received CDC feed
- Use MERGE INTO to upsert the most recent entry for each customer_id into the table
- Use MERGE INTO with a SEQUENCE BY clause on the update_time for ordering how operations should be applied
- Use DropDuplicates function to remove duplicates by customer_id, then merge the duplicate records into the table.
- Use the option mergeSchema when writing the CDC data into the table to automatically merge the changed data with its most recent schema
- Use MERGE INTO to upsert the most recent entry for each customer_id into the table
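One possible shape for this solution, assuming the CDC batch has already been loaded into a DataFrame named cdc_df whose columns match the target table plus update_time (names are illustrative; handling of delete events is omitted for brevity):
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Keep only the most recent change per customer within the batch.
latest = Window.partitionBy("customer_id").orderBy(F.col("update_time").desc())
(cdc_df.withColumn("rn", F.row_number().over(latest))
       .filter("rn = 1")
       .drop("rn")
       .createOrReplaceTempView("customers_cdc_latest"))

spark.sql("""
    MERGE INTO customers c
    USING customers_cdc_latest u
    ON c.customer_id = u.customer_id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")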
A data engineer is using a foreachBatch logic to upsert data in a target Delta table.
The function to be called at each new microbatch processing is displayed below with a blank:
def upsert_data(microBatchDF, batch_id):
    microBatchDF.createOrReplaceTempView("sales_microbatch")
    sql_query = """
        MERGE INTO sales_silver a
        USING sales_microbatch b
        ON a.item_id = b.item_id AND a.item_timestamp = b.item_timestamp
        WHEN NOT MATCHED THEN INSERT *
    """
    ________________
Which option correctly fills in the blank to execute the SQL query in the function on a cluster with Databricks Runtime below 10.5?
- spark.sql(sql_query)
- batch_id.sql(sql_query)
- microBatchDF.sql(sql_query)
- microBatchDF.sparkSession.sql(sql_query)
- microBatchDF._jdf.sparkSession.sql(sql_query)
- microBatchDF._jdf.sparkSession.sql(sql_query)
Usually, we use the spark.sql() function to run SQL queries. However, in this particular case, the Spark session cannot be accessed from within the micro-batch process. Instead, we can access the local Spark session from the micro-batch DataFrame.
For clusters with Databricks Runtime version below 10.5, the syntax to access the local Spark session is:
microBatchDF._jdf.sparkSession().sql(sql_query)
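For completeness, a sketch of how upsert_data might be wired into the stream (the source table and checkpoint path are assumptions):
(spark.readStream
    .table("sales_bronze")
    .writeStream
    .foreachBatch(upsert_data)                       # called once per micro-batch with (DataFrame, batch_id)
    .option("checkpointLocation", "dbfs:/checkpoints/sales_silver")
    .start())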
The data engineering team has a singleplex bronze table called ‘orders_raw’ where new orders data is appended every night. They created a new Silver table called ‘orders_cleaned’ in order to provide a more refined view of the orders data
The team wants to create a batch processing pipeline to process all new records inserted in the orders_raw table and propagate them to the orders_cleaned table.
Which solution minimizes the compute costs to propagate this batch of data?
- Use time travel capabilities in Delta Lake to compare the latest version of orders_raw with one version prior, then write the difference to the orders_cleaned table
- Use Spark Structured Streaming to process the new records from orders_raw in batch mode using the availableNow trigger option
- Use Spark Structured Streaming foreachBatch logic to process the new records from orders_raw using trigger(processingTime=”24 hours”)
- Use batch overwrite logic to reprocess all records in orders_raw and overwrite the orders_cleaned table
- Use insert-only merge into the orders_cleaned table using orders_raw data based on a composite key
- Use Spark Structured Streaming to process the new records from orders_raw in batch mode using the availableNow trigger option
Databricks supports trigger(availableNow=True) for Delta Lake and Auto Loader sources. This functionality consumes all available records in an incremental batch.
There is also the trigger(once=True) option for incremental batch processing. However, this setting is now deprecated in the newer Databricks Runtime versions.
NOTE: You may still see this option in the current certification exam version. However, Databricks recommends you use trigger(availableNow=True) for all future incremental batch processing workloads.
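A minimal sketch of the recommended option (any cleaning transformations are omitted; table names follow the question, the checkpoint path is an assumption):
(spark.readStream
    .table("orders_raw")
    .writeStream
    .option("checkpointLocation", "dbfs:/checkpoints/orders_cleaned")
    .trigger(availableNow=True)     # consume everything new since the last run, then stop
    .table("orders_cleaned"))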
The data engineering team has a Silver table called ‘sales_cleaned’ where new sales data is appended in near real-time.
They want to create a new Gold-layer entity against the ‘sales_cleaned’ table to calculate the year-to-date (YTD) of the sales amount. The new entity will have the following schema:
country_code STRING, category STRING, ytd_total_sales FLOAT, updated TIMESTAMP
It’s enough for these metrics to be recalculated once daily. But since they will be queried very frequently by several business teams, the data engineering team wants to cut down the potential costs and latency associated with materializing the results.
Which of the following solutions meets these requirements?
- Define the new entity as a view to avoid persisting the results each time the metrics are recalculated
- Define the new entity as a global temporary view since it can be shared between the notebooks or jobs that share computing resources
- Configure a nightly batch job to recalculate the metrics and store them as a table overwritten with each update
- Create multiple tables, one per business team so the metrics can be queried quickly and efficiently
- All the above solutions meet the requirements since Databricks uses the Delta Caching feature
- Configure a nightly batch job to recalculate the metrics and store them as a table overwritten with each update
Data engineers must understand how materializing results is different between views and tables on Databricks, and how to reduce total compute and storage cost associated with each materialization depending on the scenario.
Consider using a view when:
Your query is not complex. Because views are computed on demand, the view is re-computed every time it is queried. So, frequently running complex queries with joins and subqueries increases compute costs.
You want to reduce storage costs. Views do not require additional storage resources.
Consider using a gold table when:
Multiple downstream queries consume the table, so you want to avoid re-computing complex ad-hoc queries every time.
Query results should be computed incrementally from a data source that is continuously or incrementally growing.
A data engineer wants to calculate predictions using an MLflow model logged at a given “model_url”. They want to register the model as a Spark UDF in order to apply it to a test dataset.
Which code block allows the data engineer to register the MLflow model as a Spark UDF?
- predict_udf = mlflow.pyfunc.spark_udf(spark, “model_url”)
- predict_udf = mlflow.spark_udf(spark, “model_url”)
- predict_udf = mlflow.udf(spark, “model_url”)
- predict_udf = pyfunc.spark_udf(spark, “model_url”)
- predict_udf = mlflow.pyfunc(spark, “model_url”)
- predict_udf = mlflow.pyfunc.spark_udf(spark, “model_url”)
The mlflow.pyfunc.spark_udf function registers an MLflow model as an Apache Spark UDF. It needs at least 2 parameters:
spark: a SparkSession object
model_uri: the location, in URI format, of the MLflow model
Once the Spark UDF is created, it can be applied to a dataset to calculate the predictions:
predict_udf = mlflow.pyfunc.spark_udf(spark, "model_url")
pred_df = data_df.withColumn("prediction", predict_udf(*column_list))
“A Delta Lake functionality that automatically compacts small files during individual writes to a table by performing two complementary operations on the table”
Which of the following is being described in the above statement?
- Optimized writes
- Auto compaction
- Auto Optimize
- OPTIMIZE command
- REORG TABLE command
- Auto Optimize
Auto Optimize is a functionality that allows Delta Lake to automatically compact small data files of Delta tables. This can be achieved during individual writes to the Delta table.
Auto optimize consists of 2 complementary operations:
- Optimized writes: with this feature enabled, Databricks attempts to write out 128 MB files for each table partition.
- Auto compaction: after an individual write, this checks whether files can be further compacted; if so, it runs an OPTIMIZE job with 128 MB file sizes (instead of the 1 GB file size used in the standard OPTIMIZE)
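Auto Optimize is typically enabled per table through Delta table properties, for example (the table name is illustrative):
spark.sql("""
    ALTER TABLE sales_silver SET TBLPROPERTIES (
        'delta.autoOptimize.optimizeWrite' = 'true',
        'delta.autoOptimize.autoCompact'   = 'true'
    )
""")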
The data engineering team has a large external Delta table where new changes are merged very frequently. They enabled Optimized writes and Auto Compaction on the table in order to automatically compact small data files to target files of size 128 MB. However, when they look at the table directory, they see that most data files are smaller than 128 MB.
Which of the following likely explains these smaller file sizes?
- Optimized Writes and Auto Compaction have no effect on large Delta tables. The table needs to be partitioned so Auto Compaction can be applied at partition level.
- Optimized Writes and Auto Compaction have no effect on external tables. The table needs to be managed in order to store the information of file sizes in the Hive metastore.
- Optimized Writes and Auto Compaction automatically generate smaller data files to reduce the duration of future MERGE operations.
- Auto compaction supports Auto Z-Ordering which is more expensive than just compaction
- The team needs to look at the table’s auto_optimize directory, where all new compacted files are written
- Optimized Writes and Auto Compaction automatically generate smaller data files to reduce the duration of future MERGE operations.
Having many small files can help minimize rewrites during some operations like merges and deletes. For such operations, Databricks can automatically tune the file size of Delta tables. As a result, it can generate data files smaller than the default 128MB. This helps in reducing the duration of future MERGE operations.
Which statement regarding streaming state in Stream-Stream Joins is correct?
- Stream-Stream Joins are not stateful. Spark does not buffer past inputs as a streaming state for the input streams.
- Spark buffers past inputs as a streaming state only for the left input stream, so that it can match future right inputs with past left inputs.
- Spark buffers past inputs as a streaming state only for the right input stream, so that it can match future left inputs with past right inputs.
- Spark buffers past inputs as a streaming state for both input streams, so that it can match every future input with past inputs.
- Stream-Stream Joins do not support limiting the state information using watermarks.
When performing stream-stream join, Spark buffers past inputs as a streaming state for both input streams, so that it can match every future input with past inputs. This state can be limited by using watermarks.
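A sketch of a watermarked stream-stream join (tables, column names, and thresholds are illustrative assumptions):
from pyspark.sql import functions as F

impressions = (spark.readStream.table("impressions")
    .withWatermark("impression_time", "2 hours"))
clicks = (spark.readStream.table("clicks")
    .withWatermark("click_time", "3 hours"))

# Spark buffers state for BOTH inputs; the watermarks plus the time-range
# condition bound how long that state is kept.
joined = impressions.join(
    clicks,
    F.expr("""
        click_ad_id = impression_ad_id AND
        click_time BETWEEN impression_time AND impression_time + INTERVAL 1 HOUR
    """))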
Which statement regarding static Delta tables in Stream-Static joins is correct?
- Static Delta tables must be small enough to be broadcasted to all worker nodes in the cluster
- Static Delta tables need to be partitioned in order to be used in stream-static join
- Static Delta tables need to be refreshed with REFRESH TABLE command for each microbatch of a stream-static join
- The latest version of the static Delta table is returned each time it is queried by a microbatch of the stream-static join
- The latest version of the static Delta table is returned only for the first microbatch of the stream-static join. Then, it will be cached to be used by any upcoming microbatch.
- The latest version of the static Delta table is returned each time it is queried by a microbatch of the stream-static join
Stream-static joins take advantage of the Delta Lake guarantee that the latest version of the static Delta table is returned each time it is queried in a join operation with a data stream.
A data engineer has the following streaming query with a blank:
spark.readStream
    .table("orders_cleaned")
    ____________________________
    .groupBy(
        "order_timestamp",
        "author")
    .agg(
        count("order_id").alias("orders_count"),
        avg("quantity").alias("avg_quantity"))
    .writeStream
    .option("checkpointLocation", "dbfs:/path/checkpoint")
    .table("orders_stats")
For handling late-arriving data, they want to maintain the streaming state information for 30 minutes.
Which option correctly fills in the blank to meet this requirement ?
- trigger(processingTime=”30 minutes”)
- .awaitTermination(“order_timestamp”, “30 minutes”)
- .awaitWatermark(“order_timestamp”,”30 minutes”)
- .withWatermark(“order_timestamp”, “30 minutes”)
- .window(“order_timestamp”,”30 minutes”)
- .withWatermark(“order_timestamp”, “30 minutes”)
Given the following streaming query:
spark.readStream
    .table("orders_cleaned")
    .withWatermark("order_timestamp", "10 minutes")
    .groupBy(
        window("order_timestamp", "5 minutes").alias("time"),
        "author")
    .agg(
        count("order_id").alias("orders_count"),
        avg("quantity").alias("avg_quantity"))
    .writeStream
    .option("checkpointLocation", "dbfs:/path/checkpoint")
    .table("orders_stats")
Which of the following statements best describes this query?
- It calculates business-level aggregates for each non-overlapping ten-minute interval. Incremental state information is maintained for 5 minutes for late-arriving data.
- It calculates business-level aggregates for each non-overlapping five-minute interval. Incremental state information is maintained for 10 minutes for late-arriving data.
- It calculates business-level aggregates for each overlapping five-minute interval. Incremental state information is maintained for 10 minutes for late-arriving data.
- It calculates business-level aggregates for each overlapping ten-minute interval. Incremental state information is maintained for 5 minutes for late-arriving data.
- None of the above
- It calculates business-level aggregates for each non-overlapping five-minute interval. Incremental state information is maintained for 10 minutes for late-arriving data.
The pyspark.sql.functions.window function bucketizes rows into one or more time windows given a timestamp column. In this query, we are performing aggregations per order_timestamp for each non-overlapping five-minute interval.
The pyspark.sql.DataFrame.withWatermark function allows you to only track state information for a window of time in which we expect records could be delayed. Here we define a watermark of 10 minutes.
Which statement regarding checkpointing in Spark Structured Streaming is Not correct?
- Checkpoints store the current state of a streaming job to cloud storage
- Checkpointing allows the streaming engine to track the progress of stream processing
- Checkpoints can be shared between separate streams
- To specify the checkpoint in a streaming query, we use the checkpointLocation option
- Checkpointing with the write-ahead logs mechanism ensures fault-tolerant stream processing
- Checkpoints can be shared between separate streams
Checkpoints cannot be shared between separate streams. Each stream needs to have its own checkpoint directory to ensure processing guarantees.
Which of the following statements best describes Delta Lake Auto Compaction?
- Auto Compaction occurs after a write to a table has succeeded to check if files can further be compacted; if yes, it runs an OPTIMIZE job with Z-Ordering towards a file size of 128 MB.
- Auto Compaction occurs after a write to a table has succeeded to check if files can further be compacted; if yes, it runs an OPTIMIZE job without Z-Ordering towards a file size of 128 MB.
- Auto Compaction occurs after a write to a table has succeeded to check if files can further be compacted; if yes, it runs an OPTIMIZE job with Z-Ordering towards a file size of 1 GB.
- Auto Compaction occurs after a write to a table has succeeded to check if files can further be compacted; if yes, it runs an OPTIMIZE job without Z-Ordering toward a file size of 1 GB.
- None of the above
- Auto Compaction occurs after a write to a table has succeeded to check if files can further be compacted; if yes, it runs an OPTIMIZE job without Z-Ordering towards a file size of 128 MB.
Auto Compaction is part of the Auto Optimize feature in Databricks. It checks after an individual write whether files can be further compacted; if so, it runs an OPTIMIZE job with 128 MB file sizes instead of the 1 GB file size used in the standard OPTIMIZE.
Auto compaction does not support Z-Ordering as Z-Ordering is significantly more expensive than just compaction.
Which of the following statements best describes Auto Loader?
- Auto loader allows applying Change Data Capture (CDC) feed to update tables based on changes captured in source data
- Auto loader monitors a source location, in which files accumulate, to identify and ingest only newly arriving files with each command run. Files that have already been ingested in previous runs are skipped.
- Auto loader allows cloning a source Delta table to a target destination at a specific version.
- Auto loader defines data quality expectations on the contents of a dataset, and reports the records that violate these expectations in metrics
- Auto loader enables efficient insert, update, delete, and rollback capabilities by adding a storage layer that provides better data reliability to data lakes.
- Auto loader monitors a source location, in which files accumulate, to identify and ingest only newly arriving files with each command run. Files that have already been ingested in previous runs are skipped.
Which of the following functions can a data engineer use to return a new DataFrame containing the distinct rows from a given DataFrame based on multiple columns?
- pyspark.sql.DataFrame.drop
- pyspark.sql.DataFrame.distinct
- pyspark.sql.DataFrame.dropDuplicates
- pyspark.sql.DataFrame.na.drop
- pyspark.sql.DataFrame.dropna
- pyspark.sql.DataFrame.dropDuplicates
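A minimal example (the DataFrame and column names are placeholders):
distinct_orders_df = orders_df.dropDuplicates(["customer_id", "order_date"])   # distinct rows based on these two columns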
Which of the following approaches allows you to correctly perform streaming deduplication?
- De-duplicate records within each batch, and then append the result into the target table
- De-duplicate records within each batch, and then merge the result into the target table using insert-only merge
- De-duplicate records within each batch, rank the result, and then insert only records having rank = 1 into the target table
- De-duplicate records in all batches with watermarking, and then overwrite the target table by the result
- None of the above
- De-duplicate records within each batch, and then merge the result into the target table using insert-only merge
A junior data engineer is testing the following code block to get the newest entry for each item added in the ‘sales’ table since the last table update.
from pyspark.sql import functions as F
from pyspark.sql.window import Window
window = Window.partitionBy("item_id").orderBy(F.col("item_time").desc())
ranked_df = (spark.readStream
    .table("sales")
    .withColumn("rank", F.rank().over(window))
    .filter("rank == 1")
    .drop("rank")
)
display(ranked_df)
However, the command fails when executed. Why?
- The query output can not be displayed. They should use spark.writeStream to persist the query result.
- Watermarking is missing. It should be added to allow tracking state information for the window of time.
- Non-time-based window operations are not supported on streaming DataFrames. They need to be implemented inside a foreachBatch logic instead.
- The item_id field is not unique. Records must be de-duplicated on the item_id using dropDuplicates function.
- The item_id field is not unique. The drop(“rank”) must be called before applying the rank function in order to drop any duplicate record.
- Non-time-based window operations are not supported on streaming DataFrames. They need to be implemented inside a foreachBatch logic instead.
If you try to call such a window operation on a streaming DataFrame, you will get an error indicating that "Non-time-based window operations are not supported on streaming DataFrames".
Instead, these window operations need to be implemented inside a foreachBatch logic.
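A hedged sketch of the foreachBatch version of the junior engineer's code (the target table and checkpoint path are assumptions):
from pyspark.sql import functions as F
from pyspark.sql.window import Window

def newest_per_item(microBatchDF, batch_id):
    window = Window.partitionBy("item_id").orderBy(F.col("item_time").desc())
    (microBatchDF
        .withColumn("rank", F.rank().over(window))
        .filter("rank == 1")
        .drop("rank")
        .write
        .mode("append")
        .saveAsTable("sales_latest"))

(spark.readStream
    .table("sales")
    .writeStream
    .foreachBatch(newest_per_item)
    .option("checkpointLocation", "dbfs:/checkpoints/sales_latest")
    .start())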
Given the following query on the Delta table ‘customers’ on which Change Data Feed is enabled:
spark.readStream
    .option("readChangeFeed", "true")
    .option("startingVersion", 0)
    .table("customers")
    .filter(col("_change_type").isin(["update_postimage"]))
    .writeStream
    .option("checkpointLocation", "dbfs:/checkpoints")
    .trigger(availableNow=True)
    .table("customers_updates")
Which statement describes the results of this query each time it is executed?
- Newly updated records will be merged into the target table, modifying previous entries with the same primary keys.
- Newly updated records will be appended to the target table
- Newly updated records will overwrite the target table
- The entire history of updated records will be appended to the target table at each execution, which leads to duplicate entries
- The entire history of update records will overwrite the target table at each execution
- Newly updated records will be appended to the target table
Databricks supports reading a table's changes captured by CDF in streaming queries using spark.readStream. This allows you to get only the new changes captured since the last time the streaming query was run.
The query in the question then appends the data to the target table at each execution, since it uses the default write mode, which is 'append'.
The data engineering team maintains a Type 1 table that is overwritten each night with new data received from the source system.
A junior data engineer has suggested enabling the Change Data Feed (CDF) feature on the table in order to identify those rows that were updated, inserted, or deleted.
Which response to the junior data engineer’s suggestion is correct?
- CDF can not be enabled on existing tables, it can only be enabled on newly created tables.
- Table data changes captured by CDF can only be read in streaming mode
- CDF is useful when only a small fraction of records are updated in each batch.
- CDF is useful when the table is SCD2
- All of the above
- CDF is useful when only a small fraction of records are updated in each batch.
Generally speaking, we use CDF for sending incremental data changes to downstream tables in a multi-hop architecture. So, use CDF when only a small fraction of records is updated in each batch. Such updates are usually received from external sources in CDC format. If most of the records in the table are updated, or if the table is overwritten in each batch, as in the question, don't use CDF.
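For reference, when CDF is appropriate, it is enabled per table through a Delta table property, for example:
spark.sql("""
    ALTER TABLE customers
    SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")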
Given the following query on the Delta table customers on which Change Data Feed is enabled:
spark.read
    .option("readChangeFeed", "true")
    .option("startingVersion", 0)
    .table("customers")
    .filter(col("_change_type").isin(["update_postimage"]))
    .write
    .mode("overwrite")
    .table("customers_updates")
Which statement describes the results of this query each time it is executed?
- Newly updated records will be merged into the target table, modifying previous entries with the same primary keys
- Newly updated records will be appended to the target table
- Newly updated records will overwrite the target table
- The entire history of updated records will be appended to the target table at each execution, which leads to duplicate entries
- The entire history of updated records will overwrite the target table at each execution
- The entire history of updated records will overwrite the target table at each execution
Reading a table's changes, captured by CDF, using spark.read means that you are reading them as a static source. So, each time you run the query, all of the table's changes (starting from the specified startingVersion) will be read.
The query in the question then writes the data in "overwrite" mode to the target table, which completely overwrites the table at each execution.
A data engineer wants to ingest input JSON data into a target Delta table. They want the data ingestion to happen incrementally in near real-time.
Which option correctly meets the specified requirement?
- spark.readStream
      .format("autoloader")
      .option("autoloader.format", "json")
      .load(source_path)
      .writeStream
      .option("checkpointLocation", checkpointPath)
      .start("target_table")
- spark.readStream
      .format("autoloader")
      .option("autoloader.format", "json")
      .load(source_path)
      .writeStream
      .option("checkpointLocation", checkpointPath)
      .trigger(real-time=True)
      .start("target_table")
- spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load(source_path)
      .writeStream
      .option("checkpointLocation", checkpointPath)
      .start("target_table")
- spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load(source_path)
      .writeStream
      .trigger(real-time=True)
      .start("target_table")
- spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load(source_path)
      .writeStream
      .trigger(availableNow=True)
      .start("target_table")
- spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load(source_path)
      .writeStream
      .option("checkpointLocation", checkpointPath)
      .start("target_table")
In order to ingest input JSON data into a target Delta table, we use Auto Loader. Auto Loader is based on Spark Structured Streaming and provides a Structured Streaming source called 'cloudFiles'.
If you want the data ingestion to happen incrementally in near real-time, you can use the default trigger method, which is trigger(processingTime="500ms"). This processes data in micro-batches at a fixed interval of half a second.
Given the following Structured Streaming query:
(spark.table("orders")
    .withColumn("total_after_tax", col("total") + col("tax"))
    .writeStream
    .option("checkpointLocation", checkpointPath)
    .outputMode("append")
    ._____________
    .table("new_orders")
)
Fill in the blank to make the query execute a micro-batch to process data every 2 minutes.
- trigger(once="2 minutes")
- trigger(processingTime="2 minutes")
- processingTime("2 minutes")
- trigger("2 minutes")
- trigger()
- trigger(processingTime="2 minutes")
In Spark Structured Streaming, in order to process data in micro-batches at user-specified intervals, you can use the processingTime trigger method. This allows you to specify a time duration as a string. By default, it's "500ms".
Which statement regarding Delta Lake File Statistics is Correct?
- By default, Delta Lake captures statistics in the Hive metastore on the first 16 columns of each table
- By default, Delta Lake captures statistics in the Hive metastore on the first 32 columns of each table
- By default, Delta Lake captures statistics in the transaction log on the first 16 columns of each table
- By default, Delta Lake captures statistics in the transaction log on the first 32 columns of each table
- By default, Delta Lake captures statistics in both Hive metastore and transaction log for each added data file
- By default, Delta Lake captures statistics in the transaction log on the first 32 columns of each table
Delta Lake automatically captures statistics in the transaction log for each added data file of the table. By default, Delta Lake collects the statistics on the first 32 columns of each table. These statistics indicate per file:
Total number of records
Minimum value in each column of the first 32 columns of the table
Maximum value in each column of the first 32 columns of the table
Null value counts in each column of the first 32 columns of the table
These statistics are leveraged for data skipping based on query filters.
A data engineer uses the following SQL query:
GRANT USAGE ON DATABASE sales_db TO finance_team
Which of the following is the benefit of the USAGE privilege?
- Gives read access on the database
- Gives full permissions on the entire database
- Gives the ability to view database objects and their metadata
- No effect but it’s required to perform any action on the database
- The USAGE privilege is not part of the Databricks permission model
- No effect but it’s required to perform any action on the database
The data engineering team is using the LOCATION keyword for every new Delta Lake table created in the Lakehouse.
Which of the following describes the purpose of using the LOCATION keyword in this case?
- The LOCATION keyword is used to configure the created Delta Lake tables as managed tables
- The LOCATION keyword is used to configure the created Delta Lake tables as external tables
- The LOCATION keyword is used to define the created Delta Lake tables in an external database
- The LOCATION keyword is used to define the created Delta Lake tables in a database over a JDBC connection
- The LOCATION keyword is used to set a default schema and checkpoint location for the created Delta Lake tables
- The LOCATION keyword is used to configure the created Delta Lake tables as external tables
External (unmanaged) tables are tables whose data is stored in an external storage path by using a LOCATION clause.
A data engineer wants to create a Delta Lake table for storing user activities of a website. The table has the following schema:
user_id LONG, page STRING, activity_type LONG, ip_address STRING, activity_time TIMESTAMP, activity_date DATE
Based on the above schema, which column is a good candidate for partitioning the Delta Table?
- user_id
- activity_type
- page
- activity_time
- activity_date
- activity_date
When choosing partitioning columns, it’s good to consider the fact that records with a given value (the activities of a given user) will continue to arrive indefinitely. In such a case, we use a datetime column for partitioning. This allows your partitions to be optimized, and allows you to easily archive those partitions of previous time periods, if necessary.
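A minimal sketch of the corresponding DDL, partitioning on the date column (the table name is illustrative, and the question's LONG columns are written as BIGINT):
spark.sql("""
    CREATE TABLE user_activities (
        user_id BIGINT, page STRING, activity_type BIGINT,
        ip_address STRING, activity_time TIMESTAMP, activity_date DATE)
    USING DELTA
    PARTITIONED BY (activity_date)
""")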
The data engineering team has a large Delta table named ‘users’. A recent query on the table returned some entries with negative values in the ‘age’ column.
To avoid this issue and enforce data quality, a junior data engineer decided to add a CHECK constraint to the table with the following command:
ALTER TABLE users ADD CONSTRAINT valid_age CHECK (age > 0);
However, the command fails when executed.
Which statement explains the cause of this failure?
- The syntax for adding the CHECK constraint is incorrect. Instead, the command should be: ALTER TABLE users ADD CONSTRAINT ON COLUMN age (CHECK > 0)
- The users table already exists; CHECK constraints can only be added during table creation using the CREATE TABLE command
- The users table already contains rows that violate the new constraint; all existing rows must satisfy the constraint before adding it to the table
- The users table is not partitioned on the age column. CHECK constraints can only be added on partitioning columns.
- The users table already contains rows; CHECK constraints can only be added on empty tables.
- The users table already contains rows that violate the new constraint; all existing rows must satisfy the constraint before adding it to the table
The ADD CONSTRAINT command verifies that all existing rows in the table satisfy the constraint before adding it to the table. Otherwise, the command fails with an error saying that some rows in the table violate the new CHECK constraint.
A data engineer has added a CHECK constraint to the sales table using the following command:
ALTER TABLE sales ADD CONSTRAINT valid_date CHECK (item_date >= '2024-01-01');
In addition, they have added a comment on the item_date column using the following command:
ALTER TABLE sales ALTER COLUMN item_date COMMENT 'Date must be newer than Jan 1, 2024';
Which of the following commands allows the data engineer to verify that both the constraint and the column comment have been successfully added to the table?
- SHOW TBLPROPERTIES sales
- DESCRIBE TABLE sales
- DESCRIBE DETAIL sales
- DESCRIBE EXTENDED sales
- SHOW TABLES sales
- DESCRIBE EXTENDED sales
DESCRIBE TABLE EXTENDED, or simply DESCRIBE EXTENDED, shows the added table constraints in the 'Table Properties' field. It shows both the name and the actual condition of each check constraint.
In addition, DESCRIBE EXTENDED shows the comment on each column, as well as the comment on the table itself.
Which of the following is the benefit of Delta Lake File Statistics?
- They are leveraged for process time forecasting when executing selective queries
- They are leveraged for data skipping when executing selective queries
- They are leveraged for data compression in order to improve Delta Caching
- They are used as checksums to check data corruption in parquet files
- None of the above
- They are leveraged for data skipping when executing selective queries
These statistics are leveraged for data skipping based on query filters. For example, if you are querying the total number of records in a table, Delta will not calculate the count by scanning all data files. Instead, it will leverage these statistics to generate the query result
In which SCD type does new data overwrite existing data?
Type 1 SCD
The data engineering team created a new Databricks job for processing sensitive financial data. A financial analyst asked the team to transfer the “Owner” privilege of this job to the “finance” group.
A junior data engineer who has the "CAN MANAGE" permission on the job is attempting to make this privilege transfer via the Databricks Jobs UI, but it keeps failing.
Which of the following explains why the transfer keeps failing?
- The “Owner” privilege is assigned at job creation to the creator and cannot be changed. The job must be re-created using the “finance” group’s credentials.
- Databricks Jobs UI doesn’t support changing the owners of jobs. Databricks REST API needs to be used instead.
- Having the CAN MANAGE permission is not enough to grant "Owner" privileges to a group. The data engineer must be the current owner of the job.
- Having the CAN MANAGE permission is not enough to grant "Owner" privileges to a group. The data engineer must be a workspace administrator.
- Groups can not be owners of Databricks jobs. The owner must be an individual user.
- Groups can not be owners of Databricks jobs. The owner must be an individual user.
The data engineering team noticed that a partitioned Delta Lake table is performing poorly. They are experiencing slowdowns for most general queries on this table.
The team tried to run an OPTIMIZE command on the table, but this did not help to resolve the issue.
Which of the following likely explains the cause of these slowdowns?
- The table has too many old data files that need to be purged. They need to run a VACUUM command instead.
- The table is over-partitioned or incorrectly partitioned. This requires a full rewrite of all data files to resolve the issue.
- They are applying the OPTIMIZE command on the whole table - it must be applied at each partition separately.
- They are applying the OPTIMIZE command without ZORDER. Z-ordering is needed on the partitioning columns.
- The transaction log is too large - log files older than a certain age must be deleted or archived at partition boundaries.
- The table is over-partitioned or incorrectly partitioned. This requires a full rewrite of all data files to resolve the issue.
Data that is over-partitioned or incorrectly partitioned will suffer greatly. Files cannot be combined or compacted across partition boundaries, so partitioned small tables increase storage costs and total number of files to scan. This leads to slowdowns for most general queries. Such an issue requires a full rewrite of all data files to remedy.
The data engineering team has the following query for processing customers’ requests to be forgotten:
DELETE FROM customers
WHERE customer_id IN
(SELECT customer_id FROM delete_requests)
Which statement describes the results of executing this query?
- The identified records will be deleted from the customers table, and their associated data files will be permanently purged from the table directory.
- The identified records will be deleted from both the customers and delete_requests tables, and their associated data files will be permanently purged from the tables directories.
- The identified records will be deleted from the customers table, but they will still be accessible in the table history until a VACUUM command is run.
- The identified records will be deleted from both customers and delete_requests tables, but they will still be accessible in the table history until VACUUM commands are run.
- The identified records will be deleted from the customers table, but they will still be accessible in the table history until the status of the requests is updated in the delete_requests table.
- The identified records will be deleted from the customers table, but they will still be accessible in the table history until a VACUUM command is run.
Delete requests, also known as requests to be forgotten, require deleting user data that represent Personally Identifiable Information or PII, such as the name and the email of the user.
Because of how Delta Lake time travel is implemented, deleted values are still present in older versions of the data. Remember, deleting data does not delete the data files from the table directory. Instead, it creates a copy of the affected files without these deleted records. So, to fully commit these deletes, you need to run a VACUUM command on the customers table.
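To fully purge the deleted records' files once the retention period has passed, the team would also run something like the following (the retention value shown is the default of seven days):
spark.sql("VACUUM customers RETAIN 168 HOURS")   # removes data files no longer referenced by the current table version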
Given the following commands:
CREATE DATABASE db_hr
LOCATION '/mnt/hr_external';
USE db_hr;
CREATE TABLE employees;
In which of the following locations will the employees table be located?
- dbfs:/user/hive/warehouse
- dbfs:/user/hive/warehouse/db_hr.db
- /mnt/hr_external
- /mnt/hr_external/db_hr.db
- More info needed
- /mnt/hr_external/db_hr.db
Since we are creating the database with the LOCATION keyword, it is created as an external database under the specified location '/mnt/hr_external'. The database folder has the extension (.db).
And since we are creating the table without specifying a location, the table becomes a managed table created under the database directory (in the db_hr.db folder).
The data engineering team has a secret scope named ‘DataOps-Prod’ that contains all secrets needed by DataOps engineers in a production workspace.
Which of the following is the minimum permission required for the DataOps engineers to use the secrets in this scope?
- MANAGE permission on the “DataOps-Prod” scope
- READ permission on the “DataOps-Prod” scope
- MANAGE permission on each secret in the “DataOps-Prod” scope
- READ permission on each secret in the “DataOps-Prod” scope
- Workspace Administrator role
- READ permission on the “DataOps-Prod” scope
The secret access permissions are as follows:
MANAGE - Allowed to change ACLs, and read and write to this secret scope.
WRITE - Allowed to read and write to this secret scope.
READ - Allowed to read this secret scope and list what secrets are available.
Each permission level is a subset of the previous level’s permissions (that is, a principal with WRITE permission for a given scope can perform all actions that require READ permission).
Which of the following is NOT part of the Ganglia UI?
- Memory usage
- Overall workload of the cluster
- CPU usage
- Lifecycle events of the cluster
- Network performance
Lifecycle events of the cluster are not part of Ganglia UI.
Ganglia allows monitoring the performance of Databricks clusters. The Ganglia UI provides you with the overall workload of the cluster, in addition to detailed metrics on memory, CPU, and network usage.
Lifecycle events of the cluster are part of the Cluster Event log.
In the Spark UI, which of the following is NOT one of the metrics in a stage’s details page?
- Duration
- Spill (Memory)
- Spill (Disk)
- DBU Cost
- GC time
- DBU Cost
In Spark UI, the stage’s details page shows summary metrics for completed tasks. This includes:
Duration of tasks.
GC time: is the total JVM garbage collection time.
Shuffle spill (memory): is the size of the deserialized form of the shuffled data in memory.
Shuffle spill (disk): is the size of the serialized form of the data on disk.
and others …
DBU Cost is not part of the Spark UI. DBU stands for Databricks Unit, a unit of processing capability per hour used for pricing purposes. The number of DBUs a virtual machine consumes per hour depends on your cluster configuration, and you pay for each DBU consumed.
A data engineer is using Databricks REST API to send a GET request to the endpoint ‘api/2.1/jobs/runs/get’ to retrieve the run’s metadata of a multi-task job using its run_id.
Which statement correctly describes the response structure of this API call?
- Each task of this job run will have a unique task_id
- Each task of this job run will have a unique run_id
- Each task of this job run will have a unique job_id
- Each task of this job run will have a unique orchestration_id
- Tasks do not have any unique identifier within a job run
- Each task of this job run will have a unique run_id
Each task of this job run will have a unique run_id to retrieve its output with endpoint ‘api/2.1/jobs/runs/get-output’
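A hedged sketch of such a call with the Python requests library (the host, token, and run_id are placeholders; field names are those returned by the 2.1 API for multi-task job runs):
import requests

host = "https://<databricks-instance>"
token = "<personal-access-token>"

resp = requests.get(
    f"{host}/api/2.1/jobs/runs/get",
    headers={"Authorization": f"Bearer {token}"},
    params={"run_id": 1234},
).json()

for task in resp["tasks"]:
    print(task["task_key"], task["run_id"])   # each task run has its own run_id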
A data engineer has noticed the comment ‘# Databricks notebook source’ on the first line of each Databricks Python file’s source code pushed to Github.
Which of the following explains the purpose of this comment?
- This comment makes it easier for humans to understand the source of the generated code from Databricks
- This comment establishes the Python files as Databricks notebooks
- This comment is used for Python auto-generated documentation
- This comment adds the Python file to the search index in the Databricks workspace
- There is no special purpose for this comment
- This comment establishes the Python files as Databricks notebooks
You can convert Python, SQL, Scala, and R scripts to single-cell notebooks by adding a comment to the first cell of the file:
# Databricks notebook source
Which of the following statements best describes DBFS?
- Database File System that organizes and maintains data files in Databricks workspace
- Database File System to interact with files in cloud-based object storage
- Abstraction on top of Databricks Lakehouse that provides an open solution to share data to any computing platform
- Abstraction on top of scalable object storage that maps Unix-like file system calls to native cloud storage API calls
- None of the above
- Abstraction on top of scalable object storage that maps Unix-like file system calls to native cloud storage API calls
The Databricks File System (DBFS) is a distributed file system mounted into a Databricks workspace and available on Databricks clusters. DBFS is an abstraction on top of scalable object storage that maps Unix-like filesystem calls to native cloud storage API calls.
A data engineer wants to install a Python wheel scoped to the current notebook’s session, so only the current notebook and any jobs associated with this notebook have access to that library.
Which of the following commands can the data engineer use to complete this task?
- %fs install my_package.whl
- %pip install my_package.whl
- %python install my_package.whl
- %whl install my_package
- Python wheels can not be installed at the notebook level. They can only be installed at the cluster level
- %pip install my_package.whl
‘%pip install’ allows you to install a Python wheel scoped to the current notebook’s session. This library will be only accessible in the current notebook and any jobs associated with this notebook.
Which of the following statements correctly describes the sys.path Python variable?
- The sys.path variable contains a list of all the parameters passed to a Python notebook
- The sys.path variable contains a list of all the necessary dependencies for a Python notebook
- The sys.path variable contains a list of directories where the Python interpreter searches for modules
- The sys.path variable contains the full pathname of the current working directory of a Python notebook
- The sys.path variable is an alias for os.path
- The sys.path variable contains a list of directories where the Python interpreter searches for modules
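A small illustration (the appended directory is a hypothetical path):
import sys

print(sys.path)                                   # directories the interpreter searches for modules
sys.path.append("/Workspace/Repos/dev/my_repo")   # subsequent imports can now resolve modules from this directory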
Which of the following statements correctly describes assertions in unit testing?
- An assertion is a boolean expression that checks if two code blocks are integrated logically and interacted as a group
- An assertion is a boolean expression that checks if assumptions made in the code remain true
- An assertion is a command that logs failed units of code in production for later debugging and analysis
- An assertion is a command that shows the differences between the current version of a code unit and the most recently edited version
- An assertion is a set of actions that simulates a user experience to ensure that the application can run properly under real-world scenarios
- An assertion is a boolean expression that checks if assumptions made in the code remain true
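A minimal illustration with a plain Python assert (the function and values are made up):
def add_tax(subtotal, tax):
    return subtotal + tax

def test_add_tax():
    # The assertion checks that an assumption about the code still holds;
    # if the expression evaluates to False, the test fails.
    assert add_tax(100, 20) == 120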
When running an existing job via Databricks REST API, which of the following represents the globally unique identifier of the newly triggered run?
- job_id
- run_id
- run_key
- task_id
- task_key
- run_id
Running an existing job via the endpoint ‘/api/2.1/jobs/run-now’ returns the run_id of the triggered run. This represents the globally unique identifier of this newly triggered run.
There are three tasks running in parallel. If there is an error in notebook 2, which is associated with Task 2, which statement describes the run result of this job?
- Task 1 will succeed. Task 2 will partially fail. Task 3 will be skipped.
- Task 1 will succeed. Task 2 will completely fail. Task 3 will be skipped.
- Tasks 1 and 3 will succeed, while Task 2 will partially fail.
- Tasks 1 and 3 will succeed, while Task 2 will completely fail.
- All tasks will completely fail.
- Tasks 1 and 3 will succeed, while Task 2 will partially fail.
If a task fails during a job run, only the dependent tasks, if any, will be skipped. Parallel tasks will run and complete.
The failure of a task will always be partial, which means that the operations in the notebook before the code failure will be successfully run and committed, while the operations after the code failure will be skipped.
The data engineering team has a Delta Lake table created with the following query:
CREATE TABLE customers_clone
AS SELECT * FROM customers
A data engineer wants to drop the table with the following query:
DROP TABLE customers_clone
Which statement describes the result of running this drop command?
- An error will occur as the table is deep cloned from the customers table
- An error will occur as the table is shallow cloned from the customers table
- Only the table’s metadata will be deleted from the catalog, while the data files will be kept in the storage
- Both the table's metadata and the data files will be deleted
- The table will not be dropped until VACUUM command is run
- Both the table's metadata and the data files will be deleted
For production Structured Streaming jobs, which of the following retry policies is recommended?
- Unlimited Retries, with 1 Maximum Concurrent Run
- Unlimited Retries, with Unlimited Concurrent Runs
- No Retries, with 1 Maximum Concurrent Run
- No Retries, with Unlimited Concurrent Runs
- 1 Retry, with 1 Maximum Concurrent Run
- Unlimited Retries, with 1 Maximum Concurrent Run
Retries: Set to Unlimited.
Maximum concurrent runs: Set to 1. There must be only one instance of each query concurrently active.
Cluster: Set this always to use a new job cluster and use the latest Spark version (or at least version 2.1). Queries started in Spark 2.1 and above are recoverable after query and Spark version upgrades.
Notifications: Set this if you want email notification on failures.
Schedule: Do not set a schedule.
Timeout: Do not set a timeout. Streaming queries run indefinitely.
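As an illustration only, a partial sketch of what such settings could look like in a Jobs API 2.1 payload; the job name, notebook path, cluster spec, and email address are placeholders:
job_settings = {
    "name": "streaming-ingest-job",                      # placeholder job name
    "max_concurrent_runs": 1,                            # only one active instance of the query
    "tasks": [{
        "task_key": "ingest",
        "notebook_task": {"notebook_path": "/Repos/prod/streaming_ingest"},   # placeholder path
        "new_cluster": {                                  # always a new job cluster
            "spark_version": "13.3.x-scala2.12",          # assumed recent LTS runtime
            "node_type_id": "i3.xlarge",                  # placeholder node type
            "num_workers": 2
        },
        "max_retries": -1                                 # -1 = retry indefinitely (unlimited retries)
    }],
    "email_notifications": {"on_failure": ["data-eng@example.com"]},   # optional failure alerts
    # No "schedule" and no "timeout_seconds": the streaming query runs indefinitely
}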
A data engineer has an MLflow model logged at a given "model_url". They have registered the model as a Spark UDF using the following code:
predict_udf = mlflow.pyfunc.spark_udf(spark, "model_url")
The data engineer wants to apply this model UDF to a test dataset loaded in the “test_df” DataFrame in order to calculate predictions in a new column “prediction”
Which of the following code blocks allows the data engineer to accomplish this task?
- test_df.apply(predict_udf, *column_list).select("record_id", "prediction")
- test_df.select("record_id", predict_udf(*column_list).alias("prediction"))
- predict_udf("record_id", test_df).select("record_id", "prediction")
- mlflow.pyfunc.map(predict_udf, test_df, "record_id").alias("prediction")
- test_df.select("record_id", predict_udf(*column_list).alias("prediction"))
In a PySpark DataFrame, you can create a new column based on a function's return value; here, calling the model UDF inside select() and aliasing its result produces the new "prediction" column.
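A hedged sketch of the full pattern, assuming a Databricks notebook (where spark is predefined) and hypothetical feature columns feature_1 and feature_2:
import mlflow.pyfunc

predict_udf = mlflow.pyfunc.spark_udf(spark, "model_url")      # load the logged model as a Spark UDF

column_list = ["feature_1", "feature_2"]                       # hypothetical feature columns of test_df
predictions_df = test_df.select(
    "record_id",
    predict_udf(*column_list).alias("prediction")              # new column from the UDF's return value
)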
In Delta Lake tables, which of the following is the file format for the transaction log ?
- Delta
- Parquet
- JSON
- Hive-specific format
- Both Parquet and JSON
- Both Parquet and JSON
Delta Lake builds upon standard data formats: a Delta Lake table is stored as one or more data files in Parquet format, along with a transaction log made of JSON files.
In addition, Databricks automatically creates Parquet checkpoint files every 10 commits to accelerate the resolution of the current table state.
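As an illustrative sketch (assuming a Databricks notebook where dbutils is available, and a hypothetical managed-table path), listing the transaction log directory shows both kinds of files:
log_path = "dbfs:/user/hive/warehouse/my_table/_delta_log"     # placeholder table directory
for f in dbutils.fs.ls(log_path):
    print(f.name)   # e.g. 00000000000000000000.json, 00000000000000000010.checkpoint.parquet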
Which of the following describes the minimal permissions a data engineer needs to modify permissions of an existing cluster ?
- Can Restart
- Can Manage
- Cluster creation allowed + Can Restart
- Cluster creation allowed + Can Manage
- Only administrators can modify permissions
- Can Manage
Which of the following is the default target file size when compacting small files of a Delta table by manually running the OPTIMIZE command ?
- 64 MB
- 128 MB
- 256 MB
- 512 MB
- 1024 MB
- 1024 MB
The OPTIMIZE command compacts small data files into larger ones. The default target file size is 1073741824 bytes, which sets the size to 1 GB.
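A minimal sketch, assuming a hypothetical Delta table named my_delta_table; the first line shows the optional Spark configuration that overrides the 1 GB default target size:
spark.conf.set("spark.databricks.delta.optimize.maxFileSize", 256 * 1024 * 1024)   # optional: lower the target to 256 MB
spark.sql("OPTIMIZE my_delta_table")                                               # compact small files into larger ones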
A junior data engineer is using the following code to de-duplicate raw streaming data and insert it into a target Delta table
spark.readStream
.table("orders_raw")
.dropDuplicates(["order_id", "order_timestamp"])
.writeStream
.option("checkpointLocation", "dbfs:/checkpoints")
.table("orders_unique")
A senior data engineer pointed out that this approach is not sufficient to guarantee distinct records in the target table when there are late-arriving, duplicate records.
Which of the following could explain the senior data engineer’s remark?
- Watermarking is also needed to only track state information for a window of time in which we expected records could be delayed
- A ranking function is also needed to ensure processing only the most recent records
- A window function is also needed to apply deduplication for each non-overlapping interval
- The new records need also to be deduplicated against previously inserted data into the table
- More information is needed
- The new records need also to be deduplicated against previously inserted data into the table
To perform streaming deduplication, we use the dropDuplicates() function to eliminate duplicate records within each new micro-batch. In addition, we need to ensure that records to be inserted are not already in the target table. We can achieve this using an insert-only merge, as sketched below.
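A minimal sketch of that pattern, assuming a Databricks notebook (spark predefined) and using foreachBatch with a Delta Lake merge; the function name is hypothetical, and table/column names follow the question:
from delta.tables import DeltaTable

def insert_if_new(microbatch_df, batch_id):
    # Insert-only merge: rows already present in the target table are ignored
    target = DeltaTable.forName(spark, "orders_unique")
    (target.alias("t")
        .merge(microbatch_df.alias("s"),
               "t.order_id = s.order_id AND t.order_timestamp = s.order_timestamp")
        .whenNotMatchedInsertAll()
        .execute())

(spark.readStream
    .table("orders_raw")
    .dropDuplicates(["order_id", "order_timestamp"])     # de-duplicate within each micro-batch
    .writeStream
    .foreachBatch(insert_if_new)                         # de-duplicate against previously inserted data
    .option("checkpointLocation", "dbfs:/checkpoints")
    .start())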
“A feature built into Delta Lake that allows you to automatically generate CDC feeds about Delta Lake tables”
Which of the following is being described in the above statement?
- Auto Optimize
- Optimized writes
- Spark Watermarking
- Slowly Changing Dimension (SCD)
- Change Data Feed (CDF)
- Change Data Feed
Change Data Feed, or CDF, is a feature built into Delta Lake that allows you to automatically generate CDC feeds about Delta Lake tables.
CDF records row-level changes for all the data written into a Delta table. This includes the row data along with metadata indicating whether the specified row was inserted, deleted, or updated.
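A short sketch of enabling CDF, assuming a hypothetical existing table named customers:
spark.sql("ALTER TABLE customers SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")
# Optional: enable CDF by default for all new tables created in this session
spark.conf.set("spark.databricks.delta.properties.defaults.enableChangeDataFeed", "true")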
A data engineer uses the following SQL query:
GRANT MODIFY ON TABLE employees TO hr_team
Which of the following describes the ability given by the MODIFY privilege ?
- It gives the ability to add data to the table
- It gives the ability to delete data from the table
- It gives the ability to modify data in the table
- All the above abilities are given by the MODIFY privilege
- None of these options correctly describe the ability given by the MODIFY privilege
- All the above abilities are given by the MODIFY privilege
(The MODIFY privilege gives the ability to add, delete, and modify data in the table, i.e. to run INSERT, UPDATE, and DELETE operations on it.)
Which of the following statements regarding the retention policy of Delta Lake CDF is correct?
- Running the VACUUM command on the table deletes CDF data as well
- Running the VACUUM command on the table does not delete CDF data
- Running the VACUUM command on the table does not delete CDF data unless CASCADE clause is set to true
- CDF data files can be purged by running VACUUM CHANGES command
- CDF data files can never be permanently purged from Delta Lake
- Running the VACUUM command on the table deletes CDF data as well
Databricks records change data for UPDATE, DELETE, and MERGE operations in the _change_data folder under the table directory.
The files in the _change_data folder follow the retention policy of the table. Therefore, if you run the VACUUM command, change data feed data is also deleted.
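For illustration, a hedged sketch on a hypothetical customers table; files older than the retention threshold, including files under _change_data, are removed:
spark.sql("VACUUM customers RETAIN 168 HOURS")   # 168 hours = the default 7-day retention threshold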
Given the following Structured Streaming query:
(spark.readStream
.table("orders")
.writeStream
.option("checkpointLocation", checkpointPath)
.table("Output_Table")
)
Which of the following is the trigger interval for this query?
- Every half second
- Every half min
- Every half hour
- The query will run in batch mode to process all available data, then stop
- More information is needed
- Every half second
By default, if you don't provide any trigger interval, the data will be processed every half second. This is equivalent to trigger(processingTime="500ms").
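For comparison, a hedged sketch of the same query with an explicit trigger; the 30-second interval is just an example value:
(spark.readStream
    .table("orders")
    .writeStream
    .trigger(processingTime="30 seconds")      # micro-batch every 30 seconds
    # .trigger(availableNow=True)              # alternative: process all available data, then stop
    .option("checkpointLocation", checkpointPath)
    .table("Output_Table")
)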
A data engineer runs the following CTAS statement in a SQL notebook attached to an All-purpose cluster:
CREATE TABLE course_students
AS ( SELECT c.course_name, t.student_id, t.student_name
FROM courses c
LEFT JOIN (
SELECT s.student_id, s.student_name, e.course_id
FROM students s
INNER JOIN enrollments e
ON s.student_id = e.student_id
) t
ON c.course_id = t.course_id
WHERE c.active = true
)
Which statement describes the resulting course_students table ?
- It’s a virtual table that has no physical data. The SELECT statement will be executed each time the course_students table is queried.
- It’s a cluster-scoped virtual table. The SELECT statement will be executed only the first time the course_students table is queried. The query output will be stored in the memory of the currently active cluster.
- It's a Delta Lake table. The SELECT statement will be executed at the table creation, and its output will be persisted to the storage as the table's data files.
- It’s a cluster-scoped table. The SELECT statement will be executed at the table creation, but its output will be stored in the memory of the currently active cluster.
- It’s a session-scoped table. The SELECT statement will be executed at the table creation, but its output will be stored in the cache of the current active Spark session.
- It's a Delta Lake table. The SELECT statement will be executed at the table creation, and its output will be persisted to the storage as the table's data files.
(CTAS statements create Delta Lake tables and populate them with the results of the SELECT query at creation time. The output is written to data files in storage, and the new table keeps no dependency on the source tables.)
A data engineer has a streaming job that updates a Delta table named ‘user_activities’ with the results of a join between a streaming Delta table ‘activity_logs’ and a static Delta table ‘users’.
They noticed that adding new users into the ‘users’ table does not automatically trigger updates to the ‘user_activities’ table, even when there are activities for those users in the ‘activity_logs’ table.
Which of the following likely explains this issue?
- The users table must be refreshed with REFRESH TABLE command for each microbatch of this join
- This stream-static join is not stateful by default unless they set the spark configuration delta.statefulStreamStaticJoin to true
- The streaming portion of this stream-static join drives the join process. Only new data appearing on the streaming side of the join will trigger the processing.
- The static portion of the stream-static join drives this join process only in batch mode.
- In Delta Lake, static tables can not be joined with streaming tables.
- The streaming portion of this stream-static join drives the join process. Only new data appearing on the streaming side of the join will trigger the processing.
In a stream-static join, the streaming portion of the join drives the join process: only new data appearing on the streaming side will trigger the processing. Adding new records into the static table will not automatically trigger updates to the results of the stream-static join.
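A minimal sketch of such a stream-static join, assuming user_id as the join key and a placeholder checkpoint path:
activity_stream = spark.readStream.table("activity_logs")      # streaming side: drives the join
users_static = spark.read.table("users")                       # static side: latest version is used per micro-batch

(activity_stream
    .join(users_static, on="user_id", how="inner")             # "user_id" is an assumed join key
    .writeStream
    .option("checkpointLocation", "dbfs:/checkpoints/user_activities")   # placeholder path
    .table("user_activities")
)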
Given the following query on the Delta table ‘customers’ on which Change Data Feed is enabled:
spark.read
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.table("customers")
.filter(col("_change_type").isin(["update_postimage"]))
.write
.mode("append")
.table("customers_updates")
Which statement describes the result of this query each time it is executed?
- Newly updated records will be merged into the target table, modifying previous entries with the same primary keys
- Newly updated records will be appended to the target table
- Newly updated records will overwrite the target table
- The entire history of updated records will be appended to the target table at each execution, which leads to duplicate entries
- The entire history of updated records will overwrite the target table at each execution
- The entire history of updated records will be appended to the target table at each execution, which leads to duplicate entries
Reading the table's changes captured by CDF using spark.read means that you are reading them as a static source. So, each time you run the query, all of the table's changes (starting from the specified startingVersion) will be read.
The query in the question then appends the data to the target table at each execution since it’s using the ‘append’ writing mode.
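For contrast, a hedged sketch of the incremental (streaming) counterpart, which tracks its progress in a checkpoint and therefore appends each change only once; the checkpoint path is a placeholder:
from pyspark.sql.functions import col

(spark.readStream
    .option("readChangeFeed", "true")
    .option("startingVersion", 0)
    .table("customers")
    .filter(col("_change_type") == "update_postimage")
    .writeStream
    .option("checkpointLocation", "dbfs:/checkpoints/customers_updates")   # placeholder path
    .table("customers_updates")
)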
A data engineer wants to use Autoloader to ingest input data into a target table, and automatically evolve the schema of the table when new fields are detected.
They use the below query with a blank:
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpointPath)
.load(source_path)
.writeStream
.option("checkpointLocation", checkpointPath)
.___________
.start("target_table")
Which option correctly fills in the blank to meet the specified requirement ?
- option("cloudFiles.schemaEvolutionMode", "addNewColumns")
- option("cloudFiles.mergeSchema", True)
- option("mergeSchema", True)
- schema(schema_definition, mergeSchema=True)
- Autoloader can not automatically evolve the schema of the table when new fields are detected
- option("mergeSchema", True)
Schema evolution is a feature that allows adding newly detected fields to the table. It's activated by adding .option('mergeSchema', 'true') to your .write or .writeStream Spark command.
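Putting it together, a sketch of the question's query with the blank filled in (paths and names as given in the question):
(spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", checkpointPath)
    .load(source_path)
    .writeStream
    .option("checkpointLocation", checkpointPath)
    .option("mergeSchema", "true")
    .start("target_table")
)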
Given the following query:
spark.table("stream_sink")
.filter("recent = true")
.dropDuplicates(["item_id", "item_timestamp"])
.write
.mode("overwrite")
.table("stream_data_stage")
Which statement describes the result of executing this query ?
- An incremental job will overwrite the stream_sink table with those deduplicated records from stream_data_stage that have been added since the last time the job was run
- An incremental job will overwrite the stream_data_stage table with those deduplicated records from stream_sink that have been added since the last time the job was run
- A batch job will overwrite the stream_data_stage table with deduplicated records calculated from all "recent" items in the stream_sink table
- A batch job will overwrite the stream_data_stage table with those deduplicated records from stream_sink that have been added since the last time the job was run
- A batch job will overwrite the stream_data_stage table with deduplicated records calculated from all "recent" items in the stream_sink table
Reading a Delta table using spark.table() function means that you are reading it as a static source. So, each time you run the query, all records in the current version of the ‘stream_sink’ table will be read, filtered and deduplicated.
There is no difference between the spark.table() and spark.read.table() functions; in fact, spark.read.table() internally calls spark.table().
The query in the question then writes the data in mode “overwrite” to the ‘stream_data_stage’ table, which completely overwrites the table at each execution.