ZeroMQ.js
https://github.com/zeromq/zeromq.js
http://zeromq.github.io/zeromq.js/
https://zeromq.org/languages/nodejs/
$ npm install zeromq@5
const zmq = require("zeromq")
Messaging Patterns
- Publisher / Subscriber
- Request / Reply
- Router / Dealer
- Pushing / Pulling
Publisher / Subscriber
Publisher send message to all Subscriber
Publisher can't receive message
Subscribe can't send message
http://zguide.zeromq.org/js:wuserver
// Weather update server in node.js
// Binds PUB socket to tcp://*:5556
// Publishes random weather updates
var zmq = require('zeromq')
, publisher = zmq.socket('pub');
publisher.bindSync("tcp://*:5556");
publisher.bindSync("ipc://weather.ipc");
function zeropad(num) {
return num.toString().padStart(5, "0");
};
function rand(upper, extra) {
var num = Math.abs(Math.round(Math.random() * upper));
return num + (extra || 0);
};
while (true) {
// Get values that will fool the boss
var zipcode = rand(100000)
, temperature = rand(215, -80)
, relhumidity = rand(50, 10)
, update = `${zeropad(zipcode)} ${temperature} ${relhumidity}`;
publisher.send(update);
}
Request / Reply
when server get request (message), reply (message)
one request for mult reply need polling, it will process one by one
http://zguide.zeromq.org/js:hwserver
var zmq = require('zmq');
// socket to talk to clients
var responder = zmq.socket('rep');
responder.on('message', function(request) {
console.log("Received request: [", request.toString(), "]");
// do some 'work'
setTimeout(function() {
// send reply back to client.
responder.send("World");
}, 1000);
});
responder.bind('tcp://*:5555', function(err) {
if (err) { console.log(err);
} else { console.log("Listening on 5555…"); }
});
process.on('SIGINT', function() {
responder.close();
});
Router / Dealer
http://zguide.zeromq.org/page:all#advanced-request-reply
Advance of Request / Reply
income message(Request) send to router, router send to dealer
dealer have some worker to work
when dealer get request form router, dealer will send to worker
worker finish the work and reply to dealer
dealer forward to router, router reply to correspond request.
http://zguide.zeromq.org/js:rrbroker
// Simple request-reply broker in Node.js
var zmq = require('zmq')
, frontend = zmq.socket('router')
, backend = zmq.socket('dealer');
frontend.bindSync('tcp://*:5559');
backend.bindSync('tcp://*:5560');
frontend.on('message', function() {
// Note that separate message parts come as function arguments.
var args = Array.apply(null, arguments);
// Pass array of strings/buffers to send multipart messages.
backend.send(args);
});
backend.on('message', function() {
var args = Array.apply(null, arguments);
frontend.send(args);
});
Pushing / Pulling
Similar to Publisher and Subscriber
Pusher push one message, then only one puller get the message
bi-directional Pushing and Pulling
use cluster https://nodejs.org/api/cluster.html#cluster_cluster
const cluster = require('cluster');
const numCPUs = require('os').cpus().length;
if (cluster.isMaster) {
/* ... */
}else{
/* ... */
}
The master cluster create pusher and puller
pusher use to push task to worker cluster
puller use to pull result from worker cluster
and also reponsible to fork worker cluster
Form worker clusters:
worker clusters create pusher and puller
puller use to pull task from masker cluster
pusher use to push result to masker cluster
const zmq = require('zmq');
const cluster = require('cluster');
const numCPUs = require('os').cpus().length;
if (cluster.isMaster) {
const masterPusher = zmq.socket('push');
masterPusher.bind(/* ... */);
masterPusher.send(/* ... */); // call worker
const masterPuller = zmq.socket('pull');
masterPuller.bind(/* ... */);
masterPuller.on('message', msg => {
/* get the result */
});
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}
}else{
const workerPusher = zmq.socket('push').connect(/* ... */);
const workerPuller = zmq.socket('pull').connect(/* ... */);
workerPuller.on('message', msg => {
// Do work
workerPusher.send(/* work result */);
});
}
官方 Sample
Push/Pull
// producer.js
var zmq = require("zeromq"),
sock = zmq.socket("push");
sock.bindSync("tcp://127.0.0.1:3000");
console.log("Producer bound to port 3000");
setInterval(function() {
console.log("sending work");
sock.send("some work");
}, 500);
// worker.js
var zmq = require("zeromq"),
sock = zmq.socket("pull");
sock.connect("tcp://127.0.0.1:3000");
console.log("Worker connected to port 3000");
sock.on("message", function(msg) {
console.log("work: %s", msg.toString());
});
Pub/Sub
// pubber.js
var zmq = require("zeromq"),
sock = zmq.socket("pub");
sock.bindSync("tcp://127.0.0.1:3000");
console.log("Publisher bound to port 3000");
setInterval(function() {
console.log("sending a multipart message envelope");
sock.send(["kitty cats", "meow!"]);
}, 500);
// subber.js
var zmq = require("zeromq"),
sock = zmq.socket("sub");
sock.connect("tcp://127.0.0.1:3000");
sock.subscribe("kitty cats");
console.log("Subscriber connected to port 3000");
sock.on("message", function(topic, message) {
console.log(
"received a message related to:",
topic,
"containing message:",
message
);
});
other example
const zmq = require('zeromq');
const publisher = zmq.socket('pub');
publisher.bind("tcp://*:" + port, err =>{
if(err) throw err;
/* ... */
})
const responder = zmq.socket('rep');
responder.bind("tcp://127.0.0.1:" + port, err => {
if(err) throw err;
/* ... */
})
responder.on('message', data =>{
/* ... */
// publisher.send(/* ... */);
// responder.send(/* ... */);
})
const zmq = require('zeromq');
const pushSocket = zmq.socket('push');
pushSocket.bind("tcp://*:" + port, err =>{
if(err) throw err;
/* ... */
// pushSocket.send(/* ... */);
})
const zmq = require('zeromq');
const pullSocket = zmq.socket('pull');
pullSocket.on('message', msg => {
/* ... */
})
pullSocket.connect('tcp://127.0.0.1:' + port);