Spark Flashcards
Tell me about Yourself
Hello everyone, I am Sonali Sahu. I am currently working as a Data Engineer I at TCS, with 3 years of overall experience, and I am leading a team of 4 members in my project. The tech stack I have worked on is Python, SQL, PySpark, and Hive, with Azure Synapse Analytics for analytics, Azure Data Factory for ETL orchestration, and Azure Blob Storage / Data Lake for storage.
How can we print bad records of a csv?
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("CSVReadWithBadRecords").getOrCreate()
# Set the path for bad records
bad_records_path = "hdfs://your_hdfs_path/bad_records"
# Read the CSV file with the badRecordsPath option
df = spark.read.option("badRecordsPath", bad_records_path).csv("your_csv_file.csv")
How are head() and take() different in Spark?
head() is available only on DataFrames, while take() works on both RDDs and DataFrames.
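A minimal sketch of the difference, assuming a SparkSession named spark already exists (the column names are illustrative):
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "val"])
rdd = df.rdd
print(df.head(1))    # head() works on DataFrames and returns a list of Rows
print(df.take(1))    # take() also works on DataFrames
print(rdd.take(1))   # ...and on RDDs; rdd.head() does not exist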
How is data defined in spark?
data = [(1, 'Sample Name'), (2, 'Sample Name2')]
which means a list of tuples, where the length of each tuple equals the number of columns in the schema.
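For example, a DataFrame can be built from such a list of tuples plus a matching column list (the column names below are illustrative):
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("SampleData").getOrCreate()
data = [(1, 'Sample Name'), (2, 'Sample Name2')]   # each tuple is one row
columns = ["id", "name"]                           # one entry per tuple element
df = spark.createDataFrame(data, schema=columns)
df.show()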
In which scenario can we get Driver OOM?
- collect(), which pulls the data from all partitions onto the driver node.
- broadcast join, where a relatively small table is broadcast to all partitions; if that table is larger than the memory available on the driver (which must collect it first), the driver runs out of memory.
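A rough illustration of the two risky patterns, assuming DataFrames named df, big_df, and small_df exist (the limit value is just a placeholder):
# collect() pulls every partition onto the driver; prefer a bounded alternative on big data
rows = df.limit(100).collect()
# rows = df.collect()              # can OOM the driver on a large DataFrame
from pyspark.sql.functions import broadcast
# broadcast() first collects small_df on the driver, then ships it to every executor;
# if small_df is larger than the driver's free memory, the driver can OOM
joined = big_df.join(broadcast(small_df), "id")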
In which scenario can we get executor OOM?
- Big Partition (Data Skewness)
- Yarn Memory Overhead
- High Concurrency
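These can be mitigated largely through configuration; a hedged sketch using real Spark properties (the values are placeholders, not recommendations):
from pyspark.sql import SparkSession
spark = (SparkSession.builder
         .appName("ExecutorOOMTuning")
         .config("spark.executor.memory", "8g")                  # bigger executor heap
         .config("spark.executor.memoryOverhead", "2g")          # extra off-heap room for YARN overhead
         .config("spark.sql.shuffle.partitions", "400")          # more, smaller shuffle partitions for high concurrency
         .config("spark.sql.adaptive.skewJoin.enabled", "true")  # let AQE split skewed partitions
         .getOrCreate())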
Most Important Questions
What is AQE in spark?
Spark 1.x - Introduced Catalyst Optimizer
Spark 2.x - Introduced Cost-Based Optimizer
Spark 3.0 - Introduced Adaptive Query Execution
conf command:
spark.conf.set("spark.sql.adaptive.enabled", True)
The Spark optimizer selects the most efficient physical plan.
The cost-based optimizer relies on statistics that were collected earlier; if the data has changed significantly since they were last collected, there can be a large gap between the statistics and reality, and in that scenario the cost-based model may not perform well.
With AQE, suppose there are 10 stages in the application: once the 1st stage completes and its result is returned to the cluster, AQE re-optimizes the physical plan for the remaining stages, so it works with accurate runtime statistics.
Features of AQE:
1. Dynamically coalescing shuffle partitions
2. Dynamically Switching join strategies
3. Dynamically optimizing skew joins.
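These behaviours can be toggled individually; a short sketch of the relevant Spark 3.x settings (defaults may vary by version):
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")  # 1. coalesce shuffle partitions
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")            # 3. optimize skew joins
# 2. join-strategy switching happens automatically once AQE is enabled,
#    using the runtime statistics gathered after each stage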
What are the Join Strategies in Spark?
Apache Spark provides several join strategies to perform efficient join operations on DataFrames or Datasets. The choice of join strategy depends on the characteristics of the data, the size of the tables, and the available system resources. Here are some common join strategies in Spark:
Shuffle Hash Join:
- Shuffle hash join is used when one or both of the DataFrames being joined are large enough to require shuffling the data, and one side is small enough to be hashed within each partition (note that sort-merge join, not shuffle hash join, is Spark's default strategy for large equi-joins).
- It redistributes and partitions the data based on the join keys and then performs a local hash join on each partition.
Broadcast Join:
- Broadcast join is used when one of the DataFrames is small enough to fit in memory on each worker node. This smaller DataFrame is broadcast to all worker nodes, and the join is performed locally.
- Broadcast joins are much more efficient for smaller tables and can significantly reduce the amount of shuffling.
Sort-Merge Join:
- Sort-merge join shuffles both DataFrames on the join keys, sorts each partition, and then merges the sorted sides; it is Spark's default strategy for joining two large DataFrames on equality conditions.
- It is particularly efficient when the data is already sorted or bucketed on the join keys, since the expensive sort step can be skipped.
Bucketed Join:
- Bucketed join is a join strategy for tables that have been pre-partitioned into buckets on the join keys, so rows with the same join key always land in the corresponding bucket.
- This strategy can significantly reduce data shuffling, as it leverages the bucketing information to co-locate matching buckets on the same worker node.
Cartesian Join:
- Cartesian join is the most resource-intensive join strategy. It combines every row from the first DataFrame with every row from the second DataFrame, resulting in a Cartesian product.
- This strategy should be used with caution, as it can lead to a large number of output records.
Broadcast Hash Join:
- Broadcast hash join is the physical form of a broadcast join: the smaller DataFrame is collected, built into a hash table, and broadcast to all worker nodes, where each partition of the larger DataFrame probes it locally.
Map-side Join:
- Map-side join is a term from the Hadoop MapReduce world for a join performed entirely on the map side, without a shuffle; in Spark, a broadcast join plays the equivalent role.
The choice of join strategy should be based on the size of the DataFrames, the available system resources, and the specific characteristics of the data. Efficient use of join strategies can significantly impact the performance of your Spark applications, so it’s essential to select the right strategy for your use case. Additionally, Spark’s query optimizer may also make decisions on the join strategy based on cost and data statistics.
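For example, Spark can be nudged toward a broadcast join with the broadcast() hint; a minimal sketch assuming DataFrames named large_df and small_df:
from pyspark.sql.functions import broadcast
# small_df is shipped to every executor, avoiding a shuffle of large_df
joined = large_df.join(broadcast(small_df), on="customer_id", how="inner")
# The auto-broadcast threshold (10 MB by default) can also be tuned:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)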
What is checkpointing in Spark?
Checkpointing saves an intermediate result of a computation to reliable storage so that the application can recover from that point if it fails, instead of recomputing the full lineage.
Checkpoint breaks the lineage, whereas cache/persist do not.
Checkpoint writes the data to storage, while cache/persist keep it in memory/disk.
Checkpointed data is still available after the application session is closed, while cached/persisted data is not.
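A minimal sketch, assuming an existing SparkSession and an HDFS-accessible checkpoint directory:
# Checkpointed data goes to reliable storage and the lineage is truncated
spark.sparkContext.setCheckpointDir("hdfs://your_hdfs_path/checkpoints")
df_checkpointed = df.checkpoint()   # eager by default; breaks the lineage
# cache/persist only keep the data in memory/disk for the current application
df_cached = df.cache()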
What is Serializer in Spark?
Serialization refers to converting objects into a stream of bytes (and back) in an optimal way, so they can be transferred between nodes over the network or stored in a file/memory buffer.
2 types:
Java Serialization (Default)
Kryo Serialization (Recommended by Spark)
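Kryo is enabled through the serializer config; a short sketch (the buffer value is a placeholder):
from pyspark.sql import SparkSession
spark = (SparkSession.builder
         .appName("KryoExample")
         .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
         .config("spark.kryoserializer.buffer.max", "512m")   # optional buffer tuning
         .getOrCreate())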
Spark writer example using partitionBy
df.write.option("header", True) \
    .partitionBy("State", "City") \
    .mode("overwrite") \
    .saveAsTable("partitioned_table")
Spark writer example using bucketBy
df.write.option("header", True) \
    .bucketBy(4, "State", "City") \
    .mode("overwrite") \
    .saveAsTable("bucketed_table")
What is spark submit command ?
spark-submit \
--class your.main.class \
--master <master-url> \
--deploy-mode <deploy-mode> \
--executor-memory <executor-memory> \
--total-executor-cores <total-cores> \
--conf spark.executor.extraJavaOptions=<extra-java-options> \
--conf spark.driver.memory=<driver-memory> \
--conf spark.driver.cores=<driver-cores> \
--conf spark.sql.shuffle.partitions=<shuffle-partitions> \
--conf spark.default.parallelism=<default-parallelism> \
--conf spark.eventLog.enabled=true \
--conf spark.eventLog.compress=true \
--conf spark.eventLog.dir=hdfs://<hdfs-path>/event-log \
your-application.jar
Most Frequently Asked
How do you optimize spark jobs?
- Broadcast Join
- Caching
- Shuffle Partition
- Filter before shuffling
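A short sketch tying these together, assuming DataFrames named orders_df and customers_df (dates, keys, and values are placeholders):
from pyspark.sql.functions import broadcast, col
# Filter before shuffling so less data moves across the network
orders = orders_df.filter(col("order_date") >= "2023-01-01")
# Broadcast the small dimension table instead of shuffling it
enriched = orders.join(broadcast(customers_df), "customer_id")
# Cache a DataFrame that is reused by several downstream actions
enriched.cache()
# Right-size the number of shuffle partitions for the data volume
spark.conf.set("spark.sql.shuffle.partitions", "200")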
What is sharding?
Sharding is a database partitioning technique used in data engineering and database management to horizontally partition a database into smaller, more manageable pieces called “shards.” Each shard is a self-contained database that stores a subset of the data. Sharding is typically used to address scalability and performance challenges associated with very large databases.
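A toy illustration of hash-based shard routing in plain Python (the shard count and key are arbitrary):
import hashlib
def shard_for_key(key: str, num_shards: int = 4) -> int:
    # Stable hash of the shard key, mapped onto one of num_shards databases
    digest = int(hashlib.md5(key.encode()).hexdigest(), 16)
    return digest % num_shards
print(shard_for_key("customer_123"))   # the same key always routes to the same shard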
Most challenging scenario in your Project?
The most challenging scenario in a Data Engineering project with respect to data size is often dealing with extremely large volumes of data, especially when it exceeds the capacity of a single machine or storage system. This requires implementing distributed data processing and storage solutions, such as data partitioning, sharding, and optimizing data transfer and transformation pipelines to handle Big Data efficiently.
What file formats have you worked with?
CSV, Parquet