Commit 72f1d7b1 authored by Kamron Aroonrua's avatar Kamron Aroonrua 💬

new storage rpc

parent c1d15ed5
...@@ -6,6 +6,7 @@ var QueueReceiver = ctx.getLib('lib/amqp/queuereceiver'); ...@@ -6,6 +6,7 @@ var QueueReceiver = ctx.getLib('lib/amqp/queuereceiver');
var ConnCtx = ctx.getLib('lib/conn/connection-context'); var ConnCtx = ctx.getLib('lib/conn/connection-context');
var JobRegistry = ctx.getLib('lib/mems/job-registry'); var JobRegistry = ctx.getLib('lib/mems/job-registry');
var SSCaller = ctx.getLib('lib/axon/rpccaller'); var SSCaller = ctx.getLib('lib/axon/rpccaller');
var RPCCaller = ctx.getLib('lib/amqp/rpccaller');
var ACLValidator = ctx.getLib('lib/auth/acl-validator'); var ACLValidator = ctx.getLib('lib/auth/acl-validator');
var JobTransaction = require('./lib/jobtransaction') var JobTransaction = require('./lib/jobtransaction')
...@@ -34,7 +35,10 @@ var JW = function JobWorker (prm) ...@@ -34,7 +35,10 @@ var JW = function JobWorker (prm)
/* Disable RPC Feature */ /* Disable RPC Feature */
//this.storagecaller = new SSCaller({'url':SS_URL}); //this.storagecaller = new SSCaller({'url':SS_URL});
this.storagecaller = null; this.storagecaller = new RPCCaller({
url : this.conn.getAmqpUrl(),
name :'storage_request'
});
} }
JW.prototype.start = function () JW.prototype.start = function ()
......
...@@ -27,7 +27,7 @@ function RPCCaller(config) ...@@ -27,7 +27,7 @@ function RPCCaller(config)
ch.responseEmitter = new EventEmitter(); ch.responseEmitter = new EventEmitter();
ch.responseEmitter.setMaxListeners(0); ch.responseEmitter.setMaxListeners(0);
ch.consume(REPLY_QUEUE , ch.consume(REPLY_QUEUE ,
(msg) => { console.log(msg); ch.responseEmitter.emit(msg.properties.correlationId, JSON.parse(msg.content.toString()))}, (msg) => { ch.responseEmitter.emit(msg.properties.correlationId, JSON.parse(msg.content.toString()))},
{noAck: true}); {noAck: true});
self.opened = true; self.opened = true;
......
...@@ -24,10 +24,12 @@ function perform_function(context,request,response){ ...@@ -24,10 +24,12 @@ function perform_function(context,request,response){
var amqp_cfg = ctx.config.amqp; var amqp_cfg = ctx.config.amqp;
var storage_name = param.storage_name; var storage_name = param.storage_name;
var caller = new RPCCaller({ // var caller = new RPCCaller({
url : amqp_cfg.url, // url : amqp_cfg.url,
name :'storage_request' // name :'storage_request'
}); // });
var caller = storagecaller;
// if(param.channel!='ipc'){ // if(param.channel!='ipc'){
// caller = new RPCCaller({ // caller = new RPCCaller({
...@@ -79,9 +81,7 @@ function perform_function(context,request,response){ ...@@ -79,9 +81,7 @@ function perform_function(context,request,response){
}); });
if(acp){ if(acp){
console.log('send data ' + String(idx+1) + ' of ' + String(data.length))
send_storage(caller,dc_meta,el_data,sname,function(err){ send_storage(caller,dc_meta,el_data,sname,function(err){
console.log('OK>>')
if(!err){ if(!err){
idx++; idx++;
callback(null); callback(null);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment