Commit 0b5e342b authored by Kamron Aroonrua's avatar Kamron Aroonrua 💬

new udpserv

parent c2bcc0e6
#Changelog
## [UR] - 2018-07-10
### Added
- TRIGGER :: nbudp
### Fixed
- PLUGIN :: storage acl supported
- PLUGIN :: call acl supported
## [UR] - 2018-06-xx
### Added
- TRIGGER :: httplistener supported POST text/*,binary
- BS :: tokenization & access control
- SS :: REST API (v1.2ur) new object api
- SS :: object indexing
......
......@@ -62,8 +62,12 @@ JobManager.prototype.setJob = function (prm,cb)
{
var self = this;
var job = prm.job;
var vo = prm.vo || "";
if(JUtils.validate(job)){
if(prm.vo){
job._vo = prm.vo;
}
self.job_registry.setJob(job.job_id,job);
if(job.trigger){
self.trigger_registry.setByJob(job,cb);
......
......@@ -29,7 +29,7 @@ TriggerManager.prototype.reload = function (prm,cb)
var msg = {
'trigger_type' : 'all',
'cmd' : 'reload',
'prm' : {}
'prm' : {'vo':prm.vo}
}
self.evp.send(topic,msg);
......
......@@ -103,18 +103,19 @@ router.post('/',function (req, res) {
return respHelper.response401();
}
jm.setJob({'job':json_job},function(err,res){
jm.setJob({'job':json_job,'vo':tInfo.vo},function(err,res){
if(err)
{
respHelper.response400(err);
}else{
if(q.reload){
tm.reload();
tm.reload({'vo':tInfo.vo});
}
respHelper.response201();
}
});
});
router.post('/action',function (req, res) {
......@@ -141,7 +142,7 @@ router.post('/action',function (req, res) {
respHelper.response400(err.message);
}else{
if(q.reload){
tm.reload();
tm.reload({'vo':tInfo.vo});
}
respHelper.response201();
}
......
......@@ -50,8 +50,11 @@ HTTPListener.prototype._http_start = function()
app.use(bodyParser.json({limit: '128mb'}));
app.use(bodyParser.urlencoded({
extended: true
extended: true,
limit: '128mb'
}));
app.use(bodyParser.raw({limit: '128mb'}));
app.use(bodyParser.text({limit: '128mb',type:"text/*"}));
var context = require('./lib/http-context');
app.use(context.middleware({
......
......@@ -19,6 +19,7 @@ function JobTask (prm)
this.mem = prm.handle.mem;
this.jobcaller = prm.handle.jobcaller;
this.storagecaller = prm.handle.storagecaller;
this.acl_validator = prm.handle.acl_validator
this.jobcfg = prm.job_config;
this.input_meta = prm.input_meta;
......@@ -87,6 +88,7 @@ JobTask.prototype.run = function ()
}
var context = {
"acl_validator" : this.acl_validator,
"jobconfig" : job_tr_config,
"transaction" : ctx_transaction,
"input" : {'data':obj_input_data,'meta':input_meta} ,
......@@ -284,7 +286,7 @@ function perform_dt(prm,cb)
function perform_do(prm,cb)
{
var do_context = prm.context
var job_id = do_context.jobconfig.job_id;
var do_cfg = do_context.jobconfig.data_out;
......
......@@ -6,6 +6,7 @@ var QueueReceiver = ctx.getLib('lib/amqp/queuereceiver');
var ConnCtx = ctx.getLib('lib/conn/connection-context');
var JobRegistry = ctx.getLib('lib/mems/job-registry');
var SSCaller = ctx.getLib('lib/axon/rpccaller');
var ACLValidator = ctx.getLib('lib/auth/acl-validator');
var JobTransaction = require('./lib/jobtransaction')
......@@ -21,6 +22,7 @@ var JW = function JobWorker (prm)
{
var param = prm || {};
this.config = param.config || cfg;
this.auth_cfg = this.config.auth;
this.instance_name = param.name;
this.conn = ConnCtx.create(this.config);
......@@ -28,6 +30,7 @@ var JW = function JobWorker (prm)
this.jobcaller = new QueueCaller({'url':this.conn.getAmqpUrl(),'name':'bs_jobs_cmd'});
this.job_registry = JobRegistry.create({'redis':this.mem});
this.acl_validator = ACLValidator.create(this.auth_cfg);
this.storagecaller = new SSCaller({'url':SS_URL});
}
......
......@@ -67,7 +67,9 @@ HttpACL.prototype.update = function(cb)
var trigger = JSON.parse(res[k]);
if(trigger.type == 'http')
{
var acl = mkACL(trigger.appkey,trigger.method,trigger.job_id);
var _vo = trigger.vo || '';
var akey = (_vo=='$' || _vo=='')?trigger.appkey:_vo + '.' + trigger.appkey;
var acl = mkACL(akey,trigger.method,trigger.job_id);
self.add(acl);
}
}
......
......@@ -52,8 +52,10 @@ TR.prototype.setByJob = function(job,cb)
{
var self = this;
var id = 'def.' + job.job_id;
var vo = job._vo;
var trigger = job.trigger;
trigger.id = id;
trigger.vo = vo;
trigger.job_id = job.job_id;
self.setTrigger(id,trigger,cb);
}
......
......@@ -2,13 +2,16 @@
var ctx = require('../../../context');
var Utils = ctx.getLib('lib/util/plugin-utils');
var bsdata = ctx.getLib('lib/model/bsdata');
const ACL_SERVICE_NAME = "job";
function perform_function(context,request,response){
var job_id = context.jobconfig.job_id;
var job_vo = context.jobconfig._vo || '';
var transaction_id = context.transaction.id;
var param = context.jobconfig.data_out.param;
var memstore = context.task.memstore;
var jobcaller = context.task.jobcaller;
var acl_validator = context.acl_validator;
var in_type = request.type;
var data = (Array.isArray(request.data))?request.data:[request.data];
......@@ -16,6 +19,12 @@ function perform_function(context,request,response){
var prm_to = (Array.isArray(param.to))?param.to:[param.to];
var def_acl = [
{
'accept':true,
'resource':(job_vo=='$'||job_vo=='')?'*':job_vo + '.*'
}
];
data.forEach((dat)=>{
prm_to.forEach((jobprm)=>{
......@@ -25,7 +34,14 @@ function perform_function(context,request,response){
'data' : dat
}
var job=Utils.vm_execute_text(ev,jobprm)
call_to(dat,job);
var acp = acl_validator.isAccept(def_acl,{
"vo":job_vo,"service":ACL_SERVICE_NAME,"resource":job,"mode":"x"
});
if(acp){
call_to(dat,job);
}else{
console.log('[Warn] Calling job :: ' + job + ' -> Not Permited')
}
});
});
......
......@@ -6,13 +6,16 @@ var BinStream = ctx.getLib('lib/bss/binarystream_v1_1');
var bsdata = ctx.getLib('lib/model/bsdata');
var async = require('async');
const ACL_SERVICE_NAME = "storage";
function perform_function(context,request,response){
var job_id = context.jobconfig.job_id;
var job_vo = context.jobconfig._vo || '';
var transaction_id = context.transaction.id;
var param = context.jobconfig.data_out.param;
var memstore = context.task.memstore;
var storagecaller = context.task.storagecaller;
var acl_validator = context.acl_validator;
var output_type = request.input_type;
var data = (Array.isArray(request.data))?request.data:[request.data];
......@@ -30,6 +33,7 @@ function perform_function(context,request,response){
});
}
var dc_meta = {
"_jid" : job_id,
"_tid" : transaction_id,
......@@ -45,6 +49,13 @@ function perform_function(context,request,response){
});
}
var def_acl = [
{
'accept':true,
'resource':(job_vo=='$'||job_vo=='')?'*':job_vo + '.*'
}
];
var idx = 0;
async.whilst(
function() { return idx < data.length; },
......@@ -56,18 +67,29 @@ function perform_function(context,request,response){
'data' : data[idx]
}
var sname=Utils.vm_execute_text(ev,storage_name)
var sname=Utils.vm_execute_text(ev,storage_name);
if(sname){
send_storage(caller,dc_meta,el_data,sname,function(err){
if(!err){
idx++;
callback(null);
}else{
callback(new Error('storage error'));
}
//storage permission
var acp = acl_validator.isAccept(def_acl,{
"vo":job_vo,"service":ACL_SERVICE_NAME,"resource":sname,"mode":"w"
});
if(acp){
send_storage(caller,dc_meta,el_data,sname,function(err){
if(!err){
idx++;
callback(null);
}else{
callback(new Error('storage error'));
}
});
}else{
callback('storage permission error');
}
}else{
callback(new Error('invalid storage'));
callback('invalid storage');
}
},
......
......@@ -27,5 +27,9 @@
{
"name" : "bs.api.service",
"script" : "./serv-api.js"
},
{
"name" : "bs.trigger.nbudp",
"script" : "./serv-nbudptrigger.js"
}]
}
\ No newline at end of file
var ctx = require('./context');
var NBUdpTrigger = ctx.getLib('triggers/trg-nbudp');
var trg = NBUdpTrigger.create(ctx.config);
trg.start();
//NB-UDP
var ctx = require('../../context');
var cfg = ctx.config;
var amqp_cfg = ctx.config.amqp;
var ConnCtx = ctx.getLib('lib/conn/connection-context');
var QueueCaller = ctx.getLib('lib/amqp/queuecaller');
var EvenSub = ctx.getLib('lib/amqp/event-sub');
var TriggerRegis = require('./lib/triggerregis');
var dgram = require('dgram');
var server = dgram.createSocket('udp4');
var TRIGGER_TYPE = "nbudp";
var PORT = 19150;
var HOST = '0.0.0.0';
module.exports.create = function (cfg)
{
return new NBUdpTrigger(cfg);
}
function NBUdpTrigger(cfg)
{
this.config = cfg;
this.conn = ConnCtx.create(this.config);
this.mem = this.conn.getMemstore();
this.jobcaller = new QueueCaller({'url':amqp_cfg.url,'name':'bs_jobs_cmd'});
this.evs = new EvenSub({'url':amqp_cfg.url,'name':'bs_trigger_cmd'});
this.regis = TriggerRegis.create({'conn':this.config.memstore.url});
}
NBUdpTrigger.prototype.start = function ()
{
console.log('NBUDP_TRIGGER:Starting\t\t[OK]');
this._start_listener();
this._start_controller();
}
NBUdpTrigger.prototype._start_listener = function ()
{
console.log('NBUDP_TRIGGER:Starting Listener\t[OK]');
var self = this;
self.reset();
self.reload();
server.on('listening', function () {
console.log('NBUDP_TRIGGER:Listening\t[OK]');
});
server.on('message', function (message, remote) {
if(!message){return;}
var udpdata = message.toString();
var datachunk = udpdata.split(',');
if(datachunk.length>1)
{
var kname = datachunk[0];
var jobs = self.regis.findJob(kname);
jobs.forEach(function(item){
self._callJob(item.jobid,udpdata);
});
}
});
server.bind(PORT, HOST);
}
NBUdpTrigger.prototype.reload = function ()
{
this.regis.update(function(err){
if(!err){
console.log('NBUDP_TRIGGER:REG Update\t\t[OK]');
}else{
console.log('NBUDP_TRIGGER:REG Update\t\t[ERR]');
}
});
}
NBUdpTrigger.prototype.reset = function ()
{
this.regis.clean();
}
NBUdpTrigger.prototype._callJob = function(jobid,udpdata)
{
var udpmsg = udpdata.substring(udpdata.indexOf(',')+1);
var trigger_data = {
'object_type' : 'nbudp_data',
'udpdata' : udpdata,
'message' : udpmsg
}
var cmd = {
'object_type':'job_execute',
'source' : 'nbudp_trigger',
'jobId' : jobid,
'option' : {'exe_level':'secondary'},
'input_data' : {
'type' : 'bsdata',
'value' : {
'object_type':'bsdata',
'data_type' : 'object',
'data' : trigger_data
}
}
}
this.jobcaller.send(cmd);
}
NBUdpTrigger.prototype._start_controller = function ()
{
var self=this;
var topic = 'ctl.trigger.#';
self.evs.sub(topic,function(err,msg){
if(err){
console.log('NBUDP_TRIGGER:AMQP ERROR Restarting ...');
setTimeout(function(){
process.exit(1);
},5000);
}
if(!msg){return;}
var ctl = msg.data;
if(ctl.trigger_type != TRIGGER_TYPE && ctl.trigger_type != 'all')
{
return;
}
if(ctl.cmd == 'reload')
{
console.log('NBUDP_TRIGGER:CMD Reload\t\t[OK]');
self.reload();
}
});
}
var Redis = require('redis');
const KEYS = 'bs:regis:triggers';
const TRIGGER_TYPE = "nbudp";
module.exports.create = function(cfg)
{
return new TriggerRegister(cfg);
}
module.exports.mkRegis = mkRegis
function mkRegis(trigger,opt)
{
if(!trigger.keyname)
{
return null;
}
var vo = trigger.vo || '';
if(vo=='$'){vo=''}
var a = {
'vo':vo,
'keyname' : trigger.keyname,
'jobid' : trigger.job_id
}
if(opt){a.opt = opt}
return a;
}
function TriggerRegister(cfg)
{
this.config = cfg;
if(cfg.conn){
this.mem = Redis.createClient(cfg.conn);
}else if(cfg.redis){
this.mem = cfg.redis;
}else{
this.mem = null;
}
this.regis = [];
}
TriggerRegister.prototype.add = function(rg)
{
if(!rg){return;}
var found = false;
this.regis.forEach( function (val) {
if(val.vo == rg.vo && val.keyname == rg.keyname && val.jobid == rg.jobid){
found = true;
}
});
if(!found){
this.regis.push(rg);
}
}
TriggerRegister.prototype.clean = function()
{
this.regis = [];
}
TriggerRegister.prototype.update = function(cb)
{
var self=this;
self.clean()
self.mem.hgetall(KEYS,function (err,res){
if(!err && res){
var ks = Object.keys(res);
for(var i=0;i<ks.length;i++)
{
var k = ks[i];
var trigger = JSON.parse(res[k]);
if(trigger.type == TRIGGER_TYPE)
{
var reg = mkRegis(trigger);
self.add(reg);
}
}
}
cb(err);
});
}
TriggerRegister.prototype.findJob= function(kname)
{
var jobs = [];
this.regis.forEach( function (val) {
var fullkey = (val.vo == '')?val.keyname:val.vo + ':' + val.keyname;
if(fullkey == kname){
jobs.push(val);
}
});
return jobs;
}
{
"version":"1.2",
"build":"201806131400"
"build":"201807101500"
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment