Memory tuning fast-paced ETL

Dear Kettle friends,

On occasion we need to support environments where not only does a lot of data need to be processed, but it also arrives in frequent batches.  For example, a new data file with hundreds of thousands of rows arrives in a folder every few seconds.

In this setting we want to use clustering to put “commodity” computing resources to work in parallel.  In this blog post I’ll detail what the general architecture looks like and how to tune memory usage in this environment.

Clustering was first created around the end of 2006.  Back then it looked like this.

The master

This is the most important part of our cluster.  It takes care of administering the network configuration and topology.  It also keeps track of the state of dynamically added slave servers.

The master is started just like any other slave server (see below).  It’s just a logical part of a cluster schema, but without it we can’t do any clustering.

The slaves

Any Kettle cluster needs one or more slave servers to do the heavy lifting.  The slave servers can be manually added to the cluster schema using the Spoon browser, or they can be dynamically configured as detailed here.

All slave servers are started by running the Carte server.  You can start it by simply running the following command:

sh carte.sh myhostname 8282

By executing this, we started a small embedded web server that we can actually access on the following URL:  http://myhostname:8282/

An alternative way of running Carte is by passing it an XML configuration file.  You can find a few examples in the pwd/ folder of your Pentaho Data Integration (Kettle) distribution package.
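
As a rough sketch of what such a configuration file might contain, the snippet below writes a minimal slave definition to disk.  The element names follow the sample files shipped in the pwd/ folder as far as I recall them; the file name carte-slave-8282.xml and the server name slave1 are placeholders made up for this illustration, so compare with the examples in your own distribution before relying on it.

```shell
# Write a minimal Carte configuration for one slave server.
# File name, server name and host name are placeholders for illustration.
cat > carte-slave-8282.xml <<'EOF'
<slave_config>
  <slaveserver>
    <name>slave1</name>
    <hostname>myhostname</hostname>
    <port>8282</port>
    <username>cluster</username>
    <password>cluster</password>
    <master>N</master>
  </slaveserver>
</slave_config>
EOF

# Start Carte with the configuration file instead of a host name and port
# (run from the PDI installation folder):
#   sh carte.sh carte-slave-8282.xml
```

The start command is left as a comment because Carte runs in the foreground until it is stopped.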

Security

The user names and passwords of the carte instances are defined in the pwd/kettle.pwd file.  The defaults are user: “cluster” and password “cluster”.

A clustered transformation

Our transformation reads the incoming data files in parallel from a network attached storage device.  This is the fastest way to read the file as it eliminates all possible CPU bottlenecks incurred by character code-page conversion and data conversion logic.  The data is then split into pieces with regular expression logic and some complex scripts are applied (add more steps, rinse and repeat to your liking).  Finally the data is landed in an output file or in a database.

To get some idea on the master of how much data was processed, we count the rows and calculate a total for the complete cluster on the master.  Please note that all steps that are configured to run clustered will be executed on the slave servers (in our case 2) and the others will be executed on the master slave server.

How does this work?

The executing job (or Spoon) takes the definition of your transformation and creates one master transformation and a set of slave transformations.  This splitting of the original transformation is made possible because Pentaho Data Integration is 100% metadata driven.  So Kettle takes your Kettle metadata, splits it and converts it to make it suitable for execution on the various servers.  If you enable the “Show transformations” option in the transformation execution dialog in Spoon, you can see the generated master and slave transformations.

Sometimes (like in our example) data needs to be sent from the master to the slaves or vice-versa.  We don’t send this data over web services as it would be too slow.  For this we use TCP/IP sockets.  The port numbers are allocated in advance and administered by the master server.  You can list all the allocated sockets for a certain host by using the following call in your favorite browser:

http://masterhost:8282/kettle/listSocket/?host=slavehost

If you only want to see the open (or used) sockets, you can add  “&onlyOpen=Y” to the URL.
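
If you prefer the command line over a browser, the same call can be assembled in a small shell function.  This is only a sketch: the function name list_sockets_url is made up for this example, and the host names are the placeholders used above.

```shell
# Build the listSocket URL for a given master (host:port) and slave host.
# Pass Y as the third argument to list only the open (used) sockets.
list_sockets_url() {
  url="http://$1/kettle/listSocket/?host=$2"
  if [ "${3:-N}" = "Y" ]; then
    url="${url}&onlyOpen=Y"
  fi
  printf '%s\n' "$url"
}

# Prints: http://masterhost:8282/kettle/listSocket/?host=slavehost&onlyOpen=Y
list_sockets_url masterhost:8282 slavehost Y
```

With the default credentials from the Security section, the result could then be fetched with something like: curl -u cluster:cluster "$(list_sockets_url masterhost:8282 slavehost Y)"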

Then the transformations are sent to the master and slave servers using the web services (simple servlets actually) that are exposed on the Carte instances.  They arrive on the slave servers and are put in a “Waiting” state.  Then all the transformations are initialized (or prepared) with another web service call.  At this point any server sockets are opened for those steps that need to send data to a remote next step (from a slave to a master or vice versa).  When all of that has completed successfully, we start the transformations.

After the clustered execution of the master and slave transformations is completed, we run a clean-up on the various slaves where sockets are closed and where server ports on the master are deallocated for re-use by another transformation.
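
Spoon or the executing job makes these calls for you, so you normally never issue them by hand.  Purely to make the sequence visible, here is a sketch that prints the URLs involved for one generated transformation.  The servlet names (prepareExec, startExec, transStatus, cleanupTrans) are my assumption of what the Carte servlets are called and may differ in your version; carte_call_url and my_clustered_trans are made-up names.

```shell
# Print the Carte URL for one life-cycle call on one server.
# Arguments: host:port, servlet name, transformation name.
# Servlet names are assumptions; names with spaces would need URL encoding.
carte_call_url() {
  printf 'http://%s/kettle/%s/?name=%s&xml=Y\n' "$1" "$2" "$3"
}

# Order of calls once the transformation has been sent to the server:
# prepare (open server sockets), start, check status, clean up.
for action in prepareExec startExec transStatus cleanupTrans; do
  carte_call_url slavehost:8282 "$action" my_clustered_trans
done
```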

Parallel clustering

To further help out with fast-paced environments, version 4.2.0-RC (due real soon) will support parallel execution of the same or different clustered transformations on the same cluster.  This allows you to devise a strategy where a clustered execution is started for each incoming file without regard to the state of the previous execution.  To keep the various executions apart we’ve introduced a new internal Kettle variable called ${Internal.Cluster.Run.ID}, which you can use on the master and the slaves, for example to write to different output files.

A never ending job

If you check the “Repeat” option in the dialog of the “Start” job entry, you get a never ending job unless you execute an “Abort job” job entry.  This is very convenient if you want to have logic that keeps looking for new files or more things to do until nothing is left.  We can also use this to grab another value from a message queue, a web service or a directory full of files, and for near-real-time data integration in general.

In the past this option became somewhat discredited because memory management in versions prior to the upcoming 4.1 was not as good as it is now.  Let’s take a look at what the memory management options in version 4.2 are…

Time out stale carte objects

Every time you execute a clustered transformation, a new master and slave transformation will appear in a “Waiting” state on the slave servers.  If you execute every couple of minutes or even faster, you will get a long list of logged transformations on the slave servers.  By default (in 4.2) they are purged automatically, but only after one day.  Setting a shorter time-out period is important in this case.  You can do this either by setting the <object_timeout_minutes> option in your Carte configuration file or by setting the KETTLE_CARTE_OBJECT_TIMEOUT_MINUTES variable in the kettle.properties file on each of the servers.
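
As a small illustration, the line below appends such a setting to a scratch file.  The file name kettle.properties.sample is made up so that nothing real gets overwritten, and the value of 60 minutes is an arbitrary example rather than a recommendation; copy the resulting line into the actual kettle.properties on each server once you have picked a value.

```shell
# Append a shorter Carte object time-out to a sample properties file.
# 60 minutes is an arbitrary illustrative value, not a recommendation.
printf 'KETTLE_CARTE_OBJECT_TIMEOUT_MINUTES=%s\n' 60 >> kettle.properties.sample

# Equivalent element inside <slave_config> in a Carte XML configuration file:
#   <object_timeout_minutes>60</object_timeout_minutes>
```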

Configure the logging back-end

In version 4, the logging back-end of Kettle was completely rewritten to be as flexible as possible.  This logging back-end also provides us with valuable tracing and lineage information.  Every executable component in Kettle (every database connection, transformation, job, step or job entry) has its own unique logging channel ID.  Upon creation, the component registers itself in a central logging registry where we keep track of the name and type of the component (and so on) but also which parent it has.  With this we know the complete execution lineage of a job, for example.

During execution, log lines are kept in a central log buffer so that we can easily retrieve the lines incrementally (in Spoon) or store them in a database table (transformation or job logging, for example).  Each line references the logging registry so we know at all times where it came from.

All this information is being managed as detailed on this wiki page but in high-paced environments it is still wise to set limits as to how much memory can be consumed by the logging back-end.  For this there are a few parameters you can set:

  • KETTLE_MAX_LOGGING_REGISTRY_SIZE : Make sure to consider this parameter in fast-paced environments where a job never ends, because in that case the registry is not cleaned automatically.  The default of 1000 should be enough to provide accurate logging.  If you have complex jobs you might want to increase this number.
  • KETTLE_MAX_JOB_ENTRIES_LOGGED : For never ending jobs this makes a big difference.  Please note that you can enable interval logging on the job entry log table.  Make sure you keep enough entries in memory until the next time you write them out to the database table.  The default is also set to a reasonably low 1000 entries.
  • KETTLE_MAX_JOB_TRACKER_SIZE: Again this parameter makes a difference in never ending jobs as it allows another possible memory leak to be cleaned up automatically beyond a certain size.  The job tracker keeps track of the results of job entries.  In a never ending job you rarely need more than the default, 1000 again.
  • KETTLE_MAX_LOG_SIZE_IN_LINES: If you accidentally execute a transformation in, say, “Row level” logging mode, an enormous amount of very detailed logging will be produced.  In the past, before version 4, this was a common cause of running out of memory and crashing your whole cluster.  By setting this value to a fair maximum (default is 5000 in 4.2) you will prevent this situation.  You can also specify this parameter in your Carte XML configuration file with the <max_log_lines> parameter.
  • KETTLE_MAX_LOG_TIMEOUT_IN_MINUTES: If you prefer to let the records time out after a while, then that is possible.  You can specify a maximum age with this parameter.  The default maximum age is 1440 (one day).  You can also specify this parameter in your Carte XML configuration file with the <max_log_timeout_minutes> parameter.
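
To see the five parameters side by side, here is a sketch that appends them to a scratch properties file using the default values quoted above.  The file name kettle.properties.sample is made up for this example; the lines themselves are meant to be copied into the kettle.properties file on each server and adjusted to your own workload.

```shell
# Append the logging back-end limits to a sample properties file.
# The values are simply the defaults mentioned in the list above.
cat >> kettle.properties.sample <<'EOF'
# Memory limits for the logging back-end
KETTLE_MAX_LOGGING_REGISTRY_SIZE=1000
KETTLE_MAX_JOB_ENTRIES_LOGGED=1000
KETTLE_MAX_JOB_TRACKER_SIZE=1000
KETTLE_MAX_LOG_SIZE_IN_LINES=5000
KETTLE_MAX_LOG_TIMEOUT_IN_MINUTES=1440
EOF
```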

As someone who had the pleasure of testing the various settings for the past couple of weeks I can say that it all works nicely.  Once a job (executing 3 clustered transformations in parallel processing hundreds of millions of rows) runs for thousands of iterations and for days on end without consuming more than a few hundred MB you can be sure that memory management is under control.

Enjoy!

Matt