Delta Lake Flashcards
What is Delta Lake?
- Open storage format
- Optimised storage layer that provides the foundation for storing data and tables in the Databricks Lakehouse Platform
- Open source software that extends Parquet data files with:
- File-based transaction log for ACID transactions
- Scalable metadata handling
- Default storage format for all operations on Databricks
- All tables on Databricks are Delta tables unless otherwise specified
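A minimal sketch (table name demo_default is hypothetical) showing that a plain CREATE TABLE on Databricks produces a Delta table by default:
CREATE TABLE demo_default (id INT, name STRING);
-- the "format" column of the output reports delta
DESCRIBE DETAIL demo_default;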
How does Delta Lake implement ACID transactions?
Atomicity
- Each transaction is treated as a single unit
- Either the entire statement executes or none of it does
- Prevents data loss and corruption
Consistency
- Ensures transactions only make changes to tables in predefined, predictable ways
- Ensures that corruption or errors in your data do not create unintended consequences
Isolation
- Ensures that concurrent transactions don’t interfere with or affect one another
Durability
- Ensures that changes to your data made by successfully executed transactions will be saved
Why strive to achieve ACID transactions?
- Ensure the highest possible data reliability and integrity
- Ensure data never falls into an inconsistent state
Features and benefits of Delta Lake (7)
- Schema Enforcement
- ACID Transactions
- Schema Evolution
- Time Travel
- Unified Batch and Streaming
- Scalable Metadata Handling
- Optimistic concurrency control
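A small sketch of the first and third features in SQL (table and column names are hypothetical): schema enforcement rejects writes that do not match the declared schema, while schema evolution lets us change the schema explicitly:
-- hypothetical Delta table used for illustration
CREATE OR REPLACE TABLE demo_events (id INT, name STRING);
-- Schema enforcement: uncommenting this write raises an error (schema mismatch)
-- INSERT INTO demo_events VALUES (1, 'click', 'extra_value');
-- Schema evolution: evolve the schema explicitly, then the new column can be written
ALTER TABLE demo_events ADD COLUMNS (country STRING);
INSERT INTO demo_events VALUES (1, 'click', 'AU');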
External vs Managed tables
Managed Tables
- Data is stored internally within Databricks-managed storage (DBFS)
- Databricks controls both the metadata and the data. If you drop the table, both the table metadata and the actual data are removed
External Tables
- Data is stored outside Databricks (e.g. cloud object storage)
- The table metadata is still stored within the Databricks metastore
- When dropping a table, only the metadata in the Databricks metastore is removed; the data remains in the external location
When to use Managed Tables
- Simpler management: Databricks fully manages both your data and metadata
- It is the default option if a location is not specified when creating a table
- Data is managed in a unified way, especially for Delta format
When to use External Tables
- When you want to store your data outside Databricks and still use it within Databricks
- Useful for datasets that need to be accessed from multiple systems or environments
- Better for data sharing (through cloud storage access)
- Store data independently of the Databricks cluster (persistence)
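A minimal sketch of creating an external table (table name and storage path are placeholders; adapt the LOCATION to your cloud storage):
CREATE TABLE sales_external (id INT, amount DOUBLE)
LOCATION 's3://my-bucket/path/to/sales_external';
-- DROP TABLE sales_external removes only the metastore entry; the files at LOCATION remain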
How to create a managed table (sql)
We create a managed table by not specifying a location. We can specify the schema (db) to use
USE ${da.schema_name}_default_location;
CREATE OR REPLACE TABLE managed_table (width INT, length INT, height INT);
INSERT INTO managed_table
VALUES (3, 2, 1);
SELECT * FROM managed_table;
How do we find the location of a managed table? (sql & python)
By default, managed tables in a schema created without a specified location are stored in the dbfs:/user/hive/warehouse/<schema_name>.db/ directory.
%sql
DESCRIBE EXTENDED managed_table;
%python
tbl_location = spark.sql("DESCRIBE DETAIL managed_table").first().location
print(tbl_location)
files = dbutils.fs.ls(tbl_location)
display(files)
What are CTAS statements and why do we use them?
CREATE TABLE AS SELECT statements allow us to create and populate Delta tables using data retrieved from an input query.
- They automatically infer schema information from query results and do NOT support manual schema declaration
- Useful for external data ingestion from sources with well-defined schemas, such as Parquet files and tables
- Do not support specifying additional file options
- Not ideal for formats such as CSV that need file options or a manual schema (create a TEMP VIEW with an explicit schema and options first, then use CTAS against the view, as shown below)
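A hedged sketch of the CSV workaround mentioned above (view name, columns, path and options are all placeholders): register a temp view with an explicit schema and file options, then run CTAS against the view:
CREATE OR REPLACE TEMP VIEW sales_tmp_vw
(order_id BIGINT, transaction_timestamp STRING, price DOUBLE)
USING CSV
OPTIONS (
path = '${variable}/path/to/csv',
header = 'true',
delimiter = '|'
);
CREATE TABLE sales_delta AS
SELECT * FROM sales_tmp_vw;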
CTAS syntax (sql)
CREATE TABLE sales AS
SELECT * FROM parquet.`${variable}/path`;
What are the two types of constraints that can be enforced at table creation, and how are they added (sql)?
Constraints enforce the quality and integrity of data added to a table through automatic verification. When a constraint is violated, the transaction will fail with error.
There are two types:
- NOT NULL (column values cannot be null), e.g.:
ALTER TABLE people10m ALTER COLUMN middleName SET NOT NULL;
- CHECK (a boolean expression that must evaluate to true for every row), e.g.:
ALTER TABLE people10m ADD CONSTRAINT dateWithinRange CHECK (birthDate > '1900-01-01');
Existing constraints can be viewed with DESCRIBE EXTENDED table;
What are two optional metadata fields we can add to the table at creation and how are they added?
- current_timestamp(): records the timestamp when the logic is executed
- input_file_name(): records the source data file for each record in the table
E.g.
CREATE OR REPLACE TABLE users_pii
COMMENT "Contains PII"
LOCATION "${da.paths.working_dir}/tmp/users_pii"
PARTITIONED BY (first_touch_date)
AS
SELECT *,
cast(cast(user_first_touch_timestamp/1e6 AS TIMESTAMP) AS DATE) first_touch_date,
current_timestamp() updated,
input_file_name() source_file
FROM parquet.`${da.paths.datasets}/ecommerce/raw/users-historical`;
Difference between DEEP CLONE and SHALLOW CLONE
DEEP CLONE fully copies data and metadata from a source table to a target. Occurs incrementally, so executing this command again can sync changes from source to target
SHALLOW CLONE just copies the Delta transaction logs, so the data doesn’t actually move. Good option to quickly create a copy of a table without risking modifying the current table.
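A minimal sketch (source and target table names are hypothetical):
-- Full, incremental copy of data and metadata
CREATE OR REPLACE TABLE sales_deep_clone
DEEP CLONE sales;
-- Copies only the transaction log; data files are not duplicated
CREATE OR REPLACE TABLE sales_shallow_clone
SHALLOW CLONE sales;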
Benefits of overwriting a table as opposed to deleting and recreating a table and what are the two ways of overwriting a table with Spark SQL?
- Overwriting is much faster because it doesn’t need to list the directory recursively or delete any files
- The old version can still be accessed via time travel
- It’s an atomic operation. Concurrent queries can still read the table while you are overwriting
- Due to ACID guarantees, if overwriting fails, the table will be in its previous state
Can be accomplished with CREATE OR REPLACE TABLE (CRAS) or INSERT OVERWRITE
CRAS vs INSERT OVERWRITE syntax (sql)
CREATE OR REPLACE TABLE table_name
AS SELECT * FROM file_path
INSERT OVERWRITE table_name
SELECT * FROM file_path
CRAS vs INSERT OVERWRITE differences
Both achieve a near identical outcome, however:
INSERT OVERWRITE:
- can only overwrite an existing table, not create a new one like CRAS
- can overwrite with new records that match the current table schema (“safer” overwriting technique)
- can overwrite individual partitions
CRAS allows us to completely redefine the schema (and contents) of the target table, but INSERT OVERWRITE will fail if we try to do so.
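A hedged sketch of that last point (table name, path, and the extra column are hypothetical; each statement is shown independently against the original table): CRAS redefines the target's schema from the query, while the equivalent INSERT OVERWRITE fails on the schema mismatch:
-- CRAS: succeeds, and the table schema is redefined to include the new column
CREATE OR REPLACE TABLE sales AS
SELECT *, current_timestamp() AS updated FROM parquet.`${variable}/path`;
-- INSERT OVERWRITE: fails, because the extra column does not match the existing schema
INSERT OVERWRITE sales
SELECT *, current_timestamp() AS updated FROM parquet.`${variable}/path`;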
How and why to use INSERT INTO
INSERT INTO is for appending rows:
- Append new rows to an existing Delta table
- Allows incremental updates to existing tables
- More efficient than overwriting each time
INSERT INTO table_name
SELECT * FROM file_path
How and why to use MERGE INTO
- Upsert data from a source table, view or DataFrame into a target Delta table using MERGE
- Supports inserts, updates and deletes that are completed as single transaction
- Multiple conditions can be added
- Extensive options for implementing custom logic
MERGE INTO target a
USING source b
ON {merge_condition}
WHEN MATCHED THEN {matched_action}
WHEN NOT MATCHED THEN {not_matched_action}
How and why to use an insert-only MERGE
- Avoid inserting duplicate records
MERGE INTO events a
USING events_update b
ON a.user_id = b.user_id AND a.event_timestamp = b.event_timestamp
WHEN NOT MATCHED AND b.traffic_source = 'email' THEN
INSERT *
How to load data incrementally into a Delta Table
Use COPY INTO as an idempotent way to incrementally ingest data from external systems
- Data schema should be consistent
- Duplicate records should be excluded or handled downstream
- Potentially much cheaper than full table scans for data that grows predictably
- Valuable when new files continue to appear in a directory
COPY INTO sales
FROM '${variables}/path'
FILEFORMAT = PARQUET
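COPY INTO also accepts file-format and copy options, which is how formats such as CSV are typically ingested; a hedged sketch (path and option values are placeholders):
COPY INTO sales
FROM '${variables}/path'
FILEFORMAT = CSV
FORMAT_OPTIONS ('header' = 'true', 'inferSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');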
How can we explore the files backing a Delta Table and what would we expect to see?
%python
display(dbutils.fs.ls(f"{table_path}"))  # table_path: path to the Delta table's directory
- We likely see a number of Parquet files (the data itself)
- There will also be a directory called _delta_log which records the transactions to the Delta Lake table
- Each transaction against the table will result in a json file being written to the log
- We can also use DESCRIBE DETAIL to see details about a Delta table, including the number of files
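A minimal example of the DESCRIBE DETAIL approach mentioned in the last point (table name hypothetical):
%sql
-- output includes numFiles, sizeInBytes, location and format
DESCRIBE DETAIL my_table;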
How to Optimise and Index a Delta Table
- Sometimes a Delta table can be backed by many small files, for example when many small transactions each inserted only a handful of records
- The OPTIMIZE command compacts these small files toward an optimal size, combining records and rewriting the results into fewer, larger data files
- We can also specify ZORDER indexing, which speeds up data retrieval when filtering on provided fields by colocating data with similar values within data files.
OPTIMIZE table_name
ZORDER BY column_name
Explain Time Travel and ways to do so (sql & python)
Previous versions of delta tables can be queried by specifying integer version or timestamp:
%python
df = spark.read \
.format("delta") \
.option("timestampAsOf", "2019-01-01") \
.load("/path/to/my/table")
%sql
SELECT count(*) FROM my_table
TIMESTAMP AS OF '2019-01-01'
%python
df = spark.read \
.format("delta") \
.option("versionAsOf", "5238") \
.load("/path/to/my/table")
%sql
SELECT count(*) FROM my_table
VERSION AS OF 5238
How can you restore a table to a previous version/timestamp (sql)?
RESTORE TABLE employee TO TIMESTAMP AS OF '2022-08-02 00:00:00';
RESTORE TABLE employee TO VERSION AS OF 1;
How can we clean up unused data files behind a delta table?
- Databricks automatically cleans up stale log files after 30 days by default
- Retaining lots of unused data files is expensive
- VACUUM allows us to purge old data files:
VACUUM table_name [RETAIN num HOURS] [DRY RUN]
To retain fewer than 7 days (168 hours) of history, you must disable the retention duration check:
SET spark.databricks.delta.retentionDurationCheck.enabled = false
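A hedged usage sketch (table name hypothetical): preview what would be deleted, then purge files outside the default 7-day retention window:
-- lists the files that would be removed, without deleting anything
VACUUM my_table DRY RUN;
-- removes unreferenced data files older than 7 days (168 hours)
VACUUM my_table RETAIN 168 HOURS;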