Before getting to the details of how eventual consistency is
implemented, we need to look at epochs. Ndb Cluster maintains an
internal distributed logical clock known as the epoch,
represented as a 64-bit number. This epoch serves a number of
internal functions, and is atomically advanced across all data
nodes.
Epochs and consistent distributed state
Ndb is a parallel database, with multiple internal transaction
coordinator components starting, executing and committing
transactions against rows stored in different data nodes.
Concurrent transactions only interact where they attempt to lock
the same row. This design minimises unnecessary system-wide
synchronisation, enabling linear scalability of reads and
writes.
The stream of changes made to rows stored at a data node is
written to a local Redo log for node and system recovery. The
change stream is also published to NdbApi event listeners,
including MySQLD servers recording Binlogs. Each node's change
stream contains the row changes it was involved in, as committed
by multiple transactions, and coordinated by multiple independent
transaction coordinators, interleaved in a partial order.
Incoming independent transactions
affecting multiple rows
T3 T4 T7
T1 T2 T5
| | |
V V V
-------- -------- --------
| 1 | | 2 | | 3 |
| TC | | TC | | TC | Data nodes with multiple
| |--| |--| | transaction coordinators
|------| |------| |------| acting on data stored in
| | | | | | different nodes
| DATA | | DATA | | DATA |
-------- -------- --------
| | |
V V V
t4 t4 t3
t1 t7 t2
t2 t1 t7
t5
Outgoing row change event
streams, labelled by causing
transaction
These row event streams are generated independently by each data
node in a cluster, but to be useful they need to be correlated
together. For system recovery from a crash, the data nodes need
to recover to a cluster-wide consistent state: a state which
contains only whole transactions, and which, logically at
least, existed at some point in time. This correlation could be
done by an analysis of the transaction ids and row dependencies
of each recorded row change to determine a valid order for the
merged event streams, but this would add significant overhead.
Instead, the Cluster uses a distributed logical clock known as
the epoch to group large sets of committed transactions
together.
Each epoch contains zero or more committed transactions. Each
committed transaction is in only one epoch. The epoch clock
advances periodically, every 100 milliseconds by default. When it is time for a new epoch to
start, a distributed protocol known as the Global Commit Protocol
(GCP) results in all of the transaction coordinators in the
Cluster agreeing on a point of time in the flow of committing
transactions at which to change epoch. This epoch boundary,
between the commit of the last transaction in epoch n, and the
commit of the first transaction in epoch n+1, is a cluster-wide
consistent point in time. Obtaining this consistent point in time
requires cluster-wide synchronisation, between all transaction
coordinators, but it need only happen periodically.
Furthermore, each node ensures that all events for epoch n
are published before any events for epoch n+1 are published.
Effectively the event streams are sorted by epoch number, and the
first time a new epoch is encountered signifies a precise epoch
boundary.
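The per-node publishing rule above can be sketched in a few
lines. This is an illustration only, not the NdbApi: the event
tuples and function names here are invented for the example.
Because each node publishes its events strictly in epoch order,
the first event carrying a new epoch number marks the boundary.

```python
# Sketch: a node's stream is published strictly in epoch order,
# so the first event with a new epoch number marks the epoch
# boundary. Illustrative tuples stand in for NdbApi events.

def epoch_boundaries(events):
    """Yield (epoch, events_in_epoch) groups from an
    epoch-sorted per-node event stream."""
    current_epoch, batch = None, []
    for epoch, row_change in events:
        if epoch != current_epoch:
            if batch:
                yield current_epoch, batch
            current_epoch, batch = epoch, []
        batch.append(row_change)
    if batch:
        yield current_epoch, batch

stream = [(22, "t4"), (23, "t1"), (23, "t2"), (24, "t5")]
groups = list(epoch_boundaries(stream))
# groups == [(22, ["t4"]), (23, ["t1", "t2"]), (24, ["t5"])]
```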
Incoming independent transactions
T3 T4 T7
T1 T2 T5
| | |
V V V
-------- -------- --------
| 1 | | 2 | | 3 |
| TC | | TC | | TC | Data nodes with multiple
| |--| |--| | transaction coordinators
|------| |------| |------| acting on data stored in
| | | | | | different nodes
| DATA | | DATA | | DATA |
-------- -------- --------
| | |
V V V
t4(22) t4(22) t3(22) Epoch 22
...... ...... ......
t1(23) t7(23) t2(23) Epoch 23
t2(23) t1(23) t7(23)
......
t5(24) Epoch 24
Outgoing row change event
streams, labelled by causing
transaction, with epoch numbers in ()
When these independent streams are merge-sorted by epoch number
we get a unified change stream. Multiple possible orderings can
result.
One possible partial ordering is shown here :
Events Transactions
contained in epoch
t4(22)
t4(22) {T4,T3}
t3(22)
......
t1(23)
t2(23)
t7(23)
t1(23) {T1, T2, T7}
t2(23)
t7(23)
......
t5(24) {T5}
Note that we can state from this that T4 -> T1 (Happened
before), and T1 -> T5. However we cannot say whether T4 ->
T3 or T3 -> T4. In epoch 23 we see that the row events
resulting from T1, T2 and T7 are interleaved.
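The happened-before relation illustrated above depends only on
epoch numbers, so it can be captured in a few lines. This is a
minimal sketch with illustrative names, ignoring the extra
ordering that shared-row access would add within an epoch.

```python
# Sketch: epochs give only a partial order. Transactions in
# different epochs are ordered; transactions in the same epoch
# are incomparable (unless they touched the same row, which this
# sketch ignores).

def happened_before(epoch_a, epoch_b):
    """True if a -> b, False if b -> a, None if unordered."""
    if epoch_a < epoch_b:
        return True
    if epoch_a > epoch_b:
        return False
    return None  # same epoch : no order implied

# Matching the example in the text :
assert happened_before(22, 23) is True   # T4 -> T1
assert happened_before(23, 24) is True   # T1 -> T5
assert happened_before(22, 22) is None   # T4 ? T3 : unordered
```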
Epoch boundaries act as markers in the flow of row events
generated by each node, which are then used as consistent points
to recover to. Epoch boundaries also allow a single system wide
unified transaction log to be generated from each node's row
change stream, by merge-sorting the per-node row change streams
by epoch number. Note that the order of events within an epoch is
still not tightly constrained. As concurrent transactions can
only interact via row locks, the order of events on a single row
(Table and Primary key value) signifies transaction commit order,
but there is by definition no order between transactions
affecting independent row sets.
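The single-row ordering rule above can be made concrete with a
small sketch. The row keys and change strings here are
illustrative, not real NdbApi event contents: within one epoch,
grouping events by (table, primary key) preserves the only
ordering the epoch guarantees, the commit order on each row.

```python
from collections import defaultdict

# Sketch: within an epoch, only events on the same row
# (table + primary key) are ordered; their recorded order is the
# transaction commit order on that row.

def per_row_order(epoch_events):
    """Group an epoch's events by row key, preserving the
    recorded (commit) order per row."""
    order = defaultdict(list)
    for row_key, change in epoch_events:
        order[row_key].append(change)
    return dict(order)

epoch23 = [(("t1", 7), "UPDATE a"),
           (("t2", 9), "UPDATE x"),
           (("t1", 7), "UPDATE b")]
rows = per_row_order(epoch23)
# Commit order on row ("t1", 7) is preserved :
# rows[("t1", 7)] == ["UPDATE a", "UPDATE b"]
```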
To record a Binlog of Ndb row changes, MySQLD listens to the row
change streams arriving from each data node, and merge-sorts
them by epoch into a single, epoch-ordered stream. When all
events for a given epoch have been received, MySQLD records a
single Binlog transaction containing all row events for that
epoch. This Binlog transaction is referred to as an 'Epoch
transaction' as it describes all row changes that occurred in an
epoch.
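The binlogging step above amounts to a merge-sort by epoch
followed by grouping each epoch's events together. The sketch
below illustrates the idea with the streams from the diagram; it
is not MySQLD's actual binlog injector, and the stream contents
are the illustrative transaction labels used earlier.

```python
import heapq
from itertools import groupby

# Sketch: merge-sort per-node event streams by epoch, then group
# each epoch's events into one 'epoch transaction'.

node1 = [(22, "t4"), (23, "t1"), (23, "t2")]
node2 = [(22, "t4"), (23, "t7"), (23, "t1")]
node3 = [(22, "t3"), (23, "t2"), (23, "t7"), (24, "t5")]

merged = heapq.merge(node1, node2, node3, key=lambda ev: ev[0])
epoch_transactions = [
    (epoch, [change for _, change in events])
    for epoch, events in groupby(merged, key=lambda ev: ev[0])
]
# Each entry is one Binlog 'epoch transaction' : all row events
# for that epoch, in one of the valid intra-epoch interleavings.
```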
Epoch transactions in the Binlog
Epoch transactions in the Binlog have some interesting properties
:
- Efficiency : They can be considered a kind of Binlog group
commit, where multiple user transactions are recorded in one
Binlog (epoch) transaction. As an epoch normally contains 100
milliseconds of row changes from a cluster, this is a significant
amortisation.
- Consistency : Each epoch transaction contains the row
operations which occurred when moving the cluster from epoch
boundary consistent state A to epoch boundary consistent state
B. Therefore, when applied as a transaction by a slave, the
slave will atomically move from consistent state A to
consistent state B.
- Inter-epoch ordering : Any row event recorded in epoch n+1
logically happened after every row event in epoch n.
- Intra-epoch disorder : Any two row events recorded in epoch n,
affecting different rows, may have happened in any order.
- Intra-epoch key-order : Any two row events recorded in epoch
n, affecting the same row, happened in the order they are
recorded.
The ordering properties show that epochs give only a partial
order, enough to subdivide the row change streams into
self-consistent chunks. Within an epoch, row changes may be
interleaved in any way, except that multiple changes to the same
row will be recorded in the order they were committed.
Each epoch transaction contains the row changes for a particular
epoch, and that information is recorded in the epoch transaction
itself, as an extra WRITE_ROW event on a system table called
mysql.ndb_apply_status. This WRITE_ROW event
contains the binlogging MySQLD's server id and the epoch number.
This event is added so that it will be atomically applied by the
Slave along with the rest of the row changes in the epoch
transaction, giving an atomically reliable indicator of the
replication 'position' of the Slave relative to the Master
Cluster in terms of epoch number. As the epoch number is
abstracted from the details of a particular Master MySQLD's
binlog files and offsets, it can be used to fail over to an
alternative Master.
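The failover idea can be sketched as: read the slave's highest
applied epoch (from mysql.ndb_apply_status), then find the first
binlog position on the candidate new Master covering a later
epoch. In a real deployment that epoch-to-position map comes
from the mysql.ndb_binlog_index table on the new Master; the
in-memory list and file names below are illustrative.

```python
# Sketch: epoch-based Master failover. The slave's highest
# applied epoch is compared against a map of
# epoch -> (binlog file, position) on the candidate new Master.

def failover_position(applied_epoch, master_binlog_index):
    """Return (file, position) of the first epoch transaction
    the slave has not yet applied, or None if up to date."""
    for epoch, binlog_file, position in sorted(master_binlog_index):
        if epoch > applied_epoch:
            return binlog_file, position
    return None

index = [(6998, "binlog.000003", 120),
         (6999, "binlog.000003", 845),
         (7000, "binlog.000004", 4)]
# Slave applied up to epoch 6998 : resume from epoch 6999
assert failover_position(6998, index) == ("binlog.000003", 845)
assert failover_position(7000, index) is None  # up to date
```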
We can visualise a MySQL Cluster Binlog as looking something like
this. Each Binlog transaction contains one 'artificially
generated' WRITE_ROW event at the start, and then RBR row events
for all row changes that occurred in that epoch.
BEGIN
WRITE_ROW mysql.ndb_apply_status server_id=4, epoch=6998
WRITE_ROW ...
UPDATE_ROW ...
DELETE_ROW ...
...
COMMIT # Consistent state of the database
BEGIN
WRITE_ROW mysql.ndb_apply_status server_id=4, epoch=6999
...
COMMIT # Consistent state of the database
BEGIN
WRITE_ROW mysql.ndb_apply_status server_id=4, epoch=7000
...
COMMIT # Consistent state of the database
...
A series of epoch transactions, each with a special WRITE_ROW
event for recording the epoch on the Slave. You can see this
structure using the mysqlbinlog tool with the --verbose option.
Rows tagged with last-commit epoch
Each row in a MySQL Cluster stores a hidden metadata column which
contains the epoch at which a write to the row was last
committed. This information is used internally by the Cluster
during node recovery and other operations. The ndb_select_all tool can be used to see the
epoch numbers for rows in a table by supplying the --gci or
--gci64 options. Note that the per-row epoch is not a row
version, as two updates to a row in reasonably quick succession
will have the same commit epoch.
Epochs and eventual consistency
Reviewing epochs from the point of view of my previous posts on
eventual consistency, we see that :
- Epochs provide an incrementing logical clock
- Epochs are recorded in the Binlog, and therefore shipped to Slaves
- Epoch boundaries imply happened-before relationships between events before and after them in the Binlog
These properties mean that epochs are almost perfect for monitoring
conflict windows in an active-active circular replication setup,
with only a few enhancements required.
I'll describe these enhancements in the next post.
Edit 23/12/11 : Added index