Commit fc478cd2 authored by project's avatar project

--no commit message

--no commit message
parent d32c9829
...@@ -45,7 +45,7 @@ var process_get = function(req, res) { ...@@ -45,7 +45,7 @@ var process_get = function(req, res) {
var msg = job_execute_msg; var msg = job_execute_msg;
msg.jobId = item.jobid; msg.jobId = item.jobid;
evp.send(topic,JSON.stringify(msg)); evp.send(topic,msg);
}); });
if(j.length > 0) if(j.length > 0)
......
...@@ -36,11 +36,11 @@ function EventPub(config) ...@@ -36,11 +36,11 @@ function EventPub(config)
} }
EventPub.prototype.send = function(topic,msg,cb) EventPub.prototype.send = function(topic,msg)
{ {
var self=this; var self=this;
this.open(function(err){ this.open(function(err){
self.ch.publish(self.name, topic, new Buffer(msg)); self.ch.publish(self.name, topic, new Buffer(JSON.stringify(msg)));
}); });
} }
......
var amqp = require('amqplib/callback_api');
var thunky = require('thunky');
function QueueCaller(config)
{
this.config = config;
this.url = config.url;
this.name = config.name || "task_queue";
this.conn = null;
this.ch = null;
var self = this;
this.opened = false;
this.open = thunky(open);
this.open();
function open (cb) {
amqp.connect(self.url, function(err, conn) {
if (err){return cb(err)}
conn.createChannel(function(err, ch) {
if (err){return cb(err)}
var q = self.name;
ch.assertQueue(q, {durable: true});
self.opened = true;
self.conn = conn;
self.ch = ch;
cb();
});
});
}
}
QueueCaller.prototype.send = function(msg,cb)
{
var self=this;
this.open(function(err){
self.ch.sendToQueue(self.name, new Buffer(JSON.stringify(msg)), {persistent: true});
});
}
QueueCaller.prototype.close = function(cb)
{
var self=this;
self.conn.close(cb);
}
module.exports = QueueCaller;
var amqp = require('amqplib/callback_api');
var thunky = require('thunky');
function QueueReceiver(config)
{
this.config = config;
this.url = config.url;
this.name = config.name || "task_queue";
this.execute_function = null;
}
QueueReceiver.prototype.start = function (cb)
{
var self = this;
amqp.connect(self.url, function(err, conn) {
if(err){
return cb(err);
}
conn.createChannel(function(err, ch) {
if(err){
return cb(err);
}
var q = self.name;
ch.assertQueue(q, {durable: true});
ch.prefetch(1);
ch.consume(q, function(msg) {
var objmsg = JSON.parse(msg.content.toString());
self.execute_function(objmsg,function(){
ch.ack(msg);
});
}, {noAck: false});
cb(null);
});
});
}
QueueReceiver.prototype.close = function(cb)
{
var self=this;
self.conn.close(cb);
}
QueueReceiver.prototype.set_execute_function = function(func){
this.execute_function = func;
}
module.exports = QueueReceiver;
var ctx = require('../context');
var amqp_cfg = ctx.config.amqp;
var QueueCaller = ctx.getLib('lib/amqp/queuecaller');
var qc = new QueueCaller({'url':'amqp://bigmaster.igridproject.info'});
qc.send({'name':'kamron'});
var ctx = require('../context');
var amqp_cfg = ctx.config.amqp;
var QueueReceiver = ctx.getLib('lib/amqp/queuereceiver');
var server = new QueueReceiver({
url : amqp_cfg.url,
});
server.set_execute_function(function(data,callback){
console.log(data);
callback();
});
server.start(function(err){
console.log('server start');
})
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment