SD General Flashcards
What are the common use cases where event ordering is really important?
Payments transaction.
Video conferencing.
Vehicle control systems.
Online gaming.
Sensor metrics publishing and change data capture.
Why don't we want to store the database on the application machines?
Bottleneck; independent scaling.
Typically application servers become a bottleneck to the system before the actual data access does.
What is a write-ahead log and why do we need it?
Recovery from crash - data durability. RAM loses data after a crash.
This is an append-only file to which every DB modification must be written before it can be committed to the DB. When the database comes back up after a crash, this log is used to restore the DB back to a consistent state.
The write-ahead log lives entirely on disk, so it's durable; we can simply replay the records in the WAL to repopulate the data.
Atomicity - revert partial writes on error.
Durability and consistency - replay the logs and rebuild the state.
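A minimal sketch of the WAL discipline described above (hypothetical log format, not any specific database): append and fsync each change before applying it, and replay the log on startup.

import json, os

class TinyWAL:
    # Append-only write-ahead log in front of an in-memory dict; illustrative only.
    def __init__(self, path):
        self.path = path
        self.state = {}
        # Crash recovery: replay every record to rebuild the in-memory state.
        if os.path.exists(path):
            with open(path) as f:
                for line in f:
                    rec = json.loads(line)
                    self.state[rec["key"]] = rec["value"]
        self.log = open(path, "a")

    def set(self, key, value):
        # 1) Durably log the intent first...
        self.log.write(json.dumps({"key": key, "value": value}) + "\n")
        self.log.flush()
        os.fsync(self.log.fileno())
        # 2) ...only then apply the change to the state.
        self.state[key] = value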
———————
In database systems that use Write-Ahead Logging (WAL) and implement row-level locking, the behavior regarding when writes are recorded in the WAL relative to locking can vary depending on the specific database’s implementation and configuration. However, there are common patterns and practices that can be generalized to understand how most systems likely behave.
Acquiring the Lock:
- Before any write operation (update, delete, or insert) can be applied to a row in a database, the transaction that intends to make the change must first acquire the appropriate lock on that row. This lock ensures that no other concurrent transactions can modify the same row until the lock is released.
Writing to the WAL:
- Once the lock is successfully acquired and the row is effectively under the control of the current transaction, the changes are first recorded in the WAL. This step is crucial for ensuring data durability and atomicity; the database logs the intention to make changes before any actual modifications are applied to the data files.
- The WAL entry typically includes information necessary to redo the operation (in case of a crash after the WAL record is written but before the changes are committed) and to undo the operation (in case the transaction needs to be rolled back).
Applying Changes to the Data:
- After the WAL is updated, and the changes are securely logged, the transaction then applies these changes to the actual data stored in the database. This step is performed while the row lock is still held, preventing other transactions from accessing the row until the current transaction is complete.
Committing the Transaction:
- The transaction can be committed only after the WAL records are safely written to disk. Once the commit is successful, the locks can be released, allowing other transactions to access or modify the row.
- Order of Operations: The crucial aspect is that the WAL is written to after acquiring the necessary locks but before the changes are committed and the locks are released. This ensures that the database can always recover to a consistent state, as the WAL contains all the information required to redo or undo transactions up to the last known good state.
- Handling Deadlocks and Delays: If a lock cannot be acquired because another transaction holds a conflicting lock, the transaction will typically wait until the lock is available. During this waiting period, nothing is yet written to the WAL. If a deadlock is detected (where two or more transactions are waiting on each other indefinitely), one of the transactions will be chosen as a victim and rolled back to resolve the deadlock.
- Performance Implications: The need to acquire locks before writing to the WAL can impact performance, particularly in high-contention environments where many transactions are competing for the same rows. Database systems often implement sophisticated concurrency control mechanisms to minimize locking delays and optimize transaction throughput.
In summary, in systems using WAL and row-level locking, the write to the WAL occurs after the lock on the row is successfully acquired but before the transaction is committed. This sequence ensures that all changes logged in the WAL are guaranteed to be applied atomically and durably, maintaining the integrity and consistency of the database even in the face of system failures or crashes.
What is the read and write time complexity of hash indexes? Why are hash indexes only stored in memory? Why are range queries expensive for hash indexes?
Redis for key-value retrieval.
Riak for key-document retrieval.
PostgreSQL, MySQL for exact match queries.
Cassandra uses it to map data to nodes using the hash value of partition keys.
Basically hash map.
O(1) write and read, because RAM offers O(1) random access via direct addressing.
Optimized for point queries, where we need to find rows with specific keys.
Subject to the requirement that the whole index fits in memory. RAM is expensive, limited in size, and not durable.
Hash keys are randomly and evenly distributed and inherently unordered, so we need to scan the entire hash table to find all keys that fall into a range.
Hash indexes are not a good fit for hard disks: data blocks end up randomly distributed on disk, so there is no spatial locality - related data is not stored close together, and the disk arm needs to jump around.
Rehashing and redistribution lead to disk I/O overhead.
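A toy sketch of the idea (a Bitcask-style in-memory hash map pointing at byte offsets in an append-only log; the record format is made up):

class HashIndexedLog:
    # In-memory hash map: key -> byte offset of the latest record in the log file.
    def __init__(self, path):
        self.f = open(path, "a+b")
        self.index = {}  # must fit entirely in RAM

    def put(self, key, value):
        self.f.seek(0, 2)              # append at the end of the file
        self.index[key] = self.f.tell()
        self.f.write(f"{key},{value}\n".encode())
        self.f.flush()

    def get(self, key):
        offset = self.index.get(key)   # O(1) hash lookup
        if offset is None:
            return None
        self.f.seek(offset)            # a single disk seek
        return self.f.readline().decode().rstrip("\n").split(",", 1)[1]

A range query has no choice but to walk every key in the index, which is the weakness noted above.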
What is the branching factor in a B-tree and how does it affect the time complexity of a B-tree?
MySQL, PostgreSQL, Oracle, MongoDB.
The number of children a node can have. Typically it is several hundred, depending on the space needed to store page pointers and range boundaries. Most databases fit into a B-tree three or four levels deep. A four-level tree of 4 KB pages with a branching factor of 500 can store up to 256 TB.
The higher the branching factor, the shorter the tree, which improves read and write efficiency.
However, a large node can increase memory overhead, because the keys that act as separators may be cached in memory to guide lookups to the correct leaf node.
DDIA 81
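Sanity check of the 256 TB figure, assuming each of the four levels fans out by 500: 500^4 = 6.25 × 10^10 leaf pages, and 6.25 × 10^10 pages × 4 KB/page = 2.56 × 10^14 bytes = 256 TB. In general, a tree of height h with branching factor b addresses about b^h pages, so the height needed grows as log_b(N).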
Why do we still need write ahead logs for B-tree indexes?
DDIA 82.
Operations that require several pages to be overwritten.
If the node crashes when a page split is partially done, you end up with a corrupted index.
How do we delete a record from log-structured file segments on disk (SSTables or simple unsorted segments)?
Append a special deletion record sometimes called a tombstone.
LSM tree + SST
Advantages over hash index + log-structured files
The paper that proposed this idea came out in 1996.
Apache Cassandra - minimize disk I/O and maximize write throughput.
RocksDB, LevelDB, Google Bigtable
The write buffer of an LSM tree is in memory, like a hash index. It can be either a red-black tree or an AVL tree; it is called the memtable.
When the memtable gets full (a few MB), do an in-order traversal and write it to disk as a Sorted String Table (SSTable). Every time we flush a memtable to an SSTable, we can discard the corresponding WAL.
In Lucene, the indexing engine, the mapping from term to postings list is kept in SSTable-like sorted files, which are merged in the background as needed.
DDIA 75
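A minimal sketch of the memtable-flush path (a plain dict sorted at flush time stands in for the red-black/AVL tree, which would keep keys sorted continuously; file naming is made up):

class TinyLSM:
    def __init__(self, flush_bytes=4 * 1024 * 1024):
        self.memtable = {}   # stand-in for a balanced tree
        self.size = 0
        self.flush_bytes = flush_bytes
        self.sstable_id = 0

    def put(self, key, value):
        self.size += len(key) + len(value)
        self.memtable[key] = value
        if self.size >= self.flush_bytes:
            self.flush()

    def flush(self):
        # In-order traversal -> keys land on disk in sorted order (an SSTable).
        path = f"sstable-{self.sstable_id:05d}.sst"
        with open(path, "w") as f:
            for key in sorted(self.memtable):
                f.write(f"{key}\t{self.memtable[key]}\n")
        # The WAL covering this memtable could be discarded at this point.
        self.memtable.clear()
        self.size = 0
        self.sstable_id += 1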
LSM tree and SSTable optimization
Sparse index - the offsets of some of the keys in the sorted log segment.
Bloom filters - tell us which keys do not appear in the database. They never give a false negative: if the filter says a key does not exist, it definitely does not exist.
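A small sketch of a Bloom filter (k bit positions derived from SHA-256; parameters arbitrary):

import hashlib

class BloomFilter:
    def __init__(self, m_bits=1 << 16, k=4):
        self.m, self.k = m_bits, k
        self.bits = bytearray(m_bits // 8)

    def _positions(self, key):
        for i in range(self.k):
            digest = hashlib.sha256(f"{i}:{key}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.m

    def add(self, key):
        for p in self._positions(key):
            self.bits[p // 8] |= 1 << (p % 8)

    def might_contain(self, key):
        # True may be a false positive; False is definitive (never a false negative).
        return all(self.bits[p // 8] & (1 << (p % 8)) for p in self._positions(key))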
Why is LSM tree + SSTable a middle ground between hash indexes and B-trees?
Not bounded by memory like hash indexes, and supports the range queries that hash indexes are bad at.
Better write performance than a B-tree, though read performance is not as good as a B-tree's.
It requires extra CPU bandwidth for compaction.
What is an address index? Why do we need it sometimes?
Index maps to disk addresses. Like the sparse index we discussed for SSTable.
It's a type of non-clustered index. When we have indexes on different columns, we don't store the entire row (duplicated data) in each index, which minimizes the data we store.
What is a multi-dimensional index and why do we need it?
Like a combo index; composite index.
Example: geospatial dataset, latitude and longitude.
For example, PostGIS implements geospatial indexes as R-trees using PostgreSQL’s Generalized Search Tree indexing facility.
An interesting idea is that multi-dimensional indexes are not just for geographic locations. For example, on an ecommerce website you could use a three-dimensional index on the dimensions (red, green, blue) to search for products in a certain range of colors, or in a database of weather observations you could have a two-dimensional index on (date, temperature) in order to efficiently search for all the observations during the year 2013 where the temperature was between 25 and 30℃. With a one-dimensional index, you would have to either scan over all the records from 2013 (regardless of temperature) and then filter them by temperature, or vice versa. A 2D index could narrow down by timestamp and temperature simultaneously. This technique is used by HyperDex.
DDIA 87
What is atomicity
All-or-nothing principle: all writes in a transaction succeed, or none of them do.
Example - exchange of money
DDIA 223
What is database consistency
If a transaction fails, it fails gracefully: no invariants should be broken - foreign key constraints, unique constraints, and check constraints.
Check constraint - specific rules beyond basic data type constraints; business rules within the database schema.
For example, an employee's hire date must always be later than their date of birth.
In accounting, credits and debits across all accounts should be balanced.
Atomicity focuses on transactions executing as indivisible units; consistency focuses mainly on maintaining the integrity of the database in line with its invariants, rules, and constraints.
The application relies on the database's atomicity and isolation properties in order to achieve consistency, but it's not up to the database alone.
DDIA 225
What is isolation
No race conditions.
Concurrently executing transactions are isolated from each other.
DDIA 225
What is Durability
No data lost.
Once a transaction has committed successfully, any data it has written will not be forgotten, even if there is a hardware fault or the database crashes.
DDIA 226
What are dirty writes? Give an example
Overwrite uncommitted data.
Two transactions concurrently try to update the same object in a database.
The earlier write is part of a transaction that has not yet committed, and the later write overwrites the uncommitted value.
DDIA 235
What is a dirty read?
A transaction has not yet committed or aborted, and another transaction sees the uncommitted data.
DDIA 234
—————————
Dirty reads are problematic because they can lead to inconsistencies, unreliability, and potential data corruption in a database. When a transaction reads data that has been modified but not yet committed by another transaction, it risks basing its operations on temporary, potentially invalid data. This can result in several issues:
Inconsistencies:
- If the transaction that made the uncommitted change is rolled back, the dirty read transaction would have read and possibly acted on data that never becomes permanent.
Cascading Rollbacks:
- If a dirty read occurs and the initial transaction is rolled back, subsequent transactions that read the uncommitted data might also need to be rolled back, causing a cascade of rollbacks.
Unreliability:
- Applications relying on database transactions may become unreliable if they frequently operate on data that is subject to change. This unpredictability can lead to unexpected behavior and errors.
Potential Data Corruption:
- In some cases, decisions made based on dirty reads can lead to corrupt data states that are difficult to detect and correct.
Scenario: Bank Account Transfers
Transaction T1: User A transfers $500 from their checking account to their savings account.
- Step 1: Deduct $500 from checking account (uncommitted).
- Step 2: Add $500 to savings account (uncommitted).
Transaction T2: User A checks their total balance across both accounts.
- Reads checking account balance.
- Reads savings account balance.
Problem:
- Dirty Read: Transaction T2 reads the balance after the deduction but before the addition.
- Checking account balance: $500 (after deduction).
- Savings account balance: $1000 (before addition).
- Total balance seen by the user: $500 + $1000 = $1500.
- T1 is rolled back due to an error.
- Actual balance should remain $1000 + $1000 = $2000.
- User sees an incorrect total balance of $1500, leading to confusion and possible incorrect financial decisions.
Scenario: E-commerce Inventory System
Transaction T1: An item is sold, reducing inventory.
- Step 1: Reduce item inventory by 1 (uncommitted).
- Step 2: Confirm sale and commit changes.
Transaction T2: A customer checks the availability of the item.
- Reads the current inventory level.
Problem:
- Dirty Read: Transaction T2 reads the inventory level after the reduction but before the transaction is committed.
- Inventory before sale: 10 units.
- Inventory after uncommitted reduction: 9 units.
- T1 fails and rolls back, keeping the inventory at 10 units.
- T2 reports 9 units available.
- Customer places an order based on incorrect inventory information, potentially leading to overselling and customer dissatisfaction.
Scenario: Real-Time Data Analytics
Transaction T1: Data from multiple sensors is aggregated and temporarily stored for real-time analytics.
- Step 1: Insert sensor data (uncommitted).
- Step 2: Calculate aggregate values (uncommitted).
Transaction T2: A report is generated based on the aggregated data.
- Reads aggregated data.
Problem:
- Dirty Read: Transaction T2 reads the aggregated data that includes uncommitted sensor data.
- Sensor data before aggregation: 50 readings.
- Aggregated result after uncommitted inserts: 60 readings.
- T1 fails, and the uncommitted sensor data is rolled back.
- Report generated shows incorrect aggregated data, leading to inaccurate analytics and potentially flawed business decisions.
To avoid dirty reads, databases can employ higher isolation levels that prevent transactions from reading uncommitted data. Common isolation levels that prevent dirty reads include:
Read Committed:
- Transactions can only read data that has been committed by other transactions. This isolation level ensures that dirty reads do not occur.
Repeatable Read:
- Ensures that if a transaction reads a row, it will see the same data if it reads that row again within the same transaction, preventing non-repeatable reads as well as dirty reads.
Serializable:
- Provides the strictest level of isolation, ensuring that transactions are completely isolated from each other and preventing all types of read anomalies, including dirty reads, non-repeatable reads, and phantom reads.
Dirty reads can lead to various issues, including data inconsistencies, unreliable applications, cascading rollbacks, and potential data corruption. By using appropriate isolation levels, databases can prevent dirty reads and ensure data integrity and consistency.
What is the downside of using the same row-level locks to prevent dirty reads? What is the common approach used in practice?
A long-running write transaction can force read-only transactions to wait until that long-running transaction has completed.
The commonly used approach: the database remembers both the old committed value and the new value written by the transaction that holds the write lock. Read requests are served the old value, so writers do not block readers.
DDIA 237
How do we deal with deadlocks on row-level locks?
The database automatically detects deadlocks and aborts one of the transactions so that the others can make progress. The aborted transaction needs to be retried by the application.
DDIA 258
What does it mean that a database implements read committed isolation
A DB that blocks both dirty writes and dirty reads.
DDIA 236
What is read skew? Give an example
Also called non-repeatable read.
We read only committed data, but a write executes in the middle of a large read, so some invariant no longer holds across the values we read.
Example: a long-running analytical read.
DDIA 238
How do we resolve a read skew problem
Snapshot isolation - DDIA 238
Assign a monotonically increasing transaction ID to each write. Keep the overwritten data, tagged with the transaction ID that wrote it.
A transaction sees only data that was committed at the start of the transaction; subsequent transactions shouldn't impact its view.
A key principle: writers never block readers, and vice versa.
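A toy sketch of that bookkeeping (monotonic transaction IDs, old versions retained; commits simplified to happen immediately):

import itertools

class MVCCStore:
    def __init__(self):
        self.versions = {}                 # key -> list of (txid, value), newest last
        self.next_txid = itertools.count(1)
        self.committed = set()

    def begin_snapshot(self):
        # Everything committed as of "now"; later commits are invisible to this reader.
        return set(self.committed)

    def write(self, key, value):
        txid = next(self.next_txid)
        self.versions.setdefault(key, []).append((txid, value))
        self.committed.add(txid)           # commit immediately, for brevity
        return txid

    def read(self, key, snapshot):
        # Newest version that was already committed when the snapshot was taken.
        for txid, value in reversed(self.versions.get(key, [])):
            if txid in snapshot:
                return value
        return None

Writers only append new versions, so they never block readers, and readers never block writers.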
What are lost updates?
Example: incrementing a counter - a read-modify-write cycle where one update clobbers another.
Prevented by atomic write operations.
DDIA 243
———————
A lost update occurs when two transactions read the same data and then both attempt to update it. The changes made by the first transaction are overwritten by the second transaction, resulting in a loss of the first update. This anomaly typically happens in a scenario where two transactions are trying to write to the same data item without proper synchronization.
- Transaction 1 (T1) reads a value of 10 from a row (e.g., account balance).
- Transaction 2 (T2) reads the same value of 10 from the same row.
- T1 updates the value to 15 and commits.
- T2 updates the value to 20 and commits.
- The final value is 20, and the update made by T1 is lost.
Lost updates are not inherently prevented by read committed isolation. They are handled by repeatable read isolation with two-phase locking.
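Besides locking, a compare-and-set retry loop (one of the atomic approaches mentioned above) also prevents lost updates; a toy sketch with a made-up store interface:

import threading

class CASStore:
    def __init__(self):
        self._data = {}
        self._lock = threading.Lock()

    def get(self, key):
        return self._data.get(key, 0)

    def compare_and_set(self, key, expected, new):
        # Atomic: only apply the write if nobody changed the value since we read it.
        with self._lock:
            if self._data.get(key, 0) != expected:
                return False
            self._data[key] = new
            return True

def safe_increment(store, key):
    while True:                       # retry the read-modify-write on conflict
        old = store.get(key)
        if store.compare_and_set(key, old, old + 1):
            return old + 1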
What is write skew and how do we prevent it?
A database invariant is broken by changes to different rows. Example: two on-call doctors each check that another doctor is still on call, then both go off call, leaving nobody on call.
Solution - hold locks on all rows that can affect the same database invariant.
DDIA 247
What are phantom writes and how do we prevent them?
Two writes that create new rows which conflict with each other.
A write in one transaction changes the result of a search query in another transaction. There is no existing object to which we can attach a lock.
Materializing conflicts
DDIA 251
What is serializability in database
When transactions are committed, the result is the same as if they had run serially, even though in reality they may have run concurrently.
Serializability in databases ensures that regardless of how transactions are interleaved in their execution, their effect on the database state is as if they had been executed in some serial order, thus preserving the consistency and correctness of the data. This level of isolation is critical for applications requiring strong consistency guarantees, such as financial services, where transaction integrity is paramount.
DDIA 225
What is actual serial execution and why do we use it
Feasible since 2007.
Runs everything on one core with one thread, sequentially. It became feasible around 2007 because RAM got cheap enough that we can sometimes fit the entire dataset in memory.
OLTP transactions are usually short and only make a small number of reads and writes. Long-running OLAP queries can run on a consistent snapshot (snapshot isolation) outside of the serial execution loop.
Sometimes single-threaded execution runs faster than a system that supports concurrency because it gets rid of locking overhead. However, throughput is limited to a single CPU and therefore not scalable.
VoltDB/H-store
DDIA 253
What are stored procedures and why do we use them? What are the shortcomings?
Part of SQL standard since 1999.
To minimize human interaction and network round-trip overhead.
Systems with single-threaded serial transaction processing don't allow interactive multi-statement transactions (multiple round trips between application and database).
The application must submit the entire transaction code to the database ahead of time, as a stored procedure. That code is hard to version-control and deploy.
DDIA 254
What is two phase locking
MySQL. In use since the 1970s.
Why do we need it? To handle concurrent reads and writes to the same piece of data (even in a single-node system) so we don't have dirty reads, dirty writes, or phantom reads/writes.
Read locks are shared but write locks are exclusive. A writer basically blocks all other writers and readers.
Locks are ALWAYS held until the transaction commits.
It was the only widely used algorithm for serializability for around 30 years, but a big downside is performance.
———————
Two-phase locking (2PL) is a concurrency control method used to ensure serializability, the highest level of isolation in database systems.
Serializability ensures that the execution of transactions is equivalent to some serial order, thereby preventing all types of read and write anomalies, including dirty reads, non-repeatable reads, and phantom reads.
2PL consists of two phases: the growing phase (lock acquisition) and the shrinking phase (lock release).
Phantom reads occur when a transaction retrieves a set of records that satisfy a condition, but during the course of the transaction, another transaction inserts or deletes records that would also satisfy that condition, causing the first transaction to see a different set of records upon re-execution.
Here’s how two-phase locking prevents phantom reads:
- Locks on Read Operation: When a transaction reads a set of records that satisfy a condition, two-phase locking ensures that the transaction acquires shared locks (read locks) on all records that satisfy the condition. These locks prevent other transactions from inserting or deleting records that would affect the set of records being read by the current transaction.
- Locks on Write Operation: When a transaction inserts or deletes records, it acquires exclusive locks (write locks) on those records. These locks prevent other transactions from reading or writing to the affected records until the transaction holding the locks completes and releases them.
By ensuring that transactions acquire appropriate locks (shared or exclusive) on all records they access or modify during the growing phase and release those locks only during the shrinking phase, two-phase locking prevents other transactions from inserting or deleting records that would cause phantom reads.
In essence, two-phase locking guarantees that the set of records a transaction reads remains unchanged throughout its execution, thereby preventing the occurrence of phantom reads.
In the context of preventing phantom reads, two-phase locking (2PL) does involve the use of predicate locks, though the term “predicate lock” itself is more commonly associated with other concurrency control mechanisms like predicate-based locking.
Here’s how two-phase locking relates to predicate locks in preventing phantom reads:
- Predicate Locks in Two-Phase Locking: In two-phase locking, transactions acquire locks on individual data items (records) either in a shared (read) mode or exclusive (write) mode. When a transaction performs a read operation with a condition (predicate), such as a WHERE clause in SQL, it acquires shared locks on all records that satisfy the condition. These shared locks effectively act as predicate locks because they protect the set of records that satisfy the condition from being modified (inserted or deleted) by other transactions.
- Preventing Phantom Reads: Phantom reads occur when a transaction re-executes a query and finds a different set of records due to insertions or deletions by other transactions. By acquiring shared locks on all records that match a predicate during the read operation, two-phase locking ensures that the set of records remains consistent for the duration of the transaction. This prevents other transactions from inserting or deleting records that would affect the results of the current transaction’s query, thereby preventing phantom reads.
In summary, while two-phase locking is not typically described using the term “predicate locks” in the same way as other concurrency control mechanisms, the concept of acquiring shared locks on records that satisfy a predicate condition effectively serves the purpose of preventing phantom reads by ensuring consistent query results across transaction executions.
———————
DDIA 257
What are predicate locks?
Rather than a lock on an object, it is a lock on a pattern - on certain query conditions.
Checking every lock's condition against each query is pretty slow.
If two-phase locking includes predicate locks, the database prevents all forms of write skew and other race conditions, so its isolation becomes serializable.
DDIA 259
What are index range locks?
An approximation of predicate locks: lock a greater, simpler set of objects - e.g., a whole index range - to improve performance.
DDIA 259
What is database snapshot
A consistent state of a database at some point of time.
What’s serializable snapshot isolation and when should we use it
Introduced in 2008. Used by PostgreSQL, Oracle, and Microsoft SQL Server.
An optimistic concurrency control technique.
Transactions continue anyway, in the hope that everything will turn out all right. When a transaction wants to commit, the database checks whether anything bad happened; only transactions that executed serializably are allowed to commit.
Use it only when most transactions do not get in each other's way. Don't use it if there are too many conflicting transactions.
Column oriented storage
Definition
Use case
Parquet is a free and open source column-oriented storage format that supports a document data model; it has been around since 2013.
The idea is pretty simple - don't store all the values from one row together; store all the values from each column together instead. A query then only needs to read and parse the columns it uses, which can save a lot of work and minimize disk I/O.
We can perform column compression to reduce the amount of data we store; we can even store and cache that data in memory to speed up queries.
Downside - every write needs to go to many different places on disk.
We can write rows into an in-memory LSM tree and flush them to disk in bulk as column-oriented files.
Column storage - bitmap compression + run-length encoding
Often, the number of distinct values in a column is small compared to the number of rows (for example, a retailer may have billions of sales transactions, but only 100,000 distinct products). We can now take a column with n distinct values and turn it into n separate bitmaps: one bitmap for each distinct value, with one bit for each row. The bit is 1 if the row has that value, and 0 if not.
If n is very small (for example, a country column may have approximately 200 distinct values), those bitmaps can be stored with one bit per row. But if n is bigger, there will be a lot of zeros in most of the bitmaps (we say that they are sparse). In that case, the bitmaps can additionally be run-length encoded. This can make the encoding of a column remarkably compact.
Bitmap indexes such as these are very well suited for the kinds of queries that are common in a data warehouse. For example:
WHERE product_sk IN (30, 68, 69):
Load the three bitmaps for product_sk = 30, product_sk = 68, and product_sk = 69, and calculate the bitwise OR of the three bitmaps, which can be done very efficiently.
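A tiny illustration of those bitmaps and the bitwise OR (toy data):

def build_bitmaps(column):
    # One bitmap per distinct value: bit is 1 where the row holds that value.
    bitmaps = {}
    for row, value in enumerate(column):
        bitmaps.setdefault(value, [0] * len(column))[row] = 1
    return bitmaps

product_sk = [69, 30, 30, 68, 69, 69]
bitmaps = build_bitmaps(product_sk)

# WHERE product_sk IN (30, 68, 69): OR the three bitmaps together.
match = [a | b | c for a, b, c in zip(bitmaps[30], bitmaps[68], bitmaps[69])]
print([i for i, bit in enumerate(match) if bit])   # matching row numbers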
Predicate push down
A db optimization technique that improves query performance by filtering data before it’s transferred.
The entire dataset doesn't need to be scanned during query execution.
Example
Consider a database query intended to find all transactions over $1,000 in a financial database:
SELECT * FROM transactions WHERE amount > 1000;
With predicate pushdown, the database engine would apply the filter amount > 1000 directly when reading the data from disk, rather than fetching all transactions into memory and then filtering them.
Conclusion
Predicate pushdown is a critical optimization in the management and processing of data across various platforms, from traditional RDBMS to modern big data systems. By ensuring that unnecessary data is filtered out early in the data processing pipeline, systems can achieve higher performance and efficiency.
What are the downsides of column oriented storage?
Columns are kept sorted, so in-place writes are hard.
A single row write has to touch many column files.
Use an in-memory LSM tree to buffer writes and do bulk updates.
What is forward and backward compatibility
Forward: old code can read records that were written by new code.
Backward - new code can read records written by old code. If you were to add a field and make it required, that check would fail if new code read data written by old code, because the old code will not have written the new field that you added. Therefore, to maintain backward compatibility, every field you add after the initial deployment of the schema must be optional or have a default value (automatic backfill or manual backfill)
Backward compatibility is normally not hard to achieve: as author of the newer code, you know the format of data written by older code, and so you can explicitly handle it (if necessary by simply keeping the old code to read the old data). Forward compatibility can be trickier, because it requires older code to ignore additions made by a newer version of the code.
DDIA 121
What are Apache Thrift and Protocol buffers
Apache Thrift and Protocol Buffers (protobuf) are binary encoding libraries that are based on the same principle. Protocol Buffers was originally developed at Google, Thrift was originally developed at Facebook, and both were made open source in 2007–08.
Both Thrift and Protocol Buffers require a schema for any data that is encoded. To encode the data in Thrift, you would describe the schema in the Thrift interface definition language (IDL) like this:
struct Person {
1: required string userName,
2: optional i64 favoriteNumber,
3: optional list<string> interests
}
Each field has a type annotation (to indicate whether it is a string, integer, list, etc.)
and, where required, a length indication (length of a string, number of items in a list).
The strings that appear in the data (“Martin”, “daydreaming”, “hacking”) are also encoded as ASCII (or rather, UTF-8).
DDIA 117
What are the disadvantages of Thrift or Protocol Buffers?
They require knowledge of the schema at compile time (code generation).
What is Apache avro
Apache Avro is another binary encoding format that is interestingly different from Protocol Buffers and Thrift. It was started in 2009 as a subproject of Hadoop, as a result of Thrift not being a good fit for Hadoop’s use cases.
It is friendlier to dynamically generated schemas - for example, creating an Avro schema on the fly from database column names.
Binary data can only be decoded correctly if the code reading the data is using the exact same schema as the code that wrote the data.
To parse the binary data, you go through the fields in the order that they appear in the schema and use the schema to tell you the datatype of each field.
The key idea with Avro is that the writer’s schema and the reader’s schema don’t have to be the same—they only need to be compatible. When data is decoded (read), the Avro library resolves the differences by looking at the writer’s schema and the reader’s schema side by side and translating the data from the writer’s schema into the reader’s schema.
For example, it’s no problem if the writer’s schema and the reader’s schema have their fields in a different order, because the schema resolution matches up the fields by field name. If the code reading the data encounters a field that appears in the writer’s schema but not in the reader’s schema, it is ignored. If the code reading the data expects some field, but the writer’s schema does not contain a field of that name, it is filled in with a default value declared in the reader’s schema.
DDIA 122
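A toy sketch of the name-based resolution step only (real Avro first decodes the binary using the writer's schema; the dict-based "schema" here is a made-up stand-in):

def resolve(decoded_record, reader_schema):
    # reader_schema: field name -> default value.
    # Writer-only fields are ignored; reader-only fields get their declared default.
    return {name: decoded_record.get(name, default)
            for name, default in reader_schema.items()}

decoded = {"userName": "Martin", "favoriteColor": "red"}   # writer's fields
reader = {"userName": None, "favoriteNumber": 0}           # reader's schema
print(resolve(decoded, reader))  # {'userName': 'Martin', 'favoriteNumber': 0}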
Why do we need replication
Replication means keeping a copy of the same data on multiple machines that are connected via a network. As discussed in the introduction to Part II, there are several reasons why you might want to replicate data:
• To keep data geographically close to your users (and thus reduce latency)
• To allow the system to continue working even if some of its parts have failed (and thus increase availability)
• To scale out the number of machines that can serve read queries (and thus increase read throughput)
DDIA 151
What is synchronous and asynchronous replication
The advantage of synchronous replication is that the follower is guaranteed to have an up-to-date copy of the data that is consistent with the leader. If the leader suddenly fails, we can be sure that the data is still available on the follower.
The disadvantage is that if the synchronous follower doesn’t respond (because it has crashed, or there is a network fault, or for any other reason), the write cannot be processed. The leader must block all writes and wait until the synchronous replica is available again.
For that reason, it is impractical for all followers to be synchronous: any one node outage would cause the whole system to grind to a halt. In practice, if you enable synchronous replication on a database, it usually means that one of the followers is synchronous, and the others are asynchronous.
If the synchronous follower becomes unavailable or slow, one of the asynchronous followers is made synchronous. This guarantees that you have an up-to-date copy of the data on at least two nodes: the leader and one synchronous follower. This configuration is sometimes also called semi-synchronous.
Often, leader-based replication is configured to be completely asynchronous. In this case, if the leader fails and is not recoverable, any writes that have not yet been replicated to followers are lost. This means that a write is not guaranteed to be durable, even if it has been confirmed to the client.
However, a fully asynchronous configuration has the advantage that the leader can continue processing writes, even if all of its followers have fallen behind. Weakening durability may sound like a bad trade-off, but asynchronous replication is nevertheless widely used, especially if there are many followers or if they are geographically distributed.
DDIA 153
How do we implement replication?
Logical (row-based) log replication
An alternative is to use different log formats for replication and for the storage engine, which allows the replication log to be decoupled from the storage engine internals.
This kind of replication log is called a logical log, to distinguish it from the storage engine’s (physical) data representation.
A logical log for a relational database is usually a sequence of records describing writes to database tables at the granularity of a row:
• For an inserted row, the log contains the new values of all columns.
• For a deleted row, the log contains enough information to uniquely identify the row that was deleted. Typically this would be the primary key, but if there is no primary key on the table, the old values of all columns need to be logged.
• For an updated row, the log contains enough information to uniquely identify the updated row, and the new values of all columns (or at least the new values of all columns that changed).
You know it. Find photos.
DDIA 159
What’s the issue with WAL as replication log?
The main disadvantage is that the log describes the data on a very low level: a WAL contains details of which bytes were changed in which disk blocks. This makes replication closely coupled to the storage engine. If the database changes its storage format from one version to another, it is typically not possible to run different versions of the database software on the leader and the followers.
That may seem like a minor implementation detail, but it can have a big operational impact. If the replication protocol allows the follower to use a newer software version than the leader, you can perform a zero-downtime upgrade of the database software by first upgrading the followers and then performing a failover to make one of the upgraded nodes the new leader. If the replication protocol does not allow this version mismatch, as is often the case with WAL shipping, such upgrades require downtime.
How do we implement read-after-write consistency
How can we implement read-after-write consistency in a system with leader-based replication? There are various possible techniques. To mention a few:
• When reading something that the user may have modified, read it from the leader; otherwise, read it from a follower. This requires that you have some way of knowing whether something might have been modified, without actually querying it. For example, user profile information on a social network is normally only editable by the owner of the profile, not by anybody else. Thus, a simple rule is: always read the user’s own profile from the leader, and any other users’ profiles from a follower.
• If most things in the application are potentially editable by the user, that approach won’t be effective, as most things would have to be read from the leader (negating the benefit of read scaling). In that case, other criteria may be used to decide whether to read from the leader. For example, you could track the time of the last update and, for one minute after the last update, make all reads from the leader. You could also monitor the replication lag on followers and prevent queries on any follower that is more than one minute behind the leader.
• The client can remember the timestamp of its most recent write—then the system can ensure that the replica serving any reads for that user reflects updates at least until that timestamp. If a replica is not sufficiently up to date, either the read can be handled by another replica or the query can wait until the replica has caught up. The timestamp could be a logical timestamp (something that indicates ordering of writes, such as the log sequence number) or the actual system clock
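A sketch of the third technique (client remembers its last write's log sequence number; Replica here is a made-up stand-in):

class Replica:
    def __init__(self, lsn, data):
        self.lsn = lsn      # last log position this replica has applied
        self.data = data

def read_own_writes(key, client_lsn, replicas, leader):
    # Serve the read from any replica that already reflects the client's last write.
    for r in replicas:
        if r.lsn >= client_lsn:
            return r.data.get(key)
    return leader.data.get(key)   # no follower caught up: fall back to the leader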
What are monotonic reads, and what edge case do they prevent?
In the context of single-leader, asynchronous replication: a client may see things move backward in time.
Monotonic reads is a guarantee that this kind of anomaly does not happen. It’s a lesser guarantee than strong consistency, but a stronger guarantee than eventual consistency. When you read data, you may see an old value; monotonic reads only means that if one user makes several reads in sequence, they will not see time go backward— i.e., they will not read older data after having previously read newer data.
One way of achieving monotonic reads is to make sure that each user always makes their reads from the same replica (different users can read from different replicas). For example, the replica can be chosen based on a hash of the user ID, rather than randomly. However, if that replica fails, the user’s queries will need to be rerouted to another replica.
DDIA 165
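The replica-pinning idea from the paragraph above, in one line of hashing (hash choice arbitrary):

import hashlib

def replica_for(user_id, n_replicas):
    # Same user -> same replica, so their reads cannot go backward in time.
    return int(hashlib.md5(user_id.encode()).hexdigest(), 16) % n_replicas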
Give an example of violation of causality.
DDIA 165
What are consistent prefix reads? How are they implemented?
All causally dependent writes go to the same partition.
Preventing this kind of anomaly requires another type of guarantee: consistent prefix reads. This guarantee says that if a sequence of writes happens in a certain order, then anyone reading those writes will see them appear in the same order. This is a particular problem in partitioned (sharded) databases. If the database always applies writes in the same order, reads always see a consistent prefix, so this anomaly cannot happen. However, in many distributed databases, different partitions operate independently, so there is no global ordering of writes: when a user reads from the database, they may see some parts of the database in an older state and some in a newer state. One solution is to make sure that any writes that are causally related to each other are written to the same partition—but in some applications that cannot be done efficiently.
DDIA 166
What’s the challenges with leader failure?
- How to detect leader is down - timeout settings
- Lost write - leader goes down before new writes get propagated to any of the followers
- Split brain
What are the topologies for multi-leader replication?
DDIA 175
What are the problems with leaderless replication via a gossip protocol?
- It is expensive to ensure causality using Lamport clocks, or to detect concurrent writes using version vectors and resolve the conflicts.
- Infinite loops of broadcast messages - for a specific write, maintain a set of visited nodes.
DDIA 177
Last write wins to resolve write conflicts
DDIA 186
What timestamps can we use?
Computer clocks rely on quartz crystal vibration and drift over time (clock skew).
NTP (Network Time Protocol) synchronizes machines against servers backed by more accurate sources such as GPS clocks, and adjusts the computer clock.
What are concurrent writes in a multi-leader system?
The writes don't know about each other.
How do we detect concurrent writes in a multi-leader system?
Version vector - the number of writes each replica has seen from each leader node.
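A sketch of the comparison rule: one write happened before another if its version vector is dominated; if neither dominates, the writes are concurrent.

def compare(vv_a, vv_b):
    # Version vectors: node id -> number of writes seen from that node.
    keys = set(vv_a) | set(vv_b)
    a_le_b = all(vv_a.get(k, 0) <= vv_b.get(k, 0) for k in keys)
    b_le_a = all(vv_b.get(k, 0) <= vv_a.get(k, 0) for k in keys)
    if a_le_b and b_le_a:
        return "equal"
    if a_le_b:
        return "a happened before b"
    if b_le_a:
        return "b happened before a"
    return "concurrent"   # neither dominates: conflicting writes to reconcile

print(compare({"n1": 2, "n2": 1}, {"n1": 1, "n2": 3}))   # concurrent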
What are the other solutions besides LWW to mitigate the concurrent writes problem?
One option is to store siblings and let the user decide later. As merging siblings in application code is complex and error-prone, there are some efforts to design data structures that can perform this merging automatically. For example, Riak's datatype support uses a family of data structures called CRDTs that can automatically merge siblings in sensible ways, including preserving deletions.
CRDT - conflict-free replicated data types, formally introduced in 2011 and now used by Riak, Redis, and AntidoteDB.
What is an operation-based CRDT
When we say that “updates are propagated as operations” in the context of operation-based CRDTs (CmRDTs), we’re referring to the method by which changes to data are communicated across different nodes in a distributed system.
Here's a detailed breakdown of what this entails: Process of Propagating Updates as Operations
1. Operation Encapsulation: Each change made to the data is encapsulated as a discrete operation. This operation includes all the information necessary to replicate the change on other nodes. For instance, if you're incrementing a counter, the operation might specify that the counter should be incremented by one.
2. Transmission: Instead of sending the entire state of the data structure (which could be large), only the operation itself is sent across the network to other nodes. This is typically more efficient, especially when the data structure is large but the changes are small.
3. Commutative Operations: Operations are designed to be commutative. This means that the order in which operations are applied does not affect the final outcome. For example, if two nodes simultaneously increment the same counter, the operations "increment by 1" can be applied in any order but still yield the same result.
Example
Consider a collaborative text editing application where multiple users can insert or delete characters in a document:
• User A inserts the letter "a" at position 1.
• User B deletes the character at position 2.
Each of these actions (insertion and deletion) is an operation. These operations are sent to all other participating nodes:
• The insertion operation from User A might be encoded as insert('a', 1).
• The deletion operation from User B might be encoded as delete(2).
When another node receives these operations, it applies them to its local version of the document. Because the operations are commutative and designed to be independent of specific states, they can be applied in any order without resulting in conflicts or inconsistencies.
Benefits
This approach minimizes the amount of data transferred between nodes, reduces latency in response times, and ensures that even if messages arrive out of order or are delayed, the final state of the data structure remains consistent across all nodes. This makes operation-based CRDTs highly effective for real-time, distributed applications where low latency and high reliability are crucial.
However, whenever there are causal relationships among our operations, an operation-based CRDT can let us down.
State-based CRDT
Updates are managed by sharing the entire state of the data structure rather than individual operations.
Each node independently updates its local state, and periodically, the entire state is merged with states from other nodes using a merge operation that is associative, commutative, and idempotent. This ensures that all nodes eventually converge to the same state, regardless of the order in which states are merged.
A G-Counter is a simple CRDT used to implement a distributed counter that can only increase.
- Local Update: Each node maintains its own counter and increments its local count independently without coordination with other nodes.
- Merging: When nodes communicate, they share their entire counter state. The merge operation for a G-Counter takes the maximum of each node’s count for each node in the system, effectively synchronizing the counts across all nodes to the highest value observed.
- Result: The result is a counter that always increases and never decreases, providing a reliable count even in the presence of network partitions and asynchronous updates.
State-based CRDTs like the G-Counter are particularly useful in distributed systems where it’s important to ensure that all nodes can operate independently and still maintain consistency across the system without complex conflict resolution mechanisms.
State-based CRDTs handle causal relationships by ensuring that the merge operation can combine states from different nodes in a way that reflects the causal order of updates, even though individual nodes might apply updates independently and asynchronously. For operations and data structures where the relationship between updates is not inherently commutative, like a G-Counter, CRDTs must incorporate additional mechanisms to track and reconcile the order of operations. One common approach is to use version vectors or other metadata that capture the causal relationships between updates.
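A minimal G-Counter as described above; merge is an elementwise max, which is associative, commutative, and idempotent:

class GCounter:
    def __init__(self, node_id):
        self.node_id = node_id
        self.counts = {node_id: 0}   # per-node counts, never decreasing

    def increment(self, n=1):
        self.counts[self.node_id] = self.counts.get(self.node_id, 0) + n

    def value(self):
        return sum(self.counts.values())

    def merge(self, other):
        # Take the max per node: safe to apply in any order, any number of times.
        for node, count in other.counts.items():
            self.counts[node] = max(self.counts.get(node, 0), count)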
What databases use leaderless replication?
Cassandra, Riak
What is the context of read repair?
Read repair is a technique used in distributed data storage systems to ensure data consistency across different replicas of the same data. This process is particularly important in systems that employ replication to provide high availability and fault tolerance. Read repair helps in keeping the replicas synchronized by actively correcting discrepancies during read operations.
Read repair operates under eventual consistency models, where data is replicated across multiple nodes to ensure availability and resilience against node failures. Here’s how read repair typically functions:
- Data Retrieval: When a read request is issued to a distributed database, it may retrieve data from one or more replicas, depending on the consistency requirements specified for the read operation.
- Version Comparison: The system compares the versions of the data retrieved from different replicas. This can be done by checking timestamps, version numbers, or other metadata associated with the data to determine which replica has the most recent version.
- Discrepancy Detection: If discrepancies are found (i.e., some replicas have older versions of the data), the system identifies the most up-to-date version of the data.
- Correction: The system then propagates the correct version back to those replicas that had outdated or incorrect versions, thus “repairing” the data on those nodes.
- Update of Replicas: The outdated replicas update their data to reflect the most recent version, ensuring consistency across the system.
Read repair can be triggered in a few different ways, depending on the system’s design and configuration:
- Synchronous Read Repair: This happens as part of the read process. The system checks and repairs any inconsistencies before returning the data to the requester. This ensures strong consistency but can increase the latency of read operations because the system must wait until all necessary repairs are made.
- Asynchronous Read Repair: In this approach, the system logs the found discrepancies during the read operation but does not immediately correct them. Instead, the repairs are done in the background, allowing the read operation to complete with lower latency. This method favors availability and performance over immediate consistency.
Read repair is widely used in various distributed data stores, including:
- Cassandra: Uses read repair as part of its normal operations to ensure that all replicas for a given piece of data remain consistent over time. Cassandra allows configuring the probability of read repair happening on a per-read basis.
- DynamoDB: Amazon’s DynamoDB uses a form of read repair in its background processes to maintain data consistency across its distributed storage.
- Riak: Also implements read repair to handle discrepancies found during read operations, ensuring that data returned to the user is the most current and that all replicas are eventually consistent.
Read repair is a crucial mechanism in distributed databases to maintain data consistency across replicas. By correcting discrepancies during read operations, read repair helps ensure that all replicas converge towards a consistent state, thereby fulfilling the promises of reliability and availability in distributed systems. This technique is particularly valuable in environments where data is continuously updated and accessed across multiple nodes.
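A toy sketch of the synchronous variant (versioned values, made-up Replica interface):

class Replica:
    def __init__(self):
        self.data = {}   # key -> (version, value)

    def get(self, key):
        return self.data.get(key, (0, None))

def read_with_repair(key, replicas):
    responses = [(r, r.get(key)) for r in replicas]
    # Pick the newest version seen across the replicas...
    version, value = max((resp for _, resp in responses), key=lambda vv: vv[0])
    # ...and write it back to any replica that returned something older.
    for replica, (v, _) in responses:
        if v < version:
            replica.data[key] = (version, value)
    return value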
What is the context of anti-entropy and how is it implemented
Anti-entropy is a concept used in distributed systems to ensure data consistency across replicas. It involves mechanisms that detect and correct discrepancies in data at different nodes, effectively reducing “entropy” which in this context refers to disorder or inconsistencies in the system. Anti-entropy processes help in synchronizing data across a distributed network by continuously or periodically comparing data sets and resolving differences.
In distributed data storage systems, replicas of the same data may become inconsistent due to network failures, temporary partitioning, or delays in update propagation. Anti-entropy is a proactive approach to handle these potential inconsistencies and ensure that all replicas converge to the same state, even in the face of failures.
Merkle Trees:
- Concept: A Merkle tree is a binary tree in which every leaf node is labelled with the hash of a data block, and every non-leaf node is labelled with the cryptographic hash of the labels of its child nodes. Merkle trees are used to efficiently compare data sets across different nodes.
- Usage: When two nodes need to synchronize their data, they can exchange the root hashes of their Merkle trees. If the hashes differ, they can compare smaller segments of their trees (i.e., branches) progressively, until they localize the blocks that differ. This process significantly reduces the amount of data that needs to be transferred for synchronization.
- Example: Cassandra uses Merkle trees during its anti-entropy repair process to identify and synchronize differences between replicas (see the sketch at the end of this answer).
Read Repair and Write Repair:
- Read Repair: As previously explained, read repair involves detecting and correcting inconsistencies during read operations. This is a form of anti-entropy because it actively seeks to eliminate divergences by updating replicas with the most recent data version when a discrepancy is detected during a read.
- Write Repair: Similar to read repair, but corrections are made during write operations. When new data is written or existing data is updated, the system ensures that all replicas are immediately brought up to date, hence maintaining consistency.
Gossip Protocols:
- Concept: Gossip protocols involve nodes periodically sharing state information about themselves and other nodes they know about in a manner akin to how gossip spreads in social networks.
- Usage: Each node in a distributed system periodically exchanges state information with a randomly chosen other node. This state information typically includes data about the node’s view of the system and can include version vectors or other metadata that track data state.
- Purpose: The continuous, random mixing of state information helps in quickly propagating updates across all nodes and correcting any discrepancies, thereby maintaining system consistency over time.
- Data Consistency: Ensures all nodes in a distributed system eventually hold the same data, despite any failures or inconsistencies.
- Fault Tolerance: Increases the resilience of the system to network failures, data corruption, or other anomalies.
- Scalability: Allows the system to scale effectively, as the mechanisms can handle the synchronization of data across a large number of nodes without central coordination.
Anti-entropy mechanisms are essential in distributed systems to manage data consistency proactively. By continuously addressing and correcting data discrepancies, these mechanisms help ensure that the system operates reliably and efficiently, even as it scales and faces various operational challenges.
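A sketch of the Merkle-tree comparison mentioned above (recomputes hashes at every step for brevity; assumes both replicas hold the same number of blocks):

import hashlib

def merkle_root(blocks):
    level = [hashlib.sha256(b).digest() for b in blocks]
    while len(level) > 1:
        if len(level) % 2:                     # duplicate the last hash if odd
            level.append(level[-1])
        level = [hashlib.sha256(level[i] + level[i + 1]).digest()
                 for i in range(0, len(level), 2)]
    return level[0]

def diff_blocks(a, b, lo=0):
    # Identical roots -> identical data: nothing to transfer.
    if merkle_root(a) == merkle_root(b):
        return []
    if len(a) == 1:
        return [lo]                            # localized a differing block
    mid = len(a) // 2                          # otherwise recurse into both halves
    return diff_blocks(a[:mid], b[:mid], lo) + diff_blocks(a[mid:], b[mid:], lo + mid)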
Quorum writes and reads
You know it
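The standard arithmetic: with n replicas, writes confirmed by w nodes, and reads querying r nodes, choosing w + r > n guarantees the set of nodes written and the set of nodes read overlap in at least one node, so every read sees at least one up-to-date copy. A common choice is n = 3, w = 2, r = 2: any 2 writers and any 2 readers must share a node.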
What is the edge case where quorum consistency may fail?
If a sloppy quorum is used, the w writes may end up on different nodes than the r reads, so there is no longer a guaranteed overlap between the r nodes and the w nodes.
Still prone to concurrent writes until all nodes reach distributed consensus.
A write can succeed on some nodes and fail on others; it is not rolled back on the nodes where it succeeded.
What is strong consistency?
Everyone reading at the same time would agree on an identical, most up-to-date value for a given key.
What is sloppy quorum and hinted handoff
In a large cluster (with significantly more than n nodes) it’s likely that the client can connect to some database nodes during the network interruption, just not to the nodes that it needs to assemble a quorum for a particular value.
In that case, database designers face a trade-off:
• Is it better to return errors to all requests for which we cannot reach a quorum of w or r nodes?
• Or should we accept writes anyway, and write them to some nodes that are reachable but aren’t among the n nodes on which the value usually lives?
The latter is known as a sloppy quorum: writes and reads still require w and r successful responses, but those may include nodes that are not among the designated n “home” nodes for a value.
By analogy, if you lock yourself out of your house, you may knock on the neighbor’s door and ask whether you may stay on their couch temporarily. Once the network interruption is fixed, any writes that one node temporarily accepted on behalf of another node are sent to the appropriate “home” nodes. This is called hinted handoff. (Once you find the keys to your house again, your neighbor politely asks you to get off their couch and go home.)
Thus, a sloppy quorum actually isn’t a quorum at all in the traditional sense. It’s only an assurance of durability, namely that the data is stored on w nodes somewhere. There is no guarantee that a read of r nodes will see it until the hinted handoff has completed.
Sloppy quorums are optional in all common Dynamo implementations. In Riak they are enabled by default, and in Cassandra and Voldemort they are disabled by default
Why do we need database partitioning
- Data volume
- Scalability - a large dataset can be distributed across many disks, and the query load can be distributed across many processors.
Replication is having multiple copies of the same data on different nodes. For very large datasets, or very high query throughput, that is not sufficient: we need to break the data up into partitions, also known as sharding.
The main reason for wanting to partition data is scalability. Different partitions can be placed on different nodes in a shared-nothing cluster, and the query load can be distributed across many processors.
Partitioned databases were pioneered in the 1980s by products such as Teradata and Tandem NonStop SQL, and more recently rediscovered by NoSQL databases and Hadoop-based data warehouses.
Some systems are designed for transactional workloads, and others for analytics; this difference affects how the system is tuned, but the fundamentals of partitioning apply to both kinds of workloads.
DDIA 199
Range-based partitioning: pros and cons
Pro: data locality for range scans.
Con: hot spots - e.g., partitioning by last name, where the a-to-c partition holds far more keys than the x-to-z partition.
If you know the boundaries between the ranges, you can easily determine which partition contains a given key. If you also know which partition is assigned to which node, then you can make your request directly to the appropriate node (or, in the case of the encyclopedia, pick the correct book off the shelf).
However, the downside of key range partitioning is that certain access patterns can lead to hot spots. If the key is a timestamp, then the partitions correspond to ranges of time—e.g., one partition per day. Unfortunately, because we write data from the sensors to the database as the measurements happen, all the writes end up going to the same partition (the one for today), so that partition can be overloaded with writes while others sit idle.
To avoid this problem in the sensor database, you need to use something other than the timestamp as the first element of the key. For example, you could prefix each timestamp with the sensor name so that the partitioning is first by sensor name and then by time. Assuming you have many sensors active at the same time, the write load will end up more evenly spread across the partitions. Now, when you want to fetch the values of multiple sensors within a time range, you need to perform a separate range query for each sensor name.
DDIA 202
Hash range based partitioning
Each partition is assigned a range of hash values.
Even distribution of keys, fewer hot spots.
No data locality for range queries
Keys that were once adjacent are now scattered across all the partitions, so their sort order is lost. In MongoDB, if you have enabled hash-based sharding mode, any range query has to be sent to all partitions.
Range queries on the primary key are not supported by Riak, Couchbase, or Voldemort.
Cassandra achieves a compromise between the two partitioning strategies. A table in Cassandra can be declared with a compound primary key consisting of several columns.
Only the first part of that key is hashed to determine the partition, but the other columns are used as a concatenated index for sorting the data in Cassandra’s SSTables. A query therefore cannot search for a range of values within the first column of a compound key, but if it specifies a fixed value for the first column, it can perform an efficient range scan over the other columns of the key.
Bottom of DDIA 203
Why does hash range partitioning still not perfectly solve hot spots?
When certain keys are extremely active - e.g., one celebrity gets a flood of comments on her Instagram posts.
Today, most data systems are not able to automatically compensate for such a highly skewed workload, so it’s the responsibility of the application to reduce the skew.
For example, if one key is known to be very hot, a simple technique is to add a random number to the beginning or end of the key. Just a two-digit decimal random number would split the writes to the key evenly across 100 different keys, allowing those keys to be distributed to different partitions.
However, having split the writes across different keys, any reads now have to do additional work, as they have to read the data from all 100 keys and combine it.
This technique also requires additional bookkeeping: it only makes sense to append the random number for the small number of hot keys; for the vast majority of keys with low write throughput this would be unnecessary overhead. Thus, you also need some way of keeping track of which keys are being split.
Perhaps in the future, data systems will be able to automatically detect and compensate for skewed workloads; but for now, you need to think through the trade-offs for your own application.
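A sketch of the two-digit salting trick described above (a plain dict stands in for the partitioned store):

import random

SPLIT = 100   # two-digit decimal suffix -> 100 sub-keys

def salted_write(store, hot_key, value):
    # Spread writes for one hot key across 100 sub-keys (hence partitions).
    sub_key = f"{hot_key}#{random.randrange(SPLIT):02d}"
    store.setdefault(sub_key, []).append(value)

def salted_read(store, hot_key):
    # The price: reads must gather and combine all 100 sub-keys.
    out = []
    for i in range(SPLIT):
        out.extend(store.get(f"{hot_key}#{i:02d}", []))
    return out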
How do we optimize hash range key partitioning for range queries?
Having secondary indexes.
A secondary index usually doesn’t identify a record uniquely but rather is a way of searching for occurrences of a particular value: find all actions by user 123, find all articles containing the word hogwash, find all cars whose color is red, and so on.
Local secondary indexes
A second, sorted copy of the dataset on each partition. Pro: no need to perform any extra writes over the network. Con: reads need to fetch from every single partition.
In this indexing approach, each partition is completely separate: each partition maintains its own secondary indexes, covering only the documents in that partition. It doesn’t care what data is stored in other partitions.
Whenever you need to write to the database—to add, remove, or update a document—you only need to deal with the partition that contains the document ID that you are writing. For that reason, a document-partitioned index is also known as a local index (as opposed to a global index, described in the next section).
However, reading from a document-partitioned index requires care: unless you have done something special with the document IDs, there is no reason why all the cars with a particular color or a particular make would be in the same partition. Thus, if you want to search for red cars, you need to send the query to all partitions, and combine all the results you get back. This approach to querying a partitioned database is sometimes known as scatter/ gather, and it can make read queries on secondary indexes quite expensive. Even if you query the partitions in parallel, scatter/gather is prone to tail latency amplification (see “Percentiles in Practice” on page 16).
Nevertheless, it is widely used: MongoDB, Riak, Cassandra, Elasticsearch, SolrCloud, and VoltDB all use document-partitioned secondary indexes.
Most database vendors recommend that you structure your partitioning scheme so that secondary index queries can be served from a single partition, but that is not always possible, especially when you’re using multiple secondary indexes in a single query (such as filtering cars by color and by make at the same time).