Being a backend programmer, I rarely code in JavaScript. These days, though, I write a lot of cross-browser JavaScript with the fantastic lib jQuery and its various extensions. Front-end programming is hard to get right since IE sucks, but that's another blog post and out of scope here. I certainly have more of an appreciation for the people who focus in this area. I can finally say I am really well rounded, doing CSS, JavaScript, server tuning, DBA work, PHP, C, etc., since I work the entire stack. Now for the setup and disclaimer: this post is not meant to be a How-To but to explain the approach. If you would like a How-To, let me know on what.
With Node.js, the gap between the disciplines of front-end and backend programming is narrowing. To code in Node you need to know JavaScript. Node is great at handing data off and pumping it to the requester in an event-based model that is fantastically fast and small. I love Node, especially since it has access to the browser cookie.
My use of Node is very simple.
The browser makes a long-polling JSONP connection (JSON with padding: basically the server can call code loaded in your browser) to Node.js.
Node.js sends back any data that is waiting for the client, or that arrives during the connection's lifetime, which recycles every 50 seconds. Node's source of data is RabbitMQ.
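If it helps to picture the client side, here is a minimal sketch of that polling loop. The function names and the `maxCycles` cap are illustrative only, not my production code; a real client would loop forever and `request` would be the actual JSONP call:

```javascript
// Sketch of the client-side long-polling loop. `request` issues the
// (hypothetical) JSONP call; the server holds the connection open until
// it has data or the ~50 second window closes, then the client
// immediately reconnects for the next window.
function longPoll(request, onData, maxCycles) {
  var cycles = 0;
  function cycle() {
    if (cycles++ >= maxCycles) return; // cap only for this example
    request(function (data) {
      if (data) onData(data); // deliver whatever the server flushed
      cycle();                // reconnect right away
    });
  }
  cycle();
}
```

The key point is that the reconnect happens inside the response callback, so there is always a connection parked on the server waiting for the next event.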
RabbitMQ, an Erlang server that speaks the AMQP protocol (shared by many message queue servers), is a message queue that can be made into a distributed, fault-tolerant system. It hands the message payloads off to Node.js, which sends each message to the users who are online and subscribed to an exchange. The source of data for RabbitMQ is the database commit in Apache application space (PHP).
MySQL is the persistent store that handles page reload and init requests, while the previous two components enable the real-time feed.
Memcache holds a person's friend list.
Notice Flash is not used, since I wanted a pure JavaScript version and wanted to reduce the amount of download needed to service the request. Additionally, WebSockets are not finalized yet and do not work across all browsers, while long polling does; Socket.io is cool but requires Flash as well.
So the ASCII diagram shows this flow:
Browser --> Apache+PHP --> DB
if DB commit ok
Apache+PHP-->RabbitMQ-->Node.js-->Friends of
Browser
Since Node is a gateway, let's focus more on RabbitMQ, which handles the logic. I'm using RabbitMQ in a pub-sub model. To understand Rabbit, knowing its vocabulary is important.
Producer: produces the message.
Exchange: the bridge between the producer and the queue(s). Note the producer has no idea what the queues are; all it needs to know is that the exchange will take the message from the producer and send it to the correct queue(s).
Queue: holds the messages from producers, on disk or in memory (distributed) depending on durability settings.
Consumer: Node.js; it reads all events that Node subscribed to and hands them off to the connected user.
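To make that vocabulary concrete, here is a toy in-memory model of a fanout exchange. This is just an illustration of who talks to whom, not the real broker; the producer only ever sees the exchange, and the exchange copies the message into every bound queue:

```javascript
// Toy model of the AMQP fanout pattern described above.
function Exchange() { this.queues = []; }
Exchange.prototype.bind = function (queue) { this.queues.push(queue); };
Exchange.prototype.publish = function (msg) {
  // fanout: every bound queue gets a copy; routing key is ignored
  this.queues.forEach(function (q) { q.push(msg); });
};

var feed = new Exchange();
var aliceQueue = [], bobQueue = []; // one queue per connected user
feed.bind(aliceQueue);
feed.bind(bobQueue);

// producer side: publish once, both consumers' queues receive it
feed.publish({ message: 'new post from a friend' });
```

In the real setup, RabbitMQ plays the exchange and queue roles, PHP is the producer, and Node.js is the consumer draining each user's queue.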
I wrote a benchmark in Node to test the capability of this setup, and it's quite impressive. Let's take a quick look:
require("./amqp.connection");

connection.addListener('ready', function() {
  puts("connected to " + connection.serverProperties.product);
  var e = connection.exchange('[userid]_feedExchange', {type: 'fanout'});
  e.on('exchangeDeclareOk', function(data){
    e.publish('routingKeyWhichIgnoredBecauseOfFanout', {message: "This is the message payload"});

    setTimeout(function () {
      // wait one second to receive the message, then quit
      connection.end();
    }, 1000);
  });
});
The require (line 1) sets up the connection: global.connection = amqp.createConnection(options);
Line 3 sets up a listener that fires off code when the connection is established.
Line 5 declares the exchange: if it doesn't exist, it is created, else we keep going. Notice that the exchange name is built dynamically from the userId, so queue binds can happen against this exchange.
Line 6 listens for another event, exchangeDeclareOk; if OK, the message is sent on line 7.
Line 9 sets a timeout to cleanly close the connection, otherwise you will get:
net.js:392
throw new Error('Socket is not writable');
^
Error: Socket is not writable
at Connection._writeOut (net.js:392:11)
at Connection.write (net.js:378:17)
at Connection._sendMethod (/usr/local/node/lib/node_modules/amqp/amqp.js:1011:8)
at Object.cb (/usr/local/node/lib/node_modules/amqp/amqp.js:1799:21)
at Exchange._tasksFlush (/usr/local/node/lib/node_modules/amqp/amqp.js:1306:10)
at Exchange._onMethod (/usr/local/node/lib/node_modules/amqp/amqp.js:1772:8)
at Exchange._onChannelMethod (/usr/local/node/lib/node_modules/amqp/amqp.js:1338:14)
at Connection._onMethod (/usr/local/node/lib/node_modules/amqp/amqp.js:900:28)
at AMQPParser.onMethod (/usr/local/node/lib/node_modules/amqp/amqp.js:807:12)
at AMQPParser._parseMethodFrame (/usr/local/node/lib/node_modules/amqp/amqp.js:454:10)
shell returned 1
Since this is a fanout approach, RabbitMQ sends the message to each "user" that subscribed to the exchange.
Here is an example of a consumer in node:
#!/bin/env node

require("./amqp.connection");
require("./utilities");

connection.addListener('ready', function () {
  puts("connected to " + connection.serverProperties.product);

  var e = {};
  var q = {};
  var qNames = {};
  e = connection.exchange('[userid]_feedExchange', {type: 'fanout'});
  for (var i = 0; i < 1000; i++) {
    var queueName = "_" + 100223 + i;
    q[queueName] = connection.queue(queueName, {autoDelete: true});
    var f = function(){
      var k = i; // variables are scoped by function, not brace, so we capture a copy of i in k for this closure
      q[queueName].on('queueDeclareOk', function (args) {
        q[args.queue].bind(e, "*");
        puts(k + ': queue opened: ' + args.queue + ' Message Count: ' + args.messageCount + " ConsumerCount: " + args.consumerCount);
        q[args.queue].subscribe(function(json) {
          console.log(k + ": " + json.message);
        });
      });
    }();
  }
});
This consumer binds 1000 queues, named starting from 100223, to the [userid]_feedExchange from the producer section.
On line 15 I create a hashmap of queues to access later, when the line 18 event queueDeclareOk fires.
Line 19 binds each queue to the exchange.
Line 21 subscribes each queue to any message sent from the exchange (if we used routing we could listen for certain types of messages; another post).
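As a teaser for that routing post, here is a hedged sketch of what routing keys would add. The fanout exchange above ignores the key; a direct exchange, modeled here as a toy (not the real broker), delivers only to queues whose binding key matches:

```javascript
// Toy model of a direct exchange: a queue bound with key 'comment'
// only sees messages published with that exact routing key.
function DirectExchange() { this.bindings = []; }
DirectExchange.prototype.bind = function (queue, key) {
  this.bindings.push({ queue: queue, key: key });
};
DirectExchange.prototype.publish = function (key, msg) {
  this.bindings.forEach(function (b) {
    if (b.key === key) b.queue.push(msg); // only matching bindings deliver
  });
};

var ex = new DirectExchange();
var comments = [], likes = []; // hypothetical event types for the feed
ex.bind(comments, 'comment');
ex.bind(likes, 'like');
ex.publish('comment', { message: 'someone commented' });
```

With this in place a user could subscribe to only the feed events they care about instead of receiving everything the exchange emits.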
Very simple; it is fast and it works. On a single processor, sending a message to 1000 consumers takes
real 0m0.301s
user 0m0.220s
sys 0m0.028s
while using very little memory:
==16049==
==16049== HEAP SUMMARY:
==16049== in use at exit: 1,275,122 bytes in 140 blocks
==16049== total heap usage: 3,798 allocs, 3,658 frees, 11,036,111
bytes allocated
==16049==
==16049== Searching for pointers to 140 not-freed blocks
==16049== Checked 505,492 bytes
==16049==
==16049== LEAK SUMMARY:
==16049== definitely lost: 1,600 bytes in 74 blocks
==16049== indirectly lost: 1,048,736 bytes in 31 blocks
==16049== possibly lost: 172,080 bytes in 3 blocks
==16049== still reachable: 52,706 bytes in 32 blocks
==16049== suppressed: 0 bytes in 0 blocks
==16049== Rerun with --leak-check=full to see details of leaked
memory
==16049==
==16049== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 33
from 8)
--16049--
--16049-- used_suppression: 33 dl-hack3-cond-1
==16049==
==16049== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 33
from 8)