Commit 4db7e525 authored by Kamron Aroonrua's avatar Kamron Aroonrua 💬

Merge branch 'covid' into 'dev'

Covid

See merge request !22
parents 9c21f4b6 0fddbb6e
#Changelog #Changelog
## [1.2.3] - 2019-11-19 ## [1.2.3-UR]
### Fixed
- BS :: AMQP RPC Singleton Connection ,auto ack
- STORAGE :: 50x speedup
### Added ### Added
- BS :: env configuration context
- PLUGIN :: dt-transform register - PLUGIN :: dt-transform register
- BS :: Configuration Context with ENV - BS :: Configuration Context with ENV
- PLUGIN :: do-http - PLUGIN :: do-http
......
...@@ -108,7 +108,7 @@ JobTask.prototype.run = function () ...@@ -108,7 +108,7 @@ JobTask.prototype.run = function ()
perform_di({'context':context,'handle':self} ,function(err,resp){ perform_di({'context':context,'handle':self} ,function(err,resp){
if(resp){ if(resp){
console.log('[DI STATUS]\t\t: ' + resp.status); // console.log('[DI STATUS]\t\t: ' + resp.status);
self.stats.di = resp; self.stats.di = resp;
} }
if(resp.status == 'success'){ if(resp.status == 'success'){
...@@ -149,7 +149,7 @@ JobTask.prototype.run = function () ...@@ -149,7 +149,7 @@ JobTask.prototype.run = function ()
perform_dt({'cfg':cur_cfg,'name':dt_name,'context':context,'request':dt_request,'handle':self},function(err,dt_resp){ perform_dt({'cfg':cur_cfg,'name':dt_name,'context':context,'request':dt_request,'handle':self},function(err,dt_resp){
if(dt_resp){ if(dt_resp){
console.log('[DT:' + dt_name + ' STATUS]\t\t: ' + dt_resp.status); // console.log('[DT:' + dt_name + ' STATUS]\t\t: ' + dt_resp.status);
} }
idx++; idx++;
...@@ -188,7 +188,9 @@ JobTask.prototype.run = function () ...@@ -188,7 +188,9 @@ JobTask.prototype.run = function ()
dm_o.run(function() { dm_o.run(function() {
perform_do({'context':context,'request':do_request,'handle':self},function(err,do_resp){ perform_do({'context':context,'request':do_request,'handle':self},function(err,do_resp){
if(do_resp){console.log('[DO STATUS]\t\t: ' + do_resp.status);} if(do_resp){
//console.log('[DO STATUS]\t\t: ' + do_resp.status);
}
if(do_resp.status == 'success'){ if(do_resp.status == 'success'){
callback(null,do_resp); callback(null,do_resp);
}else { }else {
...@@ -205,18 +207,21 @@ JobTask.prototype.run = function () ...@@ -205,18 +207,21 @@ JobTask.prototype.run = function ()
//self.emit('error',new Error('job execution timeout')) //self.emit('error',new Error('job execution timeout'))
},self.job_timeout); },self.job_timeout);
console.log('***** JOB RUNNING *****'); // console.log('***** JOB RUNNING *****');
console.log('[JOB ID]\t\t: ' + job_id); // console.log('[JOB ID]\t\t: ' + job_id);
console.log('[TRANSACTION ID]\t: ' + transaction_id); // console.log('[TRANSACTION ID]\t: ' + transaction_id);
async.waterfall([task_di,task_dt,task_do],function (err,resp) { async.waterfall([task_di,task_dt,task_do],function (err,resp) {
clearTimeout(jtimeout); clearTimeout(jtimeout);
//console.log('[JOB DONE] id=' + job_id + ' ,tr=' + transaction_id + '\t' + resp.status);
if(!err){ if(!err){
self.stop(resp) self.stop(resp)
console.log('***** JOB SUCCESSFULLY DONE *****\n'); // console.log('***** JOB SUCCESSFULLY DONE *****\n');
console.log('[JOB DONE] id=' + job_id + ' ,tr=' + transaction_id + '\tsuccess');
}else{ }else{
self.stop(err) self.stop(err)
console.log('***** JOB UNSUCCESSFULLY DONE *****\n'); // console.log('***** JOB UNSUCCESSFULLY DONE *****\n');
console.log('[JOB DONE] id=' + job_id + ' ,tr=' + transaction_id + '\tunsuccess');
} }
}); });
......
...@@ -25,7 +25,7 @@ memstore.prototype.getItem = function(k,cb) ...@@ -25,7 +25,7 @@ memstore.prototype.getItem = function(k,cb)
if(!err && v){ if(!err && v){
if(typeof v == 'object' && v.type == 'Buffer') if(typeof v == 'object' && v.type == 'Buffer')
{ {
value = new Buffer(v.data); value = new Buffer.from(v.data);
}else{ }else{
value = JSON.parse(v); value = JSON.parse(v);
} }
......
...@@ -6,6 +6,7 @@ var QueueReceiver = ctx.getLib('lib/amqp/queuereceiver'); ...@@ -6,6 +6,7 @@ var QueueReceiver = ctx.getLib('lib/amqp/queuereceiver');
var ConnCtx = ctx.getLib('lib/conn/connection-context'); var ConnCtx = ctx.getLib('lib/conn/connection-context');
var JobRegistry = ctx.getLib('lib/mems/job-registry'); var JobRegistry = ctx.getLib('lib/mems/job-registry');
var SSCaller = ctx.getLib('lib/axon/rpccaller'); var SSCaller = ctx.getLib('lib/axon/rpccaller');
var RPCCaller = ctx.getLib('lib/amqp/rpccaller');
var ACLValidator = ctx.getLib('lib/auth/acl-validator'); var ACLValidator = ctx.getLib('lib/auth/acl-validator');
var JobTransaction = require('./lib/jobtransaction') var JobTransaction = require('./lib/jobtransaction')
...@@ -34,7 +35,10 @@ var JW = function JobWorker (prm) ...@@ -34,7 +35,10 @@ var JW = function JobWorker (prm)
/* Disable RPC Feature */ /* Disable RPC Feature */
//this.storagecaller = new SSCaller({'url':SS_URL}); //this.storagecaller = new SSCaller({'url':SS_URL});
this.storagecaller = null; this.storagecaller = new RPCCaller({
url : this.conn.getAmqpUrl(),
name :'storage_request'
});
} }
JW.prototype.start = function () JW.prototype.start = function ()
......
var amqp = require('amqplib/callback_api'); var amqp = require('amqplib/callback_api');
var EventEmitter = require('events').EventEmitter;
var thunky = require('thunky');
const REPLY_QUEUE = 'amq.rabbitmq.reply-to';
function RPCCaller(config)
{
this.config = config;
this.url = config.url;
this.name = config.name || "rpc_queue";
this.conn = null;
this.ch = null;
var self = this;
this.opened = false;
this.open = thunky(open);
this.open();
function open (cb) {
amqp.connect(self.url, function(err, conn) {
if (err){return cb(err)}
conn.createChannel(function(err, ch) {
if (err){return cb(err)}
ch.responseEmitter = new EventEmitter();
ch.responseEmitter.setMaxListeners(0);
ch.prefetch(4);
ch.consume(REPLY_QUEUE ,
(msg) => { ch.responseEmitter.emit(msg.properties.correlationId, JSON.parse(msg.content.toString()))},
{noAck: true});
self.opened = true;
self.conn = conn;
self.ch = ch;
cb();
});
});
}
}
RPCCaller.prototype.call = function(req,cb){
var self = this;
var corr = generateUuid();
self.open(function(err){
if(err){
console.log(err);
}
self.ch.responseEmitter.once(corr, (resp)=>{
cb(null,resp);
});
self.ch.sendToQueue(self.name, new Buffer(JSON.stringify(req)), { correlationId: corr, replyTo: REPLY_QUEUE,persistent: false })
});
function generateUuid() {
return Math.random().toString() +
Math.random().toString() +
Math.random().toString();
}
}
/*
function RPCCaller(config) function RPCCaller(config)
{ {
this.config = config; this.config = config;
...@@ -35,5 +98,6 @@ RPCCaller.prototype.call = function(req,cb){ ...@@ -35,5 +98,6 @@ RPCCaller.prototype.call = function(req,cb){
Math.random().toString(); Math.random().toString();
} }
} }
*/
module.exports = RPCCaller; module.exports = RPCCaller;
...@@ -23,18 +23,18 @@ RPCServer.prototype.start = function(cb) ...@@ -23,18 +23,18 @@ RPCServer.prototype.start = function(cb)
var q = self.name; var q = self.name;
ch.assertQueue(q, {durable: false}); ch.assertQueue(q, {durable: false});
ch.prefetch(1); ch.prefetch(4);
//console.log(' [x] Awaiting RPC requests'); //console.log(' [x] Awaiting RPC requests');
ch.consume(q, function reply(msg) { ch.consume(q, function reply(msg) {
var req = JSON.parse(msg.content.toString()); var req = JSON.parse(msg.content.toString());
self.remote_function(req,function(err,resp){ self.remote_function(req,function(err,resp){
ch.sendToQueue(msg.properties.replyTo,new Buffer(JSON.stringify(resp)),{correlationId: msg.properties.correlationId}); ch.sendToQueue(msg.properties.replyTo,new Buffer(JSON.stringify(resp)),{correlationId: msg.properties.correlationId,persistent: false});
}); });
ch.ack(msg); //ch.ack(msg);
}); }, {noAck: true});
cb(null); cb(null);
}); });
......
...@@ -70,24 +70,24 @@ function perform_function(context,response){ ...@@ -70,24 +70,24 @@ function perform_function(context,response){
'filesize': f_target.size, 'filesize': f_target.size,
'modify_ts' : Math.round(f_target.modifyTime/1000) 'modify_ts' : Math.round(f_target.modifyTime/1000)
} }
return sftp.get(prm_dir + '/' + f_target.name,null,null); return sftp.get(prm_dir + '/' + f_target.name);
}else{ }else{
return null; return null;
} }
}).then((data) => { }).then((data) => {
if(data){ if(data){
data.on('data',(dat)=>{
var nb = Buffer.concat([buff_out,dat]); var nb = Buffer.concat([buff_out,data]);
buff_out = nb; buff_out = nb;
})
data.on('end',()=>{
sftp.end() sftp.end()
memstore.setItem('lastmodify',last_mod,function(err){ memstore.setItem('lastmodify',last_mod,function(err){
var result=(prm_encoding=='binary')?buff_out:buff_out.toString('utf8'); var result=(prm_encoding=='binary')?buff_out:buff_out.toString('utf8');
response.success(result, {"meta":meta,"continue": fs_continue}); response.success(result, {"meta":meta,"continue": fs_continue});
}); });
});
}else{ }else{
sftp.end(); sftp.end();
response.reject(); response.reject();
......
...@@ -24,10 +24,12 @@ function perform_function(context,request,response){ ...@@ -24,10 +24,12 @@ function perform_function(context,request,response){
var amqp_cfg = ctx.config.amqp; var amqp_cfg = ctx.config.amqp;
var storage_name = param.storage_name; var storage_name = param.storage_name;
var caller = new RPCCaller({ // var caller = new RPCCaller({
url : amqp_cfg.url, // url : amqp_cfg.url,
name :'storage_request' // name :'storage_request'
}); // });
var caller = storagecaller;
// if(param.channel!='ipc'){ // if(param.channel!='ipc'){
// caller = new RPCCaller({ // caller = new RPCCaller({
......
var ctx = require('../context'); var ctx = require('../context');
var async = require('async');
var amqp_cfg = ctx.config.amqp; var amqp_cfg = ctx.config.amqp;
var RPCCaller = ctx.getLib('lib/amqp/rpccaller'); var RPCCaller = ctx.getLib('lib/amqp/rpccaller');
var caller = new RPCCaller({ var caller = new RPCCaller({
url : amqp_cfg.url, url : amqp_cfg.url,
name :'storage_request' name :'test_request'
}); });
var req = { var req = {
'object_type' : 'storage_request', 'object_type' : 'test_request',
'command' : 'write', 'command' : 'write',
'param' : { 'param' : {
'storage_name' : 'gcs.file.test', 'storage_name' : 'gcs.file.test',
...@@ -24,10 +25,29 @@ var req = { ...@@ -24,10 +25,29 @@ var req = {
} }
} }
caller.call(req,function(err,resp){
console.log(resp);
});
var idx = 0;
async.whilst(
function() { return idx < 2000; },
function(callback) {
req.id = idx;
caller.call(req,function(err,resp){
if(idx%100==0){
console.log(resp)
}
idx++;
callback(null);
});
},
function (err) {
if(!err){
console.log('finish')
}else{
console.log(err);
}
}
);
......
...@@ -5,26 +5,14 @@ var rpcserver = ctx.getLib('lib/amqp/rpcserver'); ...@@ -5,26 +5,14 @@ var rpcserver = ctx.getLib('lib/amqp/rpcserver');
var server = new rpcserver({ var server = new rpcserver({
url : amqp_cfg.url, url : amqp_cfg.url,
name : 'test_request'
}); });
server.set_remote_function(function(req,callback){ server.set_remote_function(function(req,callback){
var n = parseInt(req.t); callback(null,{'cmd':req.command,'id':req.id});
console.log('REQUEST ' + req);
setTimeout(function(){
callback(null,{'time':n,'data':req.d});
},n);
}) })
server.start(function(err){ server.start(function(err){
console.log('server start'); console.log('server start');
}) })
var http = require('http');
http.createServer(function (req, res) {
res.writeHead(200, {
'Content-Type': 'text/plain; charset=UTF-8'
});
console.log(req.body);
res.end("req");
}).listen(9080, "");
{ {
"version":"1.2.3", "version":"1.2.3",
"build":"202003061200" "build":"202004111200"
} }
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment