Introducing Maxwell, a mysql-to-kafka binlog processor

Hi, I'm Ben Osheroff, an engineer on the infrastructure team at Zendesk. My team began this year with a single goal: to better scale Zendesk's view system.

If you haven't tried Zendesk yet, views are lists of tickets with user-specified constraints, generally consumed by support agents as a workflow tool. As implemented, views are a SQL generation and execution engine, and this poses some interesting scale challenges: giving your customers a fully-featured bridge to SQL means that they will find all sorts of creative ways to generate queries your databases aren't prepared to handle.

To solve this, we've been building a system based on adaptive caches, where Zendesk keeps "materialized views" of expensive-to-execute queries, created on the fly as needed. At the heart of this caching system is a data firehose, a change capture system like the ones Martin Kleppmann has been talking about.

We didn't find anything that quite suited our needs, so we built Maxwell, a program which transforms mysql binlogs into JSON and sends the entries to Kafka.

Getting going with Maxwell

More in-depth quickstart docs can be found here, but very briefly, you gotta:

  • set up row-based replication on your mysql master
  • set up kafka
  • run Maxwell with permissions to access the "maxwell" database. Maxwell will create it if it doesn't exist.
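
For the impatient, the whole thing boils down to roughly the following. Treat it as a sketch -- the exact settings, grants, and flags your setup needs may differ, so defer to the quickstart docs:

# my.cnf on the master: Maxwell needs row-based binlogs
[mysqld]
server_id=1
log-bin=master
binlog_format=row

-- a user for Maxwell: its own database, plus replication access
CREATE USER 'maxwell'@'%' IDENTIFIED BY 'some_password';
GRANT ALL ON maxwell.* TO 'maxwell'@'%';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';

# point Maxwell at the master and at your kafka brokers
bin/maxwell --user='maxwell' --password='some_password' --host='127.0.0.1' \
    --producer=kafka --kafka.brokers=localhost:9092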

Here's a sample of what Maxwell outputs to Kafka. This data is from one of our production shards. The ids have been changed to protect the innocent, and some whitespace added for clarity.

{ "database": "zd_shard461_prod",
  "table":    "ticket_field_entries",
  "type":     "update",
  "data":     {"id":918362569,"ticket_field_id":2409966,"ticket_id":105008079,"updated_at":"2015-08-01 20:35:37","account_id":34989,"value":"147","created_at":"2015-08-01 20:32:23"},
  "ts":       1438461337 }

{ "database": "zd_shard461_prod",
  "table":    "events",
  "type":     "insert",
  "data":     {"id":771929975, "ticket_id":105008079, "updated_at":"2015-08-01 20:35:37", "account_id":34989, "is_public":0, "value_previous":null, "via_id":0, "created_at":"2015-08-01 20:35:37", "author_id":1016622766, "type":"Audit"},
  "ts":       1438461337 }

As you can see, Maxwell logs change data from mysql's binlog, in JSON format. Let's do a quick breakdown of the fields:

  • database -- the database name
  • table -- the table name
  • type -- this can be "insert", "update", or "delete". In all three cases, the JSON stream will include the full row. If the event is a delete, Maxwell will output the last values of the row before it was deleted.
  • ts -- the timestamp the row was stored in the binlog.
  • data -- ahhhhh, the good stuff. More info on how Maxwell handles data types is available here.
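
Consuming the stream is about as simple as the JSON suggests. Here's a rough sketch of a consumer written with the kafka-python client -- it assumes "maxwell" as the topic name (Maxwell's default) and a local broker, so adjust for your own setup:

from kafka import KafkaConsumer  # assumes the kafka-python client is installed
import json

# "maxwell" is the topic Maxwell writes to by default
consumer = KafkaConsumer('maxwell', bootstrap_servers=['localhost:9092'])

for message in consumer:
    event = json.loads(message.value.decode('utf-8'))
    # every event carries database, table, type, ts, and the full row in "data"
    if event['table'] == 'ticket_field_entries' and event['type'] in ('insert', 'update'):
        row = event['data']
        print("ticket %s changed at %s" % (row['ticket_id'], event['ts']))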

Some of the challenges

The infrastructure team first got in-depth with the care and feeding of mysql binlogs while working on our last big project, a system to move our customers' data between our various data pods with minimal downtime. We snapshot an account's mysql data at a certain point in time, transfer the corpus of data, and then start applying deltas from the source binlog to the destination mysql server. In theory, applying deltas should be a straightforward task: record the binlog position at the start of the corpus creation (call that Tx), record it again before finalizing the account move (call that Ty), and then simply replay the binlogs spanning Tx to Ty back on the destination master.
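
In the abstract -- and ignoring everything that makes it hard, which the next section gets into -- that replay can be sketched with stock mysql tooling. Our actual account mover does considerably more bookkeeping, and the host and database names here are just placeholders:

# 1. take a consistent dump, recording the binlog coordinates it corresponds to (Tx)
mysqldump --single-transaction --master-data=2 --databases some_account_db > corpus.sql

# 2. load the corpus on the destination master
mysql -h destination.host < corpus.sql

# 3. replay every delta between Tx and the cutover point Ty against the destination
mysqlbinlog --start-position=$TX_POS --stop-position=$TY_POS mysql-bin.000123 \
    | mysql -h destination.host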

Mysql's binlogs are a funny thing, though. They contain enough metadata for a replica to keep in sync with its master, but that's about it: they don't have enough information to transform a row back into SQL (or json). Processing the binlogs gives you access to raw column data and some minimal type information, but you don't get column names, character encoding, decimal precision or signedness of integers. Because of this, in order to regenerate a stream of the data, it's critical to have an up-to-date copy of the mysql schema in hand. This is one of the trickiest parts of dealing with binlogs. Some possible solutions we considered:

  • For our account mover process, we solved this by capturing the schema at the top of the process, and then aborting the move if any schema differences were detected during the move.
  • MyPipe solves this problem (sort of) by recapturing the schema after processing each ALTER statement, but this is prone to errors if the replica falls too far behind.
  • We're fairly sure LinkedIn's engineers, while writing DataBus, solved this problem by patching mysql to output full schema information into its binlogs. They never seemed to release the patch, though. (Update: we were totally wrong about this, see conversation on GitHub.)

We wanted a better approach for Maxwell, one more robust than MyPipe's and less invasive than DataBus's. One fairly obvious but daunting option was to make the binlog processor act as a fully-fledged replica, and code it to be able to parse the ALTER/CREATE TABLE/DROP TABLE statements sent by the server. In a fit of hubris and silly ambition, we decided that if we had to write a SQL parser, so be it. Maxwell includes an ANTLR4-based parser that's capable of processing all DDL statements the same way a mysql replica would, and it keeps a snapshot of the schema inside the mysql master itself. This was one of the hardest parts of the system to get "feature complete" -- supporting every nook and cranny of the vast syntax that mysql accepts is a challenge. Even with MySQL's excellent documentation, we still occasionally find odd or undocumented corners of syntax that Maxwell needs to be taught how to handle.
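
To make the problem concrete: a row event in the binlog is essentially a tuple of values in column order, and only by pairing it with a schema tracked elsewhere can you produce the JSON shown earlier. The snippet below is purely illustrative -- it isn't Maxwell's code -- but it's the heart of the idea:

# what the binlog hands you: values in column order, minimal type info, no names
row_values = (918362569, 2409966, 105008079, "2015-08-01 20:35:37", 34989, "147", "2015-08-01 20:32:23")

# what you have to track yourself, by replaying every CREATE/ALTER/DROP the master runs
columns = ["id", "ticket_field_id", "ticket_id", "updated_at", "account_id", "value", "created_at"]

# only with both halves can you emit something a consumer can actually use
record = dict(zip(columns, row_values))
# => {"id": 918362569, "ticket_field_id": 2409966, "ticket_id": 105008079, ...}

# and when an ALTER TABLE adding or dropping a column goes by in the binlog,
# the tracked column list has to be updated before the next row event is decoded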

JSON? Not Avro?

We went back and forth on this one quite a bit. Avro is great and we're starting to use it at Zendesk, but we didn't find a great solution for storing avro schemas. One could put them in the middle of the kafka stream, but this would mean that the stream wasn't readable from any given starting position. Or you could store them in a separate HTTP repo, but that requires encoding a bunch of smarts into any Maxwell consumer. In practice, JSON has some downsides (datetimes and BLOBs are particularly sucky), but its simplicity seems to outweigh all the other concerns. It also compresses wonderfully (over X-to-1), so its extra space usage isn't much of a concern. Maybe we'll end up revisiting this decision. Not sure.

open-replicator

At the core of Maxwell is the excellent open-replicator library. The main branch seemed a little dead, so we've taken over maintainership of a fork; we're actively working on new features like support for mysql 5.6 checksums, and we're eyeing GTID support.

What's next for Maxwell

Maxwell is up and running in production at Zendesk! We've been shaking out the beta bugs and should have a 1.0 release soon. We're still not totally feature complete, though. Here's a partial list of our upcoming ambitions for Maxwell:

  • A "day-zero" feature to bootstrap the initial contents of your database into Kafka. DataBus does this, and we're jealous. Also, having the entire contents of our database denormalized into kafka streams will let us use only the kafka streams as inputs to our ETL systems.

  • For updates, Maxwell needs to output not only the snapshot of the row as it stands, but also the row as it was. This can be useful for knowing when a particular row started or stopped matching a condition. (One possible shape for such an event is sketched after this list.)

  • Maxwell's handling of a master failover is less-than-ideal. At Zendesk, we run a simple wrapper script that probes our database clusters at runtime, and in the event of a master failover a new Maxwell will spin up on the promoted master. This is a fine solution, but the version of Maxwell on the new master has no idea that it should pick up exactly where the copy running on the old master left off. Supporting GTIDs and 5.7 is one possible solution to this, but there's also a possibility of pulling off master failover without them.

  • Currently Maxwell always partitions its data across Kafka partitions by database name. Ideally this should be user configurable -- for those who would want to sacrifice ordering guarantees for overall throughput, Maxwell should be able to partition its data based on an auto-increment key, or table, or really any field in the JSON payload.
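
As a sketch of the second item above: an update event could grow an extra section carrying the prior values of the columns that changed, something like the following. This is a hypothetical shape (the "old" values here are made up), not a format Maxwell emits today:

{ "database": "zd_shard461_prod",
  "table":    "ticket_field_entries",
  "type":     "update",
  "data":     {"id":918362569,"ticket_field_id":2409966,"ticket_id":105008079,"updated_at":"2015-08-01 20:35:37","account_id":34989,"value":"147","created_at":"2015-08-01 20:32:23"},
  "old":      {"value":"146","updated_at":"2015-08-01 20:30:11"},
  "ts":       1438461337 }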

Obligatory "summing up/gloating" paragraph

We've started to use Maxwell for various ETL systems: we're looking at revamping our search architecture to consume from Kafka instead of polling the database, and various new products are using Maxwell to get notified when something changes in the main database.

Change-data-capture systems are great pieces of architecture, and like all good architectures, solutions seem to fall out of them easily. So we'd love for you to give Maxwell a shot, and see if it solves your problems the way it's solved ours!

Open an issue on our GitHub repo if you have any questions or want to get in touch about Maxwell.