Eventual Consistency in MySQL Cluster - using epochs




Before getting to the details of how eventual consistency is implemented, we need to look at epochs. Ndb Cluster maintains an internal distributed logical clock known as the epoch, represented as a 64-bit number. This epoch serves a number of internal functions, and is atomically advanced across all data nodes.

Epochs and consistent distributed state

Ndb is a parallel database, with multiple internal transaction coordinator components starting, executing and committing transactions against rows stored in different data nodes. Concurrent transactions only interact where they attempt to lock the same row. This design minimises unnecessary system-wide synchronisation, enabling linear scalability of reads and writes.

The stream of changes made to rows stored at a data node is written to a local Redo log for node and system recovery. The change stream is also published to NdbApi event listeners, including MySQLD servers recording Binlogs. Each node's change stream contains the row changes it was involved in, as committed by multiple transactions, and coordinated by multiple independent transaction coordinators, interleaved in a partial order.

Incoming independent transactions
affecting multiple rows

T3 T4 T7
T1 T2 T5

| | |
V V V

--------  --------  --------
|  1   |  |  2   |  |  3   |
|  TC  |  |  TC  |  |  TC  |   Data nodes with multiple
|      |--|      |--|      |   transaction coordinators
|------|  |------|  |------|   acting on data stored in
|      |  |      |  |      |   different nodes
| DATA |  | DATA |  | DATA |
--------  --------  --------

| | |
V V V

t4 t4 t3
t1 t7 t2
t2 t1 t7
t5

Outgoing row change event
streams by causing
transaction



These row event streams are generated independently by each data node in a cluster, but to be useful they need to be correlated. For system recovery from a crash, the data nodes need to recover to a cluster-wide consistent state: one which contains only whole transactions and which, logically at least, existed at some point in time. This correlation could be done by analysing the transaction ids and row dependencies of each recorded row change to determine a valid order for the merged event streams, but this would add significant overhead. Instead, the Cluster uses a distributed logical clock known as the epoch to group large sets of committed transactions together.

Each epoch contains zero or more committed transactions. Each committed transaction belongs to exactly one epoch. The epoch clock advances periodically, every 100 milliseconds by default. When it is time for a new epoch to start, a distributed protocol known as the Global Commit Protocol (GCP) results in all of the transaction coordinators in the Cluster agreeing on a point in the flow of committing transactions at which to change epoch. This epoch boundary, between the commit of the last transaction in epoch n and the commit of the first transaction in epoch n+1, is a cluster-wide consistent point in time. Obtaining this consistent point in time requires cluster-wide synchronisation between all transaction coordinators, but it need only happen periodically.

Furthermore, each node ensures that all events for epoch n are published before any events for epoch n+1 are published. Effectively the event streams are sorted by epoch number, and the first time a new epoch is encountered signifies a precise epoch boundary.

Incoming independent transactions

T3 T4 T7
T1 T2 T5

| | |
V V V

--------  --------  --------
|  1   |  |  2   |  |  3   |
|  TC  |  |  TC  |  |  TC  |   Data nodes with multiple
|      |--|      |--|      |   transaction coordinators
|------|  |------|  |------|   acting on data stored in
|      |  |      |  |      |   different nodes
| DATA |  | DATA |  | DATA |
--------  --------  --------

| | |
V V V

t4(22) t4(22) t3(22) Epoch 22
...... ...... ......
t1(23) t7(23) t2(23) Epoch 23
t2(23) t1(23) t7(23)
......
t5(24) Epoch 24

Outgoing row change event
streams by causing transaction
with epoch numbers in ()



When these independent streams are merge-sorted by epoch number we get a unified change stream. Because epochs define only a partial order, multiple orderings can result.
One possible ordering is shown here :

Events       Transactions
             contained in epoch

t4(22)
t4(22)       {T4, T3}
t3(22)

......

t1(23)
t2(23)
t7(23)
t1(23)       {T1, T2, T7}
t2(23)
t7(23)

......

t5(24)       {T5}



Note that we can state from this that T4 -> T1 (Happened before), and T1 -> T5. However we cannot say whether T4 -> T3 or T3 -> T4. In epoch 23 we see that the row events resulting from T1, T2 and T7 are interleaved.
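
The merge step can be sketched in a few lines of Python. This is only an illustration built from the event streams in the diagram above: the tuple layout, the function names and the placement of t5 on the third node are assumptions made for the example, not Ndb internals.

```python
import heapq
from itertools import groupby

# Per-node event streams as (transaction, epoch) pairs, already sorted by
# epoch within each node. Which node emits T5 is an arbitrary choice here.
streams = [
    [("T4", 22), ("T1", 23), ("T2", 23)],
    [("T4", 22), ("T7", 23), ("T1", 23)],
    [("T3", 22), ("T2", 23), ("T7", 23), ("T5", 24)],
]

def merge_by_epoch(node_streams):
    """Merge-sort the per-node streams using the epoch number as the only key."""
    return list(heapq.merge(*node_streams, key=lambda event: event[1]))

def transactions_per_epoch(merged):
    """Group an epoch-ordered stream into {epoch: set of transactions}."""
    return {
        epoch: {txn for txn, _ in events}
        for epoch, events in groupby(merged, key=lambda event: event[1])
    }

merged = merge_by_epoch(streams)
epochs = transactions_per_epoch(merged)
for epoch, txns in epochs.items():
    print(epoch, sorted(txns))
```

Because the sort key is the epoch alone, the relative order of events from different nodes inside one epoch is simply whatever the merge happens to produce, which is why more than one ordering is valid.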

Epoch boundaries act as markers in the flow of row events generated by each node, which are then used as consistent points to recover to. Epoch boundaries also allow a single system-wide unified transaction log to be generated from each node's row change stream, by merge-sorting the per-node row change streams by epoch number. Note that the order of events within an epoch is still not tightly constrained. As concurrent transactions can only interact via row locks, the order of events on a single row (Table and Primary key value) signifies transaction commit order, but there is by definition no order between transactions affecting independent row sets.

To record a Binlog of Ndb row changes, MySQLD listens to the row change streams arriving from each data node, and merge-sorts them by epoch into a single, epoch-ordered stream. When all events for a given epoch have been received, MySQLD records a single Binlog transaction containing all row events for that epoch. This Binlog transaction is referred to as an 'Epoch transaction' as it describes all row changes that occurred in an epoch.

Epoch transactions in the Binlog

Epoch transactions in the Binlog have some interesting properties :

  • Efficiency : They can be considered a kind of Binlog group commit, where multiple user transactions are recorded in one Binlog (epoch) transaction. As an epoch normally contains 100 milliseconds of row changes from a cluster, this is a significant amortisation.
  • Consistency : Each epoch transaction contains the row operations which occurred when moving the cluster from epoch boundary consistent state A to epoch boundary consistent state B.
    Therefore, when applied as a transaction by a slave, the slave will atomically move from consistent state A to consistent state B.
  • Inter-epoch ordering : Any row event recorded in epoch n+1 logically happened after every row event in epoch n.
  • Intra-epoch disorder : Any two row events recorded in epoch n, affecting different rows, may have happened in any order.
  • Intra-epoch key-order : Any two row events recorded in epoch n, affecting the same row, happened in the order they are recorded.


The ordering properties show that epochs give only a partial order, enough to subdivide the row change streams into self-consistent chunks. Within an epoch, row changes may be interleaved in any way, except that multiple changes to the same row will be recorded in the order they were committed.
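
The three ordering properties can be written down as a small comparison function. This is a minimal sketch: the RowEvent type, its fields and the happened_before name are made up for illustration and do not correspond to any Ndb or Binlog structure.

```python
from dataclasses import dataclass
from typing import Optional

@dataclass(frozen=True)
class RowEvent:
    epoch: int      # epoch of the enclosing epoch transaction
    position: int   # position of the event within the Binlog
    row: tuple      # (table, primary key value), identifies the row

def happened_before(a: RowEvent, b: RowEvent) -> Optional[bool]:
    """True if a happened before b, False if after, None if the order is unknown."""
    if a.epoch != b.epoch:
        return a.epoch < b.epoch         # inter-epoch ordering
    if a.row == b.row:
        return a.position < b.position   # intra-epoch key-order
    return None                          # intra-epoch disorder

first = RowEvent(epoch=22, position=0, row=("t1", 1))
same_row = RowEvent(epoch=22, position=3, row=("t1", 1))
other_row = RowEvent(epoch=22, position=1, row=("t1", 2))
later = RowEvent(epoch=23, position=9, row=("t1", 2))

print(happened_before(first, later))      # different epochs
print(happened_before(first, same_row))   # same epoch, same row
print(happened_before(first, other_row))  # same epoch, different rows
```

Returning None rather than guessing makes the partial order explicit: a caller has to handle the case where two events simply cannot be ordered.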

Each epoch transaction contains the row changes for a particular epoch, and that information is recorded in the epoch transaction itself, as an extra WRITE_ROW event on a system table called mysql.ndb_apply_status. This WRITE_ROW event contains the binlogging MySQLD's server id and the epoch number. This event is added so that it will be atomically applied by the Slave along with the rest of the row changes in the epoch transaction, giving an atomically reliable indicator of the replication 'position' of the Slave relative to the Master Cluster in terms of epoch number. As the epoch number is abstracted from the details of a particular Master MySQLD's binlog files and offsets, it can be used to failover to an alternative Master.
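
As a rough sketch of why an epoch-based position helps with failover, assume the candidate Master can map each epoch it has binlogged to a file and offset in its own Binlog (MySQL Cluster keeps such a mapping in the mysql.ndb_binlog_index table). The list layout, file names, offsets and the resume_point function below are hypothetical and only illustrate the lookup.

```python
from bisect import bisect_right

def resume_point(applied_epoch, binlog_index):
    """Return (file, position) of the first epoch after applied_epoch.

    binlog_index is a list of (epoch, file, position) tuples sorted by epoch,
    describing where each epoch transaction starts in the new Master's Binlog.
    Returns None if the new Master has recorded nothing newer.
    """
    epochs = [entry[0] for entry in binlog_index]
    i = bisect_right(epochs, applied_epoch)
    if i == len(binlog_index):
        return None
    _, file, position = binlog_index[i]
    return file, position

# Hypothetical index on the alternative Master.
new_master_index = [
    (6998, "binlog.000007", 120),
    (6999, "binlog.000007", 4580),
    (7000, "binlog.000008", 120),
]

# The Slave's highest applied epoch, as read from mysql.ndb_apply_status.
print(resume_point(6998, new_master_index))
```

The Slave's position is expressed purely as an epoch, so the same lookup works against any MySQLD that binlogged the same Cluster, whatever its file names and offsets are.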

We can visualise a MySQL Cluster Binlog as looking something like this. Each Binlog transaction contains one 'artificially generated' WRITE_ROW event at the start, and then RBR row events for all row changes that occurred in that epoch.

BEGIN
WRITE_ROW mysql.ndb_apply_status server_id=4, epoch=6998
WRITE_ROW ...
UPDATE_ROW ...
DELETE_ROW ...
...
COMMIT # Consistent state of the database

BEGIN
WRITE_ROW mysql.ndb_apply_status server_id=4, epoch=6999
...
COMMIT # Consistent state of the database

BEGIN
WRITE_ROW mysql.ndb_apply_status server_id=4, epoch=7000
...
COMMIT # Consistent state of the database
...


This is a series of epoch transactions, each with a special WRITE_ROW event for recording the epoch on the Slave. You can see this structure using the mysqlbinlog tool with the --verbose option.
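
The way an epoch-ordered event stream turns into this structure can be sketched as follows. The event strings and the epoch_transactions function are illustrative assumptions that mimic the listing above; they are not the real Binlog format or MySQLD code.

```python
from itertools import groupby

def epoch_transactions(server_id, merged_events):
    """Yield one Binlog-style transaction (a list of lines) per epoch.

    merged_events is an iterable of (epoch, row_event_text) pairs that is
    already sorted by epoch, as produced by the merge-sort step.
    """
    for epoch, group in groupby(merged_events, key=lambda event: event[0]):
        yield (
            [
                "BEGIN",
                # Generated event that records the epoch on the Slave.
                f"WRITE_ROW mysql.ndb_apply_status server_id={server_id}, epoch={epoch}",
            ]
            + [text for _, text in group]
            + ["COMMIT"]
        )

events = [
    (6998, "WRITE_ROW t1 pk=1"),
    (6998, "UPDATE_ROW t1 pk=2"),
    (6999, "DELETE_ROW t1 pk=1"),
]

binlog = list(epoch_transactions(4, events))
for txn in binlog:
    print("\n".join(txn), end="\n\n")
```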

Rows tagged with last-commit epoch

Each row in a MySQL Cluster stores a hidden metadata column which contains the epoch at which a write to the row was last committed. This information is used internally by the Cluster during node recovery and other operations. The ndb_select_all tool can be used to see the epoch numbers for rows in a table by supplying the --gci or --gci64 options. Note that the per-row epoch is not a row version, as two updates to a row in reasonably quick succession will have the same commit epoch.
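
A toy model makes the last point concrete. The Table class below is a hypothetical in-memory stand-in, not how Ndb stores rows: every write tags the row with the current epoch, so two writes inside one epoch are indistinguishable by epoch alone.

```python
class Table:
    """Toy table where every row remembers the epoch of its last committed write."""

    def __init__(self):
        self.current_epoch = 1
        self.rows = {}  # primary key -> (value, last commit epoch)

    def advance_epoch(self):
        """Stand-in for the periodic, cluster-wide epoch change."""
        self.current_epoch += 1

    def write(self, pk, value):
        self.rows[pk] = (value, self.current_epoch)

    def commit_epoch(self, pk):
        return self.rows[pk][1]

table = Table()
table.write(1, "a")
epoch_after_first = table.commit_epoch(1)
table.write(1, "b")                 # second update in the same epoch
epoch_after_second = table.commit_epoch(1)
table.advance_epoch()
table.write(1, "c")                 # update in the next epoch
epoch_after_third = table.commit_epoch(1)

print(epoch_after_first, epoch_after_second, epoch_after_third)
```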

Epochs and eventual consistency

Reviewing epochs from the point of view of my previous posts on eventual consistency we see that :

  • Epochs provide an incrementing logical clock
  • Epochs are recorded in the Binlog, and therefore shipped to Slaves
  • Epoch boundaries imply happened-before relationships between events before and after them in the Binlog


These properties mean that epochs are almost perfect for monitoring conflict windows in an active-active circular replication setup, with only a few enhancements required.

I'll describe these enhancements in the next post.

Edit 23/12/11 : Added index