Commit ae3802f7 authored by Kamron Aroonrua's avatar Kamron Aroonrua 💬

new rpc

parent 5cdbf970
var amqp = require('amqplib/callback_api'); var amqp = require('amqplib/callback_api');
const REPLY_QUEUE = 'amq.rabbitmq.reply-to';
function RPCCaller(config)
{
this.config = config;
this.url = config.url;
this.name = config.name || "rpc_queue";
this.conn = null;
this.ch = null;
var self = this;
this.opened = false;
this.open = thunky(open);
this.open();
function open (cb) {
amqp.connect(self.url, function(err, conn) {
if (err){return cb(err)}
conn.createChannel(function(err, ch) {
if (err){return cb(err)}
ch.responseEmitter = new EventEmitter();
ch.responseEmitter.setMaxListeners(0);
ch.consume(REPLY_QUEUE ,
(msg) => channel.responseEmitter.emit(msg.properties.correlationId, JSON.parse(msg.content.toString())),
{noAck: true});
self.opened = true;
self.conn = conn;
self.ch = ch;
cb();
});
});
}
}
RPCCaller.prototype.call = function(req,cb){
var self = this;
var corr = generateUuid();
this.open(function(err){
if(err){
console.log(err);
}
self.ch.responseEmitter.once(corr, (resp)=>{
cb(null,resp);
});
self.ch.sendToQueue(self.name, new Buffer(JSON.stringify(req)), { correlationId: corr, replyTo: REPLY_QUEUE })
});
function generateUuid() {
return Math.random().toString() +
Math.random().toString() +
Math.random().toString();
}
}
/*
function RPCCaller(config) function RPCCaller(config)
{ {
this.config = config; this.config = config;
...@@ -35,5 +96,6 @@ RPCCaller.prototype.call = function(req,cb){ ...@@ -35,5 +96,6 @@ RPCCaller.prototype.call = function(req,cb){
Math.random().toString(); Math.random().toString();
} }
} }
*/
module.exports = RPCCaller; module.exports = RPCCaller;
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment