PARALLEL SLAVE in MySQL REPLICATION

Overview
MySQL replication does not always scale well. A common reason is the
single-threaded nature of the slave server's applier.
Indeed, the performance race between the master and the slave
has been unfair: contrary to the master, which executes transactions
concurrently in multiple threads, the standard MySQL slave is limited
to applying changes sequentially in a single thread.
However, if the data is logically partitioned per database,
the 5.6 server's Multi-Threaded Slave (MTS) framework may
prove helpful. It makes it possible to apply changes made to different
databases in parallel.
For instance, the simplest use case would be when the master server has
just two databases and transactions against the master server update
only one of them at a time. When replicated, those transactions will
be executed by two separate slave worker threads (Workers) simultaneously.
The former single applier, the so-called SQL thread, plays the role of
their Coordinator. It feeds Workers with events and updates the total
progress information, which is queryable as usual through SHOW SLAVE STATUS
and now also by SELECTing from the `mysql`.`slave_relay_log_info` table
of yet another new feature, the replication info repositories (or
recovery tables).




MTS is configurable with just a few options:

  --slave-parallel-workers=#
                      Number of worker threads for executing events in parallel
  --slave-checkpoint-period=#
                      Gather worker activities to update the progress status of
                      the multi-threaded slave and flush the relay log info to
                      disk after every # milliseconds;
  --slave-checkpoint-group=#
                      Maximum number of transactions processed by the
                      multi-threaded slave before a checkpoint operation is
                      called to update the progress status and flush the relay
                      log info to disk;
  --slave-pending-jobs-size-max=#
                      Max size of Slave Worker queues holding not yet applied
                      events. The lowest possible value must not be less than
                      the master side max_allowed_packet;

each corresponding to a namesake @@global variable, so online
reconfiguration is also possible.
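
For example, assuming a load that splits into two concurrent transaction
profiles, the pool could be reconfigured online along these lines (the
values below are purely illustrative):

  -- Illustrative values only; pick them according to your load.
  SET GLOBAL slave_parallel_workers      = 2;        -- number of Workers
  SET GLOBAL slave_checkpoint_period     = 300;      -- milliseconds
  SET GLOBAL slave_checkpoint_group      = 512;      -- transactions
  SET GLOBAL slave_pending_jobs_size_max = 16777216; -- bytes, not below the
                                                     -- master's max_allowed_packet

  -- The new Worker count takes effect on the next start of the applier.
  STOP SLAVE SQL_THREAD;
  START SLAVE SQL_THREAD;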


The first option simply specifies how many threads will be forked out
by START SLAVE. A good number should correspond to the number of
concurrent transaction profiles. For example, if there are four databases
A, B, C, D and two types of transactions updating A,B and C,D, then the
optimal number of Workers is two. So generally one Worker maps to one
or more databases. Nevertheless, keep in mind that the useful number of
Workers may be limited by a combination of your hardware resources and
the actual load.
The second and third options, the checkpoint period and the group count,
control how often the Coordinator thread updates
`mysql`.`slave_relay_log_info`.
The fourth simply constrains RAM usage so that replication events do not
consume memory beyond some threshold.

MTS is equipped with a new `mysql`.`slave_worker_info` table. It is a
store that keeps track of each individual Worker's progress. Although it
was designed with slave service recovery in mind, the table can be
queried to provide some performance figures.

MTS as a replication feature is rather orthogonal to all the
pre-existing ones. In particular, it supports different storage engines
and binlog formats. It is neutral to newcomers like replication
checksums and GTIDs. And it is friendly, to say the least, to the
replication info repositories. For instance,
--relay-log-info-repository=FILE turns the Worker recovery tables
into files, along with the relay log info table itself; the tables
remain, but empty. When MTS is active - that is, when the number of
Workers is set to be greater than zero - the recommended repository
type is TABLE, because recovery is guaranteed (and actually
feasible) only in this case.
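
A minimal set of server options for such a crash-safe multi-threaded slave
could look as follows (the values are illustrative; --master-info-repository
is shown for completeness only):

  --slave-parallel-workers=2
  --relay-log-info-repository=TABLE
  --master-info-repository=TABLE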

While executing events in parallel, MTS may run into situations where
parallel applying is impossible. Such cases include an apparent overlap
in the databases that two transactions updated on the master, a
transaction found to have updated more than the maximum (16) number of
databases, or a threat of hidden conflicts, which Foreign Key
references currently represent. If any of these happens, MTS temporarily
switches to sequential applying. There is no START SLAVE UNTIL
support yet; an attempt to run this command will end up with a warning
and the slave running single-threaded. Events from a pre-5.6 master
force sequential applying as well.

If an error happens while a Worker applies an event, MTS stores the error
message in the slave's diagnostic area and stops all threads. Later on,
once the cause of the error has been eliminated, the slave service
can be resumed in either single- or multi-threaded mode thanks to a
recovery routine that detects possible gaps in the execution history and
fixes them. There is a new special slave-start mode

  START SLAVE MTS_UNTIL_AFTER_GAPS

that stops the slave service right after the last gap has been fixed.
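
For illustration, once the cause of the error has been removed, a recovery
sequence along these lines can be used (the first statement fills the gaps
and stops, as described above):

  -- Fill the gaps left by the stopped Workers, then stop again.
  START SLAVE MTS_UNTIL_AFTER_GAPS;

  -- Resume regular (single- or multi-threaded) applying.
  START SLAVE;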



Run-time high-level architecture description
The database partitioning model suggests equipping the single SQL
thread system with a worker thread pool. The current sole slave applier
thread changes its multi-purpose role to serve as Coordinator.
Notice that the standard sequential execution of a replication event (e.g.
a query of a transaction) breaks into five basic activities, as the
following chain represents


  Read Event -> Construct Event ->
  Skip-Until-Delay-options handle ->
  Execute Event -> Commit to Relay-Log-Info

Agglomerating Read, Construct and Skip-Until-Delay-options handling
in one thread is reasonable, considering that the three tasks
are rather light and a good deal of code is reused. The remaining
Execute and Commit tasks are delegated to Worker threads.
Interaction between Coordinator and Workers follows the
producer-consumer model. Here is pseudo-code for Coordinator
describing its lifetime activities.
Coordinator, the former SQL thread, keeps operating as in the
single-threaded environment, staying in a read-execute loop, but instead
of executing an event through apply_event_and_update_pos() it distributes
tasks to Workers:

  while (!killed)
    exec_relay_log_event();

where

  exec_relay_log_event()
  {
    if (workers_state() is W_ERROR)
      return error;

    ev= read();                          // the same as in the base code
    work_id= hash(ev);                   // get the worker-id, might be self
    if (work_id == self)
      apply_event_and_update_pos();      // sequential mode
    else
      engage_worker(ev, w_thd[work_id]); // the new parallel mode
  }

The Worker stays in a wait-execute loop:

while (!killed)
{
  ev= get_event();     // wait for an event to show up
  exec_res= ev->apply_event(rli);
  ev->update_pos();    // commit to relay-log-info
  report(exec_res);    // status report to Coordinator
}

The worker pool is activated via START SLAVE.
Its size is equal to the @@global.slave_parallel_workers value.

While distributing tasks, Coordinator maintains an association between
the databases of the currently scheduled transactions and Worker threads.
Databases map to Workers as n to m. If a transaction has a database in
common with another one, a fact that Coordinator notices when calling
hash(ev), Coordinator postpones the assignment until the earlier
transaction has committed and released the common database partition.
That is to say, in the conflict case execution is constrained to be
sequential.
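
As an illustration (the databases and statements below are hypothetical),
two transactions confined to disjoint databases can be applied by different
Workers in parallel, while a third one touching a database that is still in
flight is held back:

  -- Transaction 1: touches only database A; may be assigned to Worker 1.
  BEGIN;
  UPDATE A.t1 SET c = c + 1 WHERE id = 10;
  COMMIT;

  -- Transaction 2: touches only database B; may run on Worker 2 in parallel.
  BEGIN;
  UPDATE B.t1 SET c = c + 1 WHERE id = 20;
  COMMIT;

  -- Transaction 3: touches databases A and C; its assignment is postponed
  -- until Transaction 1 has committed and released partition A.
  BEGIN;
  UPDATE A.t1 SET c = c - 1 WHERE id = 10;
  UPDATE C.t1 SET c = c + 1 WHERE id = 30;
  COMMIT;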

Recovery high-level architecture description
Keep in mind that multi-threaded slave recovery inherits the limitations
of the standard sequential slave in that there is no support for DDL,
nor for DML updating non-transactional databases.
During regular run-time operation Coordinator and Workers maintain a
checkpoint bitmap associated with an interval in the execution history as
it is recorded in the binary log. In plain words, the bitmap describes which
transactions are committed and which are not. The bitmap and the
coordinates of the committed event that the bitmap follows up on are kept
as fields of a special Worker recovery table. When a Worker commits, it
sets the corresponding bit to one. Whenever the bitmap runs out of bits
or the checkpoint interval elapses, Coordinator resets the bitmap and
sets the new origin to the coordinates of the last committed transaction.
In case of a crash, Coordinator first checks whether the recovery
tables are in a sane state. After that it reads transactions starting
from the origin and skips those that are marked as done, executing the
rest. The bitmap is updated following the logic described earlier,
which allows crashes and further rounds of recovery to converge eventually
to a fully recovered slave.
Monitoring
Monitoring consists of SELECTs on the replication recovery tables
and will be extended in the near future
to display fine details of the actions performed by Coordinator and Workers
as separate P_S tables.

So the available Worker status can be obtained through

select * from `mysql`.`slave_worker_info`;



where

mysql> show create table `mysql`.`slave_worker_info`\G
*************************** 1. row ***************************
Table: slave_worker_info
Create Table: CREATE TABLE `slave_worker_info` (
  `Master_id` int(10) unsigned NOT NULL,
  `Worker_id` int(10) unsigned NOT NULL,
  `Relay_log_name` text CHARACTER SET utf8 COLLATE utf8_bin NOT NULL,
  `Relay_log_pos` bigint(20) unsigned NOT NULL,
  `Master_log_name` text CHARACTER SET utf8 COLLATE utf8_bin NOT NULL,
  `Master_log_pos` bigint(20) unsigned NOT NULL,
  `Checkpoint_relay_log_name` text CHARACTER SET utf8 COLLATE utf8_bin NOT NULL,
  `Checkpoint_relay_log_pos` bigint(20) unsigned NOT NULL,
  `Checkpoint_master_log_name` text CHARACTER SET utf8 COLLATE utf8_bin NOT NULL,
  `Checkpoint_master_log_pos` bigint(20) unsigned NOT NULL,
  `Checkpoint_seqno` int(10) unsigned NOT NULL,
  `Checkpoint_group_size` int(10) unsigned NOT NULL,
  `Checkpoint_group_bitmap` blob NOT NULL,
  PRIMARY KEY (`Master_id`,`Worker_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='Worker Information'
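
For example, per-Worker progress relative to the current checkpoint can be
read with a query like the following (column names as in the definition
above):

  SELECT Worker_id,
         Master_log_name,
         Master_log_pos,
         Checkpoint_master_log_name,
         Checkpoint_master_log_pos,
         Checkpoint_seqno
  FROM mysql.slave_worker_info
  ORDER BY Worker_id;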

The Coordinator's status is present in SHOW SLAVE STATUS and through

select * from `mysql`.`slave_relay_log_info`;


where

mysql> show create table `mysql`.`slave_relay_log_info`\G
*************************** 1. row ***************************
Table: slave_relay_log_info
Create Table: CREATE TABLE `slave_relay_log_info` (
  `Master_id` int(10) unsigned NOT NULL,
  `Number_of_lines` int(10) unsigned NOT NULL,
  `Relay_log_name` text CHARACTER SET utf8 COLLATE utf8_bin NOT NULL,
  `Relay_log_pos` bigint(20) unsigned NOT NULL,
  `Master_log_name` text CHARACTER SET utf8 COLLATE utf8_bin NOT NULL,
  `Master_log_pos` bigint(20) unsigned NOT NULL,
  `Sql_delay` int(11) NOT NULL,
  `Number_of_workers` int(10) unsigned NOT NULL,
  PRIMARY KEY (`Master_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='Relay Log Information'
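
A rough feel for how far an individual Worker trails the Coordinator can be
obtained by comparing the two tables, for instance (a sketch only; the
difference is meaningful while both positions point into the same
Master_log_name):

  SELECT w.Worker_id,
         c.Master_log_pos AS coordinator_pos,
         w.Master_log_pos AS worker_pos,
         CAST(c.Master_log_pos AS SIGNED) -
         CAST(w.Master_log_pos AS SIGNED) AS lag_bytes
  FROM mysql.slave_relay_log_info AS c
  JOIN mysql.slave_worker_info    AS w
    ON w.Master_id = c.Master_id;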

Benchmarking results
With well-partitioned data, performance can go up linearly until it hits
some limits, as depicted in the following diagram.

 
Conclusion
Besides providing something valuable to users, MTS has opened up
a development framework. Partitioning criteria may vary, but
all of the control and monitoring mechanisms will be largely reused.
A good deal of work was done to identify tasks and roles and to
improve internal interfaces.