This is a follow-up post describing the implementation details of Hadoop Applier, and the steps to configure and install it. Hadoop Applier integrates MySQL with Hadoop, providing real-time replication of INSERTs to HDFS, where the data can then be consumed by the data stores working on top of Hadoop. You can learn more about the design rationale and prerequisites in the previous post.
Design and Implementation:
Hadoop Applier replicates rows inserted into a table in MySQL to the Hadoop Distributed File System (HDFS). It uses an API provided by libhdfs, a C library for manipulating files in HDFS. The library comes pre-compiled with Hadoop distributions. The Applier connects to the MySQL master (or reads a binary log generated by MySQL) and:
- fetches the row insert events occurring on the master
- decodes these events, extracting the data inserted into each field of the row
- uses content handlers to get the data into the required format and appends it to a text file in HDFS.
Schema equivalence is a simple mapping: databases are mapped as separate directories, with their tables as sub-directories. Data inserted into each table is written into text files (named datafile1.txt) in HDFS. The data can be written in comma-separated format, or with any other delimiter; this is configurable through command line arguments.
The diagram illustrates the mapping between the MySQL and HDFS schema. The file in which the data is stored is named datafile1.txt here; you can name it anything you want. The working directory into which this datafile is written is base_dir/db_name.db/tb_name.
The timestamp at which the event occurs is included as the first
field in each row inserted in the text file.
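As an illustration, assume the field delimiter is set to a comma and the base directory is /user/hive/warehouse (both hypothetical choices for this example). An INSERT INTO db1.t1 VALUES (110, 'HELLO') on the master would then append a line of the form

1369042423,110,HELLO

to /user/hive/warehouse/db1.db/t1/datafile1.txt, where the first field is the event timestamp (the value shown is made up).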
The implementation follows these steps:
- Connect to the MySQL master using the interfaces to the binary log:

#include "binlog_api.h"

Binary_log binlog(create_transport(mysql_uri.c_str()));
binlog.connect();
- Register content handlers:

/*
  Table_index is a sub class of Content_handler class in the Binlog API
*/
Table_index table_event_hdlr;
Applier replay_hndlr(&table_event_hdlr, &sqltohdfs_obj);

binlog.content_handler_pipeline()->push_back(&table_event_hdlr);
binlog.content_handler_pipeline()->push_back(&replay_hndlr);
- Start an event loop and wait for the events to occur on the master:

while (true)
{
  /*
    Pull events from the master. This is the heart beat of the event listener.
  */
  Binary_log_event *event;
  binlog.wait_for_next_event(&event);
}
- Decode the event using the content handler interfaces:

class Applier : public mysql::Content_handler
{
public:
  Applier(Table_index *index, HDFSSchema *mysqltohdfs_obj)
  {
    m_table_index= index;
    m_hdfs_schema= mysqltohdfs_obj;
  }

  mysql::Binary_log_event *process_event(mysql::Row_event *rev)
  {
    int table_id= rev->table_id;
    typedef std::map<long int, mysql::Table_map_event *> Int2event_map;
    Int2event_map::iterator ti_it= m_table_index->find(table_id);
- Each row event contains multiple rows and fields. Iterate over one row at a time using Row_iterator:

mysql::Row_event_set rows(rev, ti_it->second);
mysql::Row_event_set::iterator it= rows.begin();
do
{
  mysql::Row_of_fields fields= *it;
  long int timestamp= rev->header()->timestamp;
  if (rev->get_event_type() == mysql::WRITE_ROWS_EVENT)
    table_insert(db_name, table_name, fields, timestamp, m_hdfs_schema);
} while (++it != rows.end());
- Get the field data separated by field delimiters and row delimiters. Each row contains a vector of Value objects. The converter allows us to transform the value into another representation:

mysql::Row_of_fields::const_iterator field_it= fields.begin();
mysql::Converter converter;
std::ostringstream data;
data << timestamp;
do
{
  field_index_counter++;
  std::vector<long int>::iterator it;
  std::string str;
  converter.to(str, *field_it);
  data << sqltohdfs_obj->hdfs_field_delim;
  data << str;
} while (++field_it != fields.end());
data << sqltohdfs_obj->hdfs_row_delim;
- Connect to the HDFS file system. If not provided, the connection information (user name, password, host and port) is read from the XML configuration file, hadoop-site.xml:

hdfsFS m_fs= hdfsConnect(host.c_str(), port);
- Create the directory structure in HDFS. Set the working directory and open the file in append mode:

hdfsSetWorkingDirectory(m_fs, (stream_dir_path.str()).c_str());
const char* write_path= "datafile1.txt";
hdfsFile writeFile;
- Append data at the end of the file:

writeFile= hdfsOpenFile(m_fs, write_path, O_WRONLY|O_APPEND, 0, 0, 0);
const std::string &buf= data.str();
tSize num_written_bytes= hdfsWrite(m_fs, writeFile, (void*) buf.c_str(), buf.length());
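To see the HDFS side of this end to end, here is a minimal standalone sketch (not code from the Applier itself) that connects, creates the directory layout and appends one delimited row using only documented libhdfs calls. The namenode host and port, the warehouse path, the database and table names and the sample row are all assumptions for this example, and error handling is kept to simple checks.

#include <cstdio>
#include <string>
#include <fcntl.h>   /* O_WRONLY, O_APPEND */
#include "hdfs.h"    /* libhdfs C API */

int main()
{
  /* Connect to the namenode; host and port are assumptions for a
     local pseudo-distributed setup. */
  hdfsFS fs= hdfsConnect("localhost", 9000);
  if (fs == NULL)
  {
    std::fprintf(stderr, "Failed to connect to HDFS\n");
    return 1;
  }

  /* Create the db_name.db/tb_name layout (names are illustrative)
     and make it the working directory. */
  hdfsCreateDirectory(fs, "/user/hive/warehouse/db1.db/t1");
  hdfsSetWorkingDirectory(fs, "/user/hive/warehouse/db1.db/t1");

  /* Open the datafile in append mode and write one delimited row. */
  hdfsFile writeFile= hdfsOpenFile(fs, "datafile1.txt",
                                   O_WRONLY | O_APPEND, 0, 0, 0);
  if (writeFile == NULL)
  {
    std::fprintf(stderr, "Failed to open datafile1.txt for append\n");
    hdfsDisconnect(fs);
    return 1;
  }

  std::string row= "1369042423,110,HELLO\n";   /* made-up sample row */
  hdfsWrite(fs, writeFile, (void*) row.c_str(), row.length());
  hdfsFlush(fs, writeFile);

  hdfsCloseFile(fs, writeFile);
  hdfsDisconnect(fs);
  return 0;
}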
Install and Configure:
Follow these steps to install and run the Applier:
1. Download a Hadoop release (I am using 1.0.4); configure and install it (for the purpose of the demo, install it in pseudo-distributed mode). The flag 'dfs.support.append' must be set to true while configuring HDFS (hdfs-site.xml). Since append is not supported in Hadoop 1.x, set the flag 'dfs.support.broken.append' to true.
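For reference, the relevant fragment of hdfs-site.xml for such a 1.x pseudo-distributed setup could look as follows (only the two properties mentioned above are shown; they go inside the <configuration> element):

<property>
  <name>dfs.support.append</name>
  <value>true</value>
</property>
<property>
  <name>dfs.support.broken.append</name>
  <value>true</value>
</property>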
2. Set the environment variable $HADOOP_HOME to point to
the Hadoop installation directory.
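For example, if the release was unpacked under /usr/local (a hypothetical location):

$export HADOOP_HOME=/usr/local/hadoop-1.0.4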
3. CMake doesn't come with a 'find' module for libhdfs. Ensure that 'FindHDFS.cmake' is in the CMAKE_MODULE_PATH. You can download a copy here.
4. Edit the file 'FindHDFS.cmake', if necessary, so that HDFS_LIB_PATHS is set to the path of libhdfs.so and HDFS_INCLUDE_DIRS points to the location of hdfs.h. For 1.x versions, the library path is $ENV{HADOOP_HOME}/c++/Linux-i386-32/lib (or Linux-amd64-64 on x64), and the header files are contained in $ENV{HADOOP_HOME}/src/c++/libhdfs. For 2.x releases, the header files and libraries can be found in $ENV{HADOOP_HOME}/include and $ENV{HADOOP_HOME}/lib/native respectively.
For versions 1.x, this patch will fix the paths:
--- a/cmake_modules/FindHDFS.cmake
+++ b/cmake_modules/FindHDFS.cmake
@@ -11,6 +11,7 @@ exec_program(hadoop ARGS version OUTPUT_VARIABLE Hadoop_VERSION
 # currently only looking in HADOOP_HOME
 find_path(HDFS_INCLUDE_DIR hdfs.h PATHS
   $ENV{HADOOP_HOME}/include/
+  $ENV{HADOOP_HOME}/src/c++/libhdfs/
   # make sure we don't accidentally pick up a different version
   NO_DEFAULT_PATH
 )
@@ -26,9 +27,9 @@ endif()
 message(STATUS "Architecture: ${arch_hint}")
 if ("${arch_hint}" STREQUAL "x64")
-  set(HDFS_LIB_PATHS $ENV{HADOOP_HOME}/lib/native)
+  set(HDFS_LIB_PATHS $ENV{HADOOP_HOME}/c++/Linux-amd64-64/lib)
 else ()
-  set(HDFS_LIB_PATHS $ENV{HADOOP_HOME}/lib/native)
+  set(HDFS_LIB_PATHS $ENV{HADOOP_HOME}/c++/Linux-i386-32/lib)
 endif ()
 message(STATUS "HDFS_LIB_PATHS: ${HDFS_LIB_PATHS}")
5. Since libhdfs is a JNI-based API, it requires JNI header files and libraries to build. If a FindJNI.cmake module exists in the CMAKE_MODULE_PATH and JAVA_HOME is set, the headers will be included and the libraries will be linked. If not, you will need to include the headers and load the libraries separately (modify LD_LIBRARY_PATH).
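For example, with a 64-bit Oracle/Sun JDK the JVM shared library typically lives under the JRE; the exact path depends on your JDK layout, so treat this as illustrative:

$export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64/server:$LD_LIBRARY_PATH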
6. Build and install the library 'libreplication', to be used by Hadoop Applier, using CMake.
- Download a copy of Hadoop Applier from http://labs.mysql.com.
- The 'mysqlclient' library is required to be installed in the default library paths. You can either download and install it (you can get a copy here), or set the environment variable $MYSQL_DIR to point to the parent directory of the MySQL source code. Make sure to run cmake on the MySQL source directory.
$export MYSQL_DIR=/usr/local/mysql-5.6
- Run the 'cmake' command on the parent directory of the Hadoop Applier source. This will generate the necessary Makefiles. Make sure to set the cmake option ENABLE_DOWNLOADS=1, which will install Google Test, required to run the unit tests.
$cmake . -DENABLE_DOWNLOADS=1
- Run 'make' and 'make install' to build and install. This will install the library 'libreplication', which is to be used by Hadoop Applier.
7. Make sure to set the CLASSPATH to all the hadoop jars needed to run Hadoop itself.
$export PATH=$HADOOP_HOME/bin:$PATH
$export CLASSPATH=$(hadoop classpath)
8. The code for Hadoop Applier can be found in /examples/mysql2hdfs, in the Hadoop Applier repository. To compile, you can simply load the libraries (modify LD_LIBRARY_PATH if required) and run the command "make happlier" on your terminal. This will create an executable file in the mysql2hdfs directory.
.. and then you are done!
Now run hadoop dfs (namenode and datanode), start a MySQL server as master with row-based replication (for testing purposes you can use the mtr rpl suite: $MySQL-5.6/mysql-test$ ./mtr --start --suite=rpl --mysqld=--binlog_format='ROW' --mysqld=--binlog_checksum=NONE), start Hive (optional) and run the executable ./happlier, optionally providing the MySQL and HDFS URIs and other available command line options (./happlier --help for more info).
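For illustration, an invocation against the mtr-started master and a local pseudo-distributed namenode might look like the line below; the URIs, ports and delimiter are assumptions for this example, so check ./happlier --help for the exact syntax accepted by your build.

$./happlier --field-delimiter=',' mysql://root@127.0.0.1:13000 hdfs://localhost:9000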
There are useful filters as command line options to the Hadoop Applier.

Option | Use |
-r, --field-delimiter=DELIM | Use DELIM instead of ctrl-A as the field delimiter, i.e. the string by which the fields in a row are separated. DELIM can be a string or an ASCII value in the format '\nnn'; escape sequences are not allowed. By default, it is set to ctrl-A. |
-w, --row-delimiter=DELIM | Use DELIM instead of LINE FEED as the row delimiter, i.e. the string by which the rows of a table are separated. DELIM can be a string or an ASCII value in the format '\nnn'; escape sequences are not allowed. By default, it is set to LINE FEED (\n). |
-d, --databases=DB_LIST | Import entries only for the listed databases, optionally restricted to the specified tables. DB_LIST is made up of one database name, or many names separated by commas; each database name can optionally be followed by table names, separated by HYPHENS. Example: -d=db_name1-table1-table2,dbname2-table1,dbname3 |
-f, --fields=LIST | Import entries only for some fields of a table. As in the cut command, LIST is made up of one range, or many ranges separated by commas. Each range is one of: N (the N'th byte, character or field, counted from 1); N- (from the N'th byte, character or field, to the end of line); N-M (from the N'th to the M'th, both included); -M (from the first to the M'th, included). |
-h, --help | Display help. |
Integration with HIVE: Hive runs on top of Hadoop. It is sufficient to install Hive only on the Hadoop master node. Take note of the default data warehouse directory, set as a property in the hive-default.xml.template configuration file. This must be the same as the base directory into which Hadoop Applier writes.
Since the Applier does not import DDL statements, you have to create a similar schema on both MySQL and Hive, i.e. set up a similar database in Hive using HiveQL (Hive Query Language). Since timestamps are inserted as the first field in the HDFS files, you must take this into account while creating tables in Hive.
SQL Query | Hive Query |
CREATE TABLE t (i INT); | CREATE TABLE t ( time_stamp INT, i INT) [ROW FORMAT DELIMITED] STORED AS TEXTFILE; |
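As a concrete sketch, if the Applier is run with --field-delimiter=',' (an assumption for this example; the default delimiter is ctrl-A) and the MySQL side has a database db1 containing the table above, the matching Hive definitions could be:

CREATE DATABASE db1;
USE db1;
CREATE TABLE t (time_stamp INT, i INT)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
  STORED AS TEXTFILE;

Hive will then read the rows that the Applier appends under db1.db/t in the warehouse directory.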
Now, when any row is inserted into a table in the MySQL databases, a corresponding entry is made in the Hive tables. Watch the demo to get a better understanding. The demo has no audio and is meant to be followed in conjunction with this blog. You can also create an external table in Hive and load data into the tables; it's your choice!
Watch the Hadoop Applier Demo >>
Limitations of the Applier: In the first version we support only WRITE_ROWS_EVENTs, i.e. only insert statements are replicated. We have considered adding support for deletes, updates and DDLs as well, but they are more complicated to handle and we are not sure how much interest there is in this. We would very much appreciate your feedback on requirements - please use the comments section of this blog to let us know!
The Hadoop Applier is compatible with MySQL 5.6; however, it does not import events if binlog checksums are enabled. Make sure to set binlog_checksum to NONE on the master, and run the server in row-based replication mode.
This innovation includes dedicated contributions from Neha Kumari, Mats Kindahl and Narayanan Venkateswaran. Without them, this project would not be a success.
Give it a try! You can download a copy from http://labs.mysql.com
and get started NOW!