Unverified Commit 51a259a3 authored by Kamron Aroonrua's avatar Kamron Aroonrua 💬 Committed by GitHub

Merge pull request #2 from igridproject/dev

Dev
parents 2c3fddf5 eb8f6083
#Changelog
## [1.2] - 2018-08-11
### Fixed
- BS :: job registering with no token bug
## [UR] - 2018-07-10
### Added
- TRIGGER :: nbudp
### Fixed
- PLUGIN :: storage acl supported
- PLUGIN :: call acl supported
## [UR] - 2018-06-xx
### Added
- TRIGGER :: httplistener supported POST text/*,binary
- BS :: tokenization & access control
- SS :: REST API (v1.2ur) new object api
- SS :: object indexing
### Fixed
### Changed
- SS :: object address api 1.2 [ <storage_name>$obj_id ]
### Removed
## [UR] - 2018-05-xx
### Added
- SS :: Ipc Channel
- SS :: REST API (v1.1ur) method PUT DELETE
- PLUGIN :: do-storage with ipc channel supported
### Fixed
- WORKER :: active/unactive job bugs
### Changed
### Removed
\ No newline at end of file
module.exports = {
'amqp' : require('./amqp.json'),
'memstore' : require('./memstore.json'),
'storage' : require('./storage.json')
'storage' : require('./storage.json'),
'auth' : {
'secret': require('./secret.json'),
'acl' : require('./acl.json')
}
}
[
{"accept":true}
]
{
"type" : "rabbitmq",
"url" : "amqp://127.0.0.1"
"url" : "amqp://rabbitmq-server"
}
\ No newline at end of file
{
"type" : "redis",
"url" : "redis://127.0.0.1:6379/1"
"url" : "redis://redis-server:6379/1"
}
\ No newline at end of file
{
"type":"text",
"value":"bigstream-server"
}
\ No newline at end of file
{
"api_hostname" : "http://localhost:19080",
"api_hostname" : "http://bigstream-server:19080",
"repository" : "/var/bigstream/data"
}
\ No newline at end of file
......@@ -20,3 +20,20 @@ module.exports.getPlugins = function(type,name)
module.exports.sysenv = {
}
module.exports.getServiceUrl = function(port,opt)
{
return 'tcp://0.0.0.0:' + String(port);
}
module.exports.getClientUrl = function(port,opt)
{
return 'tcp://127.0.0.1:' + String(port);
}
module.exports.getUnixSocketUrl = function(name){
var sockname = name || 'test.sock';
return 'unix://' + __dirname + '/tmp/' + sockname;
}
//module.exports.socket_dir = __dirname + '/tmp';
//module.exports.tmp_dir = __dirname + '/tmp';
......@@ -5,6 +5,11 @@ var app = express();
var bodyParser = require('body-parser');
var ConnCtx = ctx.getLib('lib/conn/connection-context');
var Tokenizer = ctx.getLib('lib/auth/tokenizer');
var ACLValidator = ctx.getLib('lib/auth/acl-validator');
var jwt = require('express-jwt');
var JobManager = require('./lib/job-manager');
var TriggerManager = require('./lib/trigger-manager')
......@@ -32,19 +37,30 @@ ControllerAPI.prototype.start = function()
ControllerAPI.prototype._http_start = function()
{
var self = this;
var auth_cfg = this.config.auth;
app.use(bodyParser.json({limit: '128mb'}));
app.use(bodyParser.json({limit: '64mb'}));
app.use(bodyParser.urlencoded({
extended: true
}));
var context = ctx.getLib('lib/ws/http-context');
this.acl_validator = ACLValidator.create(auth_cfg);
app.use(context.middleware({
'conn' : self.conn,
'acl_validator':self.acl_validator,
'jobManager' : JobManager.create({'conn' : self.conn}),
'triggerManager' : TriggerManager.create({'conn' : self.conn})
}));
var tokenizer = Tokenizer.create(auth_cfg);
app.use(tokenizer.middleware());
app.use(function (err, req, res, next) {
if (err.name === 'UnauthorizedError') {
res.status(401).send('invalid token');
}
});
app.use(require('./ws'));
app.listen(API_PORT, function () {
......
......@@ -62,8 +62,10 @@ JobManager.prototype.setJob = function (prm,cb)
{
var self = this;
var job = prm.job;
var vo = prm.vo || "";
if(JUtils.validate(job)){
job._vo = vo;
self.job_registry.setJob(job.job_id,job);
if(job.trigger){
self.trigger_registry.setByJob(job,cb);
......
......@@ -29,7 +29,7 @@ TriggerManager.prototype.reload = function (prm,cb)
var msg = {
'trigger_type' : 'all',
'cmd' : 'reload',
'prm' : {}
'prm' : {'vo':prm.vo}
}
self.evp.send(topic,msg);
......
var schedule = require('node-schedule');
var ctx = require('../context');
var cfg = ctx.config;
var amqp_cfg = ctx.config.amqp;
var ConnCtx = ctx.getLib('lib/conn/connection-context');
var StorageEventList = ctx.getLib('lib/mems/storage-eventlist');
var CronList = ctx.getLib('lib/mems/cronlist');
var QueueCaller = ctx.getLib('lib/amqp/queuecaller');
var EvenSub = ctx.getLib('lib/amqp/event-sub');
var TRIGGER_TYPE = "storage";
module.exports.create = function (cfg)
{
return new StorageTrigger(cfg);
}
function StorageTrigger(cfg)
{
this.config = cfg;
this.conn = ConnCtx.create(this.config);
this.mem = this.conn.getMemstore();
this.jobcaller = new QueueCaller({'url':amqp_cfg.url,'name':'bs_jobs_cmd'});
this.evs = new EvenSub({'url':amqp_cfg.url,'name':'bs_trigger_cmd'});
this.storage_ev = new EvenSub({'url':amqp_cfg.url,'name':'bs_storage'});
this.sel = StorageEventList.create({'conn':this.config.memstore.url});
}
StorageTrigger.prototype.start = function ()
{
console.log('STORAGE_TRIGGER:Starting\t\t[OK]');
this._start_listener();
this._start_controller();
}
StorageTrigger.prototype._start_listener = function ()
{
console.log('STORAGE_TRIGGER:Starting Listener\t[OK]');
var self = this;
self.reset();
self.reload();
self.storage_ev.sub('storage.#.dataevent.newdata',(err,msg)=>{
if(!err){
var topic = msg.topic;
var storage_name = topic.substr(8,topic.length-26);
var sdata = msg.data;
sdata.storage_name = storage_name;
var jobs = self.sel.findJob(storage_name);
jobs.forEach(function(item){
self._callJob(item.jobid,sdata);
});
}
});
}
StorageTrigger.prototype.reload = function ()
{
this.sel.update(function(err){
if(!err){
console.log('STORAGE_TRIGGER:SEL Update\t\t[OK]');
}else{
console.log('STORAGE_TRIGGER:SEL Update\t\t[ERR]');
}
});
}
StorageTrigger.prototype.reset = function ()
{
this.sel.clean();
}
StorageTrigger.prototype._callJob = function(jobid,sl)
{
var storage_ev_data = {
'object_type' : 'storage_event_data',
'event':sl.event,
'storage_name' : sl.storage_name,
'resource_id' : sl.resource_id,
'resource_location':sl.resource_location
}
var cmd = {
'object_type':'job_execute',
'source' : 'storage_trigger',
'jobId' : jobid,
'option' : {'exe_level':'secondary'},
'input_data' : {
'type' : 'bsdata',
'value' : {
'object_type':'bsdata',
'data_type' : 'object',
'data' : storage_ev_data
}
}
}
this.jobcaller.send(cmd);
}
StorageTrigger.prototype._start_controller = function ()
{
var self=this;
var topic = 'ctl.trigger.#';
self.evs.sub(topic,function(err,msg){
if(err){
console.log('STORAGE_TRIGGER:AMQP ERROR Restarting ...');
setTimeout(function(){
process.exit(1);
},5000);
}
if(!msg){return;}
var ctl = msg.data;
if(ctl.trigger_type != TRIGGER_TYPE && ctl.trigger_type != 'all')
{
return;
}
if(ctl.cmd == 'reload')
{
console.log('STORAGE_TRIGGER:CMD Reload\t\t[OK]');
self.reload();
}
});
}
......@@ -2,5 +2,6 @@ var express = require('express')
, router = express.Router()
router.use('/v1',require('./v1'));
router.use('/v1.2',require('./v1.2'));
module.exports = router;
var express = require('express');
var router = express.Router();
router.use('/jobs',require('./ws-jobs'));
module.exports = router;
var ctx = require('../../../context');
var express = require('express');
var router = express.Router();
var cfg = ctx.config;
var response = ctx.getLib('lib/ws/response');
var request = ctx.getLib('lib/ws/request');
var Tokenizer = ctx.getLib('lib/auth/tokenizer');
const ACL_SERVICE_NAME = "job";
router.get('/',function (req, res) {
var reqHelper = request.create(req);
var respHelper = response.create(res);
var jm = req.context.jobManager;
var acl_validator = req.context.acl_validator;
var tInfo = Tokenizer.info(req.auth);
var result=[];
jm.listJob({},function (err,jobs){
jobs.forEach(job => {
var acp = acl_validator.isAccept(tInfo.acl,{
"vo":tInfo,"service":ACL_SERVICE_NAME,"resource":job,"mode":"l"
});
if(acp){
result.push(job);
}
});
respHelper.responseOK(result);
});
});
router.get('/:jid',function (req, res) {
var reqHelper = request.create(req);
var respHelper = response.create(res);
var jid = req.params.jid;
var jm = req.context.jobManager;
var acl_validator = req.context.acl_validator;
var tInfo = Tokenizer.info(req.auth);
var acp = acl_validator.isAccept(tInfo.acl,{
"vo":tInfo.vo,"service":ACL_SERVICE_NAME,"resource":jid,"mode":"r"
});
if(!acp){
return respHelper.response401();
}
jm.getJob({'job_id':jid},function (err,jobs){
if(jobs)
{
respHelper.responseOK(jobs);
}else{
respHelper.response404('Not found');
}
})
});
router.delete('/:jid',function (req, res) {
var reqHelper = request.create(req);
var respHelper = response.create(res);
var q = reqHelper.getQuery();
var jid = req.params.jid;
var jm = req.context.jobManager;
var tm = req.context.triggerManager;
var acl_validator = req.context.acl_validator;
var tInfo = Tokenizer.info(req.auth);
var acp = acl_validator.isAccept(tInfo.acl,{
"vo":tInfo.vo,"service":ACL_SERVICE_NAME,"resource":jid,"mode":"w"
});
if(!acp){
return respHelper.response401();
}
jm.deleteJob({'job_id':jid},function(err){
if(q.reload){
tm.reload();
}
respHelper.response200();
});
});
router.post('/',function (req, res) {
var reqHelper = request.create(req);
var respHelper = response.create(res);
var q = reqHelper.getQuery();
var jm = req.context.jobManager;
var tm = req.context.triggerManager;
var acl_validator = req.context.acl_validator;
var tInfo = Tokenizer.info(req.auth);
var json_job = req.body || {};
var jid = json_job.job_id || "";
var acp = acl_validator.isAccept(tInfo.acl,{
"vo":tInfo.vo,"service":ACL_SERVICE_NAME,"resource":jid,"mode":"w"
});
if(!acp){
return respHelper.response401();
}
jm.setJob({'job':json_job,'vo':tInfo.vo},function(err,res){
if(err)
{
respHelper.response400(err);
}else{
if(q.reload){
tm.reload({'vo':tInfo.vo});
}
respHelper.response201();
}
});
});
router.post('/action',function (req, res) {
var reqHelper = request.create(req);
var respHelper = response.create(res);
var q = reqHelper.getQuery();
var jm = req.context.jobManager;
var tm = req.context.triggerManager;
var acl_validator = req.context.acl_validator;
var tInfo = Tokenizer.info(req.auth);
var acp = acl_validator.isAccept(tInfo.acl,{
"vo":tInfo.vo,"service":ACL_SERVICE_NAME + '.action',"resource":"","mode":"x"
});
if(!acp){
return respHelper.response401();
}
var action = req.body;
jm.action({'action':action},function(err){
if(err)
{
respHelper.response400(err.message);
}else{
if(q.reload){
tm.reload({'vo':tInfo.vo});
}
respHelper.response201();
}
});
});
module.exports = router;
......@@ -50,8 +50,11 @@ HTTPListener.prototype._http_start = function()
app.use(bodyParser.json({limit: '128mb'}));
app.use(bodyParser.urlencoded({
extended: true
extended: true,
limit: '128mb'
}));
app.use(bodyParser.raw({limit: '128mb'}));
app.use(bodyParser.text({limit: '128mb',type:"text/*"}));
var context = require('./lib/http-context');
app.use(context.middleware({
......
......@@ -10,7 +10,7 @@ var response = ctx.getLib('lib/ws/response');
var request = ctx.getLib('lib/ws/request');
var process_get = function(req, res) {
var process_req = function(req, res ,method) {
var reqHelper = request.create(req);
var respHelper = response.create(res);
var appkey = req.params.akey;
......@@ -20,7 +20,7 @@ var process_get = function(req, res) {
//var evp = req.context.evp;
var jobcaller = req.context.jobcaller;
var j = httpacl.findJob(appkey,'get');
var j = httpacl.findJob(appkey,method);
var topic_prex = 'cmd.execute.';
......@@ -28,8 +28,14 @@ var process_get = function(req, res) {
j.forEach(function(item){
var httpdata = {
'object_type' : 'httpdata',
'method' : 'get',
'data' : reqHelper.getQuery()
'method' : method,
'data' : {}
}
if(method=='get'){
httpdata.data = reqHelper.getQuery();
}else if(method=='post'){
httpdata.data = req.body;
}
var job_execute_msg = {
......@@ -40,6 +46,7 @@ var process_get = function(req, res) {
'input_data' : {
'type' : 'bsdata',
'value' : {
'object_type':'bsdata',
'data_type' : 'object',
'data' : httpdata
}
......@@ -62,7 +69,8 @@ var process_get = function(req, res) {
}
}
router.get('/:akey',process_get);
router.get('/:akey',function(req, res){process_req(req,res,'get')});
router.post('/:akey',function(req, res){process_req(req,res,'post')});
module.exports = router;
......@@ -18,6 +18,8 @@ function JobTask (prm)
this.handle = prm.handle;
this.mem = prm.handle.mem;
this.jobcaller = prm.handle.jobcaller;
this.storagecaller = prm.handle.storagecaller;
this.acl_validator = prm.handle.acl_validator
this.jobcfg = prm.job_config;
this.input_meta = prm.input_meta;
......@@ -86,6 +88,7 @@ JobTask.prototype.run = function ()
}
var context = {
"acl_validator" : this.acl_validator,
"jobconfig" : job_tr_config,
"transaction" : ctx_transaction,
"input" : {'data':obj_input_data,'meta':input_meta} ,
......@@ -290,9 +293,11 @@ function perform_do(prm,cb)
var DOTask = getPlugins('do',do_cfg.type);
var doMem = new memstore({'job_id':job_id,'cat':'do','mem':prm.handle.mem});
var jobcaller = prm.handle.jobcaller;
var storagecaller = prm.handle.storagecaller;
do_context.task = {
"memstore" : doMem,
"jobcaller" : jobcaller
"jobcaller" : jobcaller,
"storagecaller" : storagecaller
}
var dout = new DOTask(do_context,prm.request);
......@@ -324,6 +329,7 @@ function getInputData(obj)
if(obj.type == 'bsdata')
{
var inp = bsdata.parse(obj.value);
if(!inp){ return {}}
return inp.data;
}else{
return {};
......
......@@ -36,7 +36,7 @@ JT.prototype.run = function (done)
job_registry.getJob(jobId,function(err,data){
if(!data){
callback('job ' + jobId + ' :: does not exits');
}else if(!data.active){
}else if(data.active.toString()=='false'){
callback('job ' + jobId + ' :: unactive');
}else{
callback(err,data);
......
......@@ -5,8 +5,13 @@ var QueueCaller = ctx.getLib('lib/amqp/queuecaller');
var QueueReceiver = ctx.getLib('lib/amqp/queuereceiver');
var ConnCtx = ctx.getLib('lib/conn/connection-context');
var JobRegistry = ctx.getLib('lib/mems/job-registry');
var SSCaller = ctx.getLib('lib/axon/rpccaller');
var ACLValidator = ctx.getLib('lib/auth/acl-validator');
var JobTransaction = require('./lib/jobtransaction')
//const SS_URL = 'tcp://127.0.0.1:19030'
var SS_URL = ctx.getUnixSocketUrl('ss.sock');
module.exports.create = function(prm)
{
var jw = new JW(prm);
......@@ -17,6 +22,7 @@ var JW = function JobWorker (prm)
{
var param = prm || {};
this.config = param.config || cfg;
this.auth_cfg = this.config.auth;
this.instance_name = param.name;
this.conn = ConnCtx.create(this.config);
......@@ -24,6 +30,9 @@ var JW = function JobWorker (prm)
this.jobcaller = new QueueCaller({'url':this.conn.getAmqpUrl(),'name':'bs_jobs_cmd'});
this.job_registry = JobRegistry.create({'redis':this.mem});
this.acl_validator = ACLValidator.create(this.auth_cfg);
this.storagecaller = new SSCaller({'url':SS_URL});
}
JW.prototype.start = function ()
......
var mmatch = require("minimatch");
module.exports.create = function(prm)
{
return new ACLValidator(prm);
}
function ACLValidator(prm)
{
this.acl = prm.acl;
if(!Array.isArray(this.acl)){
this.acl=[];
}
}
ACLValidator.prototype.appendACL = function(aacl)
{
if(Array.isArray(aacl))
{
this.acl.concat(aacl);
}
}
ACLValidator.prototype.isAccept = function(tkacl,prm)
{
var ret=true;
var chk = {
"vo":prm.vo||"",
"service":prm.service||"",
"resource":prm.resource||"",
"mode":prm.mode||""
}
var the_acl = this.acl;
if(Array.isArray(tkacl))
{
the_acl = this.acl.concat(tkacl);
}
the_acl.forEach((rule) => {
if(rulematch(chk,rule) && typeof rule.accept == 'boolean'){
ret = rule.accept;
}
});
function rulematch(chk,rule)
{
var m_vo = (!rule.vo)?true:mmatch(chk.vo,rule.vo);
var m_service = (!rule.service)?true:mmatch(chk.service,rule.service);
var m_resource = (!rule.resource)?true:mmatch(chk.resource,rule.resource);
var m_mode = (!rule.mode)?true:mmatch(chk.mode,rule.mode);
return m_vo && m_service && m_resource && m_mode;
}
return ret;
}
var jwt = require('express-jwt');
module.exports.create = function(cfg)
{
return new Tokenizer(cfg);
}
module.exports.info = function(tok)
{
var ret = {}
if(!tok){
ret = {"vo":"",acl:""}
}else{
ret = {
"vo":tok.vo || "",
"acl":tok.acl || []
}
}
return ret;
}
function Tokenizer(cfg)
{
//this.config = cfg;
this.cfg_auth = cfg
}
Tokenizer.prototype.getJWTSecret = function()
{
if(process.env.BS_SECRET)
{
return {'type':'env','value':process.env.BS_SECRET};
}else{
return this.cfg_auth.secret;
}
}
Tokenizer.prototype.middleware = function()
{
var self = this;
var jt = jwt({
secret: self.getJWTSecret().value,
requestProperty: 'auth',
credentialsRequired: false,
getToken: function fromHeaderOrQuerystring (req) {
if (req.headers.authorization && req.headers.authorization.split(' ')[0] === 'Bearer') {
return req.headers.authorization.split(' ')[1];
} else if (req.query && req.query.token) {
return req.query.token;
}
return null;
}
});
return jt;
}
\ No newline at end of file
var axon = require('axon');
var thunky = require('thunky');
function RPCCaller(config)
{
this.config = config;
this.url = config.url;
this.name = config.name || "rpc_queue";
this.sock = axon.socket('req');
var self = this;
this.opened = false;
this.open = thunky(open);
this.open();
function open (cb) {
//console.log('OPEN IPC >>>> ' + self.url);
self.sock.connect(self.url,function(){
//console.log('OPENED');
self.opened = true;
cb();
});
}
}
RPCCaller.prototype.call = function(req,cb)
{
var self = this;
//console.log('CALLING>>>');
this.open(()=>{
self.sock.send(req,(res)=>{
//console.log(req);
cb(null,res);
})
});
}
RPCCaller.prototype.close = function()
{
this.sock.close();
}
module.exports = RPCCaller;
var axon = require('axon');
var fs = require('fs');
function RPCServer(config)
{
this.config = config;
this.url = config.url;
this.name = config.name || "rpc_queue";
this.remote_function = null;
this.sock = axon.socket('rep');
}
RPCServer.prototype.start = function(cb)
{
var self = this;
self.sock.bind(self.url,function(){
if(typeof cb == 'function'){cb();}
});
self.sock.on('message', function(req, reply){
self.remote_function(req,function(err,resp){
reply(resp);
});
});
}
RPCServer.prototype.set_remote_function = function(func){
this.remote_function = func;
}
module.exports = RPCServer;
......@@ -216,5 +216,6 @@ Storage.prototype.reader = function(prm){
}
Storage.prototype.close = function(cb){
this.write_queue.pause( )
this.file.close(cb);
};
......@@ -67,7 +67,9 @@ HttpACL.prototype.update = function(cb)
var trigger = JSON.parse(res[k]);
if(trigger.type == 'http')
{
var acl = mkACL(trigger.appkey,trigger.method,trigger.job_id);
var _vo = trigger.vo || '';
var akey = (_vo=='$' || _vo=='')?trigger.appkey:_vo + '.' + trigger.appkey;
var acl = mkACL(akey,trigger.method,trigger.job_id);
self.add(acl);
}
}
......@@ -75,15 +77,6 @@ HttpACL.prototype.update = function(cb)
cb(err);
});
// this.mem.get(PREFIX, function (err, result) {
// if(!err && result){
//
// self.acl = JSON.parse(result);
// }
// cb(err);
// });
}
// HttpACL.prototype.commit = function(cb)
......
var Redis = require('redis');
const KEYS = 'bs:regis:triggers';
module.exports.create = function(cfg)
{
return new StorageEventList(cfg);
}
module.exports.mkSEL = mkSEL;
function mkSEL(storage_name,jobid,opt)
{
var a = {
'storage_name' : storage_name,
'jobid' : jobid
}
if(opt){a.opt = opt}
return a;
}
function StorageEventList(cfg)
{
this.config = cfg;
if(cfg.conn){
this.mem = Redis.createClient(cfg.conn);
}else if(cfg.redis){
this.mem = cfg.redis;
}else{
this.mem = null;
}
this.sel = [];
}
StorageEventList.prototype.add = function(sel)
{
var found = false;
this.sel.forEach( function (val) {
if(val.storage_name == sel.storage_name && val.jobid == sel.jobid){
found = true;
}
});
if(!found){
this.sel.push(sel);
}
}
StorageEventList.prototype.clean = function()
{
this.sel = [];
}
StorageEventList.prototype.update = function(cb)
{
var self=this;
self.clean()
self.mem.hgetall(KEYS,function (err,res){
if(!err && res){
var ks = Object.keys(res);
for(var i=0;i<ks.length;i++)
{
var k = ks[i];
var trigger = JSON.parse(res[k]);
if(trigger.type == 'storage')
{
var sel = mkSEL(trigger.storage_name,trigger.job_id);
self.add(sel);
}
}
}
cb(err);
});
}
StorageEventList.prototype.findJob= function(sname)
{
var jobs = [];
this.sel.forEach( function (val) {
if(val.storage_name == sname){
jobs.push(val);
}
});
return jobs;
}
......@@ -52,8 +52,10 @@ TR.prototype.setByJob = function(job,cb)
{
var self = this;
var id = 'def.' + job.job_id;
var vo = job._vo;
var trigger = job.trigger;
trigger.id = id;
trigger.vo = vo;
trigger.job_id = job.job_id;
self.setTrigger(id,trigger,cb);
}
......
......@@ -18,7 +18,7 @@ module.exports.parse = function(obj)
}else if(obj instanceof Buffer){
var jsonobj = BSON.parse(obj);
return new BSData(jsonobj.data,jsonobj.data_type);
}else if(obj.data_type && obj.data){
}else if( ((obj.object_type && obj.object_type == 'bsdata')||(obj.type && obj.type == 'bsdata')) && obj.data_type && obj.data){
var oData = obj.data;
if(obj.encoding == 'base64'){
oData = new Buffer(obj.data,'base64');
......
......@@ -76,6 +76,14 @@ responseHelper.prototype.response400 = function(msg){
}
}
responseHelper.prototype.response401 = function(msg){
if(msg){
this.response.status(401).json({response:'ERROR',message:msg});
}else{
this.response.status(401).send('unauthorized');
}
}
responseHelper.prototype.response403 = function(msg){
if(msg){
this.response.status(403).json({response:'ERROR',message:msg});
......
......@@ -20,7 +20,10 @@
"buffalo": "^0.1.3",
"dateformat": "^1.0.12",
"express": "^4.14.0",
"express-jwt": "^5.3.1",
"ioredis": "^2.5.0",
"jsonwebtoken": "^8.2.2",
"minimatch": "^3.0.4",
"minimist": "^1.2.0",
"moment": "^2.17.1",
"node-gyp": "^3.6.2",
......
var vm = require('vm');
var request = require('request').defaults({ encoding: null });
function execute_function(context,response){
......@@ -10,9 +11,23 @@ function execute_function(context,response){
var output_type = 'text';
var url = param.url;
var env = {
'input' : {
'meta' : input_meta,
'data' : input_data
},
'url' : ''
}
var script = new vm.Script("url=`" + url + "`");
var context = new vm.createContext(env);
script.runInContext(context);
url = env.url;
var reject = true;
if(param.reject==false){reject=false;}
if(typeof param.reject != 'undefined' && param.reject.toString()=="false"){reject=false;}
var encode = 'utf8';
if(param.encoding == 'binary'){
......@@ -22,7 +37,21 @@ function execute_function(context,response){
if(param.encoding=='json'){output_type='object'}
request({'url':url, 'encoding':encode}, function (error, resp, body) {
//Http Header
var http_headers = {};
if(param.auth){
if(param.auth.type == 'basic'){
var auth_header = "Basic " + new Buffer(param.auth.username + ":" + param.auth.password).toString("base64");
http_headers.Authorization = auth_header;
}
}
if(typeof param.headers == 'object')
{
http_headers = Object.assign(http_headers,param.headers)
}
request({'method': 'GET','url':url,'headers':http_headers ,'encoding':encode}, function (error, resp, body) {
response.meta = {'_status_code':(error)?0:resp.statusCode,'_error':(error)?true:false}
if (!error && resp.statusCode == 200) {
if(param.encoding=='json'){
......
......@@ -23,8 +23,14 @@ function perform_function(context,response){
var last_mod = {'fname':'','tts':0};
var fs_continue = false;
var buff_out = new Buffer(0);
if(param.last_modify_ts)
{
last_mod.tts = param.last_modify_ts*1000;
}
memstore.getItem('lastmodify',function(err,value){
if(value){
if(value && value.tts > last_mod.tts){
last_mod=value;
}
getData();
......
......@@ -2,13 +2,16 @@
var ctx = require('../../../context');
var Utils = ctx.getLib('lib/util/plugin-utils');
var bsdata = ctx.getLib('lib/model/bsdata');
const ACL_SERVICE_NAME = "job";
function perform_function(context,request,response){
var job_id = context.jobconfig.job_id;
var job_vo = context.jobconfig._vo || '';
var transaction_id = context.transaction.id;
var param = context.jobconfig.data_out.param;
var memstore = context.task.memstore;
var jobcaller = context.task.jobcaller;
var acl_validator = context.acl_validator;
var in_type = request.type;
var data = (Array.isArray(request.data))?request.data:[request.data];
......@@ -16,6 +19,12 @@ function perform_function(context,request,response){
var prm_to = (Array.isArray(param.to))?param.to:[param.to];
var def_acl = [
{
'accept':true,
'resource':(job_vo=='$'||job_vo=='')?'*':job_vo + '.*'
}
];
data.forEach((dat)=>{
prm_to.forEach((jobprm)=>{
......@@ -25,7 +34,14 @@ function perform_function(context,request,response){
'data' : dat
}
var job=Utils.vm_execute_text(ev,jobprm)
var acp = acl_validator.isAccept(def_acl,{
"vo":job_vo,"service":ACL_SERVICE_NAME,"resource":job,"mode":"x"
});
if(acp){
call_to(dat,job);
}else{
console.log('[Warn] Calling job :: ' + job + ' -> Not Permited')
}
});
});
......
......@@ -12,23 +12,31 @@ function perform_function(context,request,response){
var token = param.token;
var message = data;
var image_url = null;
if(param.message){
var env = {
'type' : output_type,
'data' : data,
'meta' : meta,
'msg' : data
'msg' : data,
'img' : null
}
var txt_script = ""
var script = new vm.Script("msg=`" + param.message + "`");
if(param.image_url){
env.msg="";
txt_script += "img=`" + param.image_url + "`; "
}
if(param.message){txt_script += "msg=`" + param.message + "`; "}
var script = new vm.Script(txt_script);
var context = new vm.createContext(env);
script.runInContext(context);
message = env.msg;
}
message = (typeof env.msg == 'string')?env.msg:JSON.stringify(env.msg);
image_url = env.img;
post_to_line(token,message,function(err){
post_to_line(token,message,{'imgurl':image_url},function(err){
if(!err){
response.success();
}else{
......@@ -40,7 +48,7 @@ function perform_function(context,request,response){
//response.error("error message")
}
function post_to_line(token,msg,cb)
function post_to_line(token,msg,resource,cb)
{
var options = { method: 'POST',
......@@ -49,7 +57,20 @@ function post_to_line(token,msg,cb)
{ 'cache-control': 'no-cache',
'authorization' : 'Bearer ' + token,
'content-type': 'multipart/form-data' },
formData: { message: String(msg) } };
formData: {
message: String(msg)
}
};
if(typeof resource == 'function')
{
cb=resource;
}else {
if(resource.imgurl){
options.formData.imageThumbnail = resource.imgurl;
options.formData.imageFullsize = resource.imgurl;
}
}
request(options, function (err, resp, body) {
if (!err && resp.statusCode==200) {
......
......@@ -6,12 +6,16 @@ var BinStream = ctx.getLib('lib/bss/binarystream_v1_1');
var bsdata = ctx.getLib('lib/model/bsdata');
var async = require('async');
const ACL_SERVICE_NAME = "storage";
function perform_function(context,request,response){
var job_id = context.jobconfig.job_id;
var job_vo = context.jobconfig._vo || '';
var transaction_id = context.transaction.id;
var param = context.jobconfig.data_out.param;
var memstore = context.task.memstore
var memstore = context.task.memstore;
var storagecaller = context.task.storagecaller;
var acl_validator = context.acl_validator;
var output_type = request.input_type;
var data = (Array.isArray(request.data))?request.data:[request.data];
......@@ -20,10 +24,15 @@ function perform_function(context,request,response){
var amqp_cfg = ctx.config.amqp;
var storage_name = param.storage_name;
var caller = new RPCCaller({
var caller = storagecaller;
if(param.channel!='ipc'){
caller = new RPCCaller({
url : amqp_cfg.url,
name :'storage_request'
});
}
var dc_meta = {
"_jid" : job_id,
......@@ -34,12 +43,19 @@ function perform_function(context,request,response){
if(meta && typeof meta == 'object')
{
Object.keys(meta).forEach((item)=>{
if(!item.startsWith('_')){
if(!item.startsWith('_') || item=='_key'){
dc_meta[item] = meta[item];
}
});
}
var def_acl = [
{
'accept':true,
'resource':(job_vo=='$'||job_vo=='')?'*':job_vo + '.*'
}
];
var idx = 0;
async.whilst(
function() { return idx < data.length; },
......@@ -51,8 +67,15 @@ function perform_function(context,request,response){
'data' : data[idx]
}
var sname=Utils.vm_execute_text(ev,storage_name)
var sname=Utils.vm_execute_text(ev,storage_name);
if(sname){
//storage permission
var acp = acl_validator.isAccept(def_acl,{
"vo":job_vo,"service":ACL_SERVICE_NAME,"resource":sname,"mode":"w"
});
if(acp){
send_storage(caller,dc_meta,el_data,sname,function(err){
if(!err){
idx++;
......@@ -62,7 +85,11 @@ function perform_function(context,request,response){
}
});
}else{
callback(new Error('invalid storage'));
callback('storage permission error');
}
}else{
callback('invalid storage');
}
},
......@@ -93,7 +120,7 @@ function send_storage(caller,dc_meta,dc_data,storage_name,cb)
}
}
}
//console.log(req);
caller.call(req,function(err,resp){
if(!err && resp.status=='OK'){
cb(null);
......
......@@ -38,6 +38,10 @@ function perform_function(context,request,response){
var meta = mapenv.meta;
var output_type = mapenv.type;
if(param.to_binary && typeof data == 'string'){
data = Buffer.from(data, 'base64');
output_type = 'binary';
}
response.success(data,{'meta':meta,'output_type':output_type});
......
......@@ -14,7 +14,7 @@ function perform_function(context,request,response){
//prm_reject :: bool
//prm_name :: text
var prm_size = (param.size && Number(param.size)>0)?Number(param.size):1;
var prm_reject = (param.reject==false)?false:true;
var prm_reject = (typeof param.reject != 'undefined' && param.reject.toString()=="false")?false:true;
var prm_name = (param.name)?'windw-'+param.name:'windw';
var obj = {
......
var Jimp = require("jimp");
var async = require('async');
function avg_point(prm,cb)
{
var bg = prm.bg;
var fg = prm.fg;
var dpoint = prm.point;
var radius = prm.radius || 10;
var bg_threshold = prm.bg_threshold || 20;
var mapping_threshold = prm.mapping_threshold || 128;
var table = prm.table || [];
async.waterfall([
p_readbg,
p_readfg,
p_process,
], function (err, result) {
cb(err,result);
});
function p_readbg(callback) {
Jimp.read(bg, function (err, img) {
callback(null, img);
});
}
function p_readfg(ibg, callback) {
Jimp.read(fg, function (err, img2) {
callback(null, ibg,img2);
});
}
function p_process(ibg,ifg,callback)
{
var w = ifg.bitmap.width;
var h = ifg.bitmap.height;
if(!dpoint){dpoint=[Math.floor(w/2),Math.floor(h/2)]}
var x0 = (dpoint[0]-radius>0)?dpoint[0]-radius:0;
var y0 = (dpoint[1]-radius>0)?dpoint[1]-radius:0;
var x1 = (dpoint[0]+radius<w)?dpoint[0]+radius:w;
var y1 = (dpoint[1]+radius<h)?dpoint[1]+radius:h;
var sum_point = [];
var sum=0;
for(var i=x0;i<=x1;i++){
for(var j=y0;j<=y1;j++){
if(pointInCircle(i,j,dpoint[0],dpoint[1],radius)){
var bpx = Jimp.intToRGBA(ibg.getPixelColor(i, j));
var fpx = Jimp.intToRGBA(ifg.getPixelColor(i, j));
//Channel threshold
// if(bpx.r-fpx.r > bg_threshold || bpx.g-fpx.g > bg_threshold || bpx.b-fpx.b > bg_threshold){
// newimg.setPixelColor(Jimp.rgbaToInt(fpx.r, fpx.g, fpx.b, 255), i, j);
// }
//Color Distance threshold
if(distance([bpx.r,bpx.g,bpx.b],[fpx.r,fpx.g,fpx.b])>bg_threshold){
var mv = mapping(table,[fpx.r,fpx.g,fpx.b])
if(mv.value>0 && mv.distance<mapping_threshold){
sum_point.push(mv.value);
sum+=mv.value;
}
}else{
sum_point.push(0);
}
}
}
}
var avg = (sum_point.length>0)?sum/sum_point.length:0;
var mdn = (sum_point.length>0)?median(sum_point):0;
callback(null,{'avg':avg,'mdn':mdn});
}
function p_process_image(ibg,ifg,callback)
{
var w = ibg.bitmap.width;
var h = ibg.bitmap.height;
if(!dpoint){dpoint=[Math.floor(w/2),Math.floor(h/2)]}
var x0 = (dpoint[0]-radius>0)?dpoint[0]-radius:0;
var y0 = (dpoint[1]-radius>0)?dpoint[1]-radius:0;
var x1 = (dpoint[0]+radius<w)?dpoint[0]+radius:w;
var y1 = (dpoint[1]+radius<h)?dpoint[1]+radius:h;
var newimage = new Jimp(w,h,0x0000FFFF,(err,newimg)=>{
var sum_point = [];
for(var i=x0;i<=x1;i++){
for(var j=y0;j<=y1;j++){
if(pointInCircle(i,j,dpoint[0],dpoint[1],radius)){
var bpx = Jimp.intToRGBA(ibg.getPixelColor(i, j));
var fpx = Jimp.intToRGBA(ifg.getPixelColor(i, j));
//Channel threshold
// if(bpx.r-fpx.r > bg_threshold || bpx.g-fpx.g > bg_threshold || bpx.b-fpx.b > bg_threshold){
// newimg.setPixelColor(Jimp.rgbaToInt(fpx.r, fpx.g, fpx.b, 255), i, j);
// }
//Color Distance threshold
if(distance([bpx.r,bpx.g,bpx.b],[fpx.r,fpx.g,fpx.b])>bg_threshold){
newimg.setPixelColor(Jimp.rgbaToInt(fpx.r, fpx.g, fpx.b, 255), i, j);
}
}
}
}
callback(null,newimg);
});
}
}
function mapping(map_table,color)
{
var dist = distance([0,0,0],[255,255,255]);
var out={'value':-1,'distance':dist}
if(!Array.isArray(map_table)){map_table=[];}
map_table.forEach((itm)=>{
if(itm.color && itm.value){
var d = distance(itm.color,color)
if(d<out.distance){
out.value = itm.value;
out.distance = d;
}
}
});
return out;
}
function median(values) {
values.sort( function(a,b) {return a - b;} );
var half = Math.floor(values.length/2);
if(values.length % 2)
return values[half];
else
return (values[half-1] + values[half]) / 2.0;
}
function pointInCircle(x, y, cx, cy, radius) {
var distancesquared = Math.pow(x - cx,2) + Math.pow(y - cy,2);
return Math.sqrt(distancesquared) <= radius;
}
function distance(a,b)
{
sum = 0;
sum = Math.pow(a[0] - b[0],2) + Math.pow(a[1] - b[1],2) + Math.pow(a[2] - b[2],2)
return Math.sqrt(sum)
}
module.exports.avg_point = avg_point;
var util = require('util');
var DTPlugin = require('../../dt-plugin');
function DTTask(context,request){
DTPlugin.call(this,context,request);
this.name = "radar-digitizer";
this.output_type = "";
}
util.inherits(DTTask,DTPlugin);
DTTask.prototype.perform = require('./perform');
module.exports = DTTask;
{
"name": "dt-radar-digitizer",
"version": "1.0.0",
"description": "",
"main": "run.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"author": "",
"license": "ISC",
"dependencies": {
"jimp": "^0.2.28"
}
}
var vm = require('vm');
var hash = require('object-hash');
var digitizer = require("./digitizer.js");
function perform_function(context,request,response){
var job_id = context.jobconfig.job_id;
var transaction_id = context.transaction.id;
var param = context.task.config.param;
var memstore = context.task.memstore
var output_type = request.input_type;
var data = request.data;
var meta = request.meta || {};
//Parameters
var prm_backgroud = param.backgroud;
var prm_point = param.point;
var prm_radius = param.radius;
var prm_table = param.color_mapping || def_table();
var mapping_threshold = param.mapping_threshold;
var bg_threshold = param.backgroud_threshold;
var prefix = param.prefix || 'radar_';
var bg=null;
if(prm_backgroud)
{
if(typeof prm_backgroud == 'string')
{
bg = __dirname + '/img/' + prm_backgroud
}else if(typeof prm_backgroud == 'object' && prm_backgroud.base64){
bg = Buffer.from(prm_backgroud, 'base64');
}
}
var fg = data;
if(typeof data == 'object' && data.base64_data)
{
fg = Buffer.from(data, 'base64');
}
digitizer.avg_point({
'bg':bg,
'fg': fg,
'point':prm_point,
'table':prm_table,
'radius':prm_radius,
'mapping_threshold':mapping_threshold,
'bg_threshold ':bg_threshold
},function(err,res){
if(!err){
meta[prefix+'avg'] = res.avg;
meta[prefix+'median'] = res.mdn;
response.success(data,{'meta':meta,'output_type':output_type});
}else{
response.error(err)
}
})
// memstore.setItem('lasttransaction',transaction_id,function(err){
// response.success(data);
// });
// memstore.getItem('lasttransaction',function(err,value){
// response.success(value);
// });
//response.reject();
//response.error("error message")
}
function def_table()
{
var col_mapping = [
{"color":[0,0,0],"value":-1},
{"color":[255,255,255],"value":-1},
{"color":[0,254,130],"value":5.5},
{"color":[0,255,0],"value":10},
{"color":[0,173,0],"value":15},
{"color":[0,150,50],"value":20},
{"color":[255,255,0],"value":25},
{"color":[255,200,3],"value":30},
{"color":[255,170,0],"value":35},
{"color":[255,85,0],"value":41},
{"color":[255,0,0],"value":45},
{"color":[255,0,100],"value":50},
{"color":[255,0,255],"value":55},
{"color":[255,128,255],"value":60},
]
return col_mapping;
}
module.exports = perform_function;
//var Jimp = require("jimp");
var async = require('async');
var digitizer = require("./digitizer.js");
var Jimp = require("jimp");
var col_mapping = [
{"color":[0,0,0],"value":-1},
{"color":[255,255,255],"value":-1},
{"color":[0,254,130],"value":5.5},
{"color":[0,255,0],"value":10},
{"color":[0,173,0],"value":15},
{"color":[0,150,50],"value":20},
{"color":[255,255,0],"value":25},
{"color":[255,200,3],"value":30},
{"color":[255,170,0],"value":35},
{"color":[255,85,0],"value":41},
{"color":[255,0,0],"value":45},
{"color":[255,0,100],"value":50},
{"color":[255,0,255],"value":55},
{"color":[255,128,255],"value":60},
]
digitizer.avg_point({
'bg':'img/nck.jpg',
'fg':'D:\\Project\\radar_nck_process\\obj.jpg',
'point':[555,492],
'table':col_mapping,
'radius':10
},function(err,res){
console.log(res);
console.log(__dirname);
})
// var newimage = new Jimp(1600,1600,0x00000000,(err,newimg)=>{
// newimg.write( "out.png", (err,res)=>{console.log("OK");} )
// });
// Jimp.read("D:\\Project\\radar_nck_process\\base_bg.jpg", function (err, img) {
// // do stuff with the image (if no exception)
// img.write( "out.png", (err,res)=>{console.log("OK");} )
// });
//
// Jimp.read("D:\\Project\\radar_nck_process\\obj.jpg", function (err, img) {
// // do stuff with the image (if no exception)
// img.write( "out2.png", (err,res)=>{console.log("OK2");} )
// });
// var img = new Jimp("D:\\Project\\radar_nck_process\\base_bg.jpg");
// img.write( "out.png", (err,res)=>{console.log("OK");} )
......@@ -17,8 +17,8 @@
"instances" : 0
},
{
"name" : "bs.trigger.scheduler",
"script" : "./serv-scheduler.js"
"name" : "bs.trigger.core",
"script" : "./serv-coretrigger.js"
},
{
"name" : "bs.trigger.httplistener",
......@@ -27,5 +27,9 @@
{
"name" : "bs.api.service",
"script" : "./serv-api.js"
},
{
"name" : "bs.trigger.nbudp",
"script" : "./serv-nbudptrigger.js"
}]
}
\ No newline at end of file
var ctx = require('./context');
var SchedulerService = ctx.getLib('coreservice/scheduler');
var StorageEventService = ctx.getLib('coreservice/storage-trigger');
var ss = SchedulerService.create(ctx.config);
ss.start();
var ses = StorageEventService.create(ctx.config);
ses.start();
var ctx = require('./context');
var NBUdpTrigger = ctx.getLib('triggers/trg-nbudp');
var trg = NBUdpTrigger.create(ctx.config);
trg.start();
......@@ -5,18 +5,21 @@ var argv = require('minimist')(process.argv.slice(2));
//ss.start();
var m = {'read':false,'write':false}
if(argv['process-read']){m.read = true;}
if(argv['process-write']){m.write = true;}
var m = {'read':"false",'write':"false"}
if(argv['process-read']){m.read = "true";}
if(argv['process-write']){m.write = "true";}
if(argv['api-port']){ctx.config.storage.api_port = Number(argv['api-port']);}
//console.log(argv);
var ss = StorageService.create(ctx.config);
if(!m.read && !m.write)
if(m.read=="false" && m.write=="false")
{
ss.start();
}else{
if(m.read){ss.http_start();}
if(m.write){ss.amqp_start();}
if(m.read=="true"){ss.http_start();}
if(m.write=="true"){
ss.amqp_start();
ss.ipc_start();
}
}
......@@ -7,10 +7,16 @@ var bsdata = ctx.getLib('lib/model/bsdata');
var importer = require('./importer');
var dataevent = require('./dataevent');
var sutils = require('./storage-utils');
module.exports.create = function(prm)
{
return new BSSEngine(prm);
var ins = prm;
// var bssId = sutils.mkBssId(ins.repos_dir,ins.name,{'newInstance':ins.newInstance});
// ins.file = bssId.file;
// ins.serial = bssId.serial;
return new BSSEngine(ins);
}
function BSSEngine(prm)
......@@ -24,6 +30,8 @@ function BSSEngine(prm)
this.name = prm.name;
this.context = (prm.context)?prm.context:null;
this.concurrent = 0;
this.serial=prm.serial||'';
this.outdate=false;
}
BSSEngine.prototype.filepath = function()
......@@ -78,9 +86,11 @@ BSSEngine.prototype.cmd = function(cmd,cb)
var command = cmd.command;
var param = cmd.param;
var self=this;
switch (command) {
case 'write':
this.cmd_write(param,cb);
self.cmd_write(param,cb);
break;
default:
cb('invalid cmd');
......
......@@ -9,30 +9,39 @@ function BSSPool(prm)
this.size = 32;
}
BSSPool.prototype.get = function(name,cb)
BSSPool.prototype.get = function(name,opt,cb)
{
if(!cb){cb=opt;opt={};}
var self=this;
var filepath = this.repos_dir + '/' + name2path(name) + '.bss'
var bssname = name;
var bss = this.search(name);
if(bss){
if(bss && !opt.newInstance){
self.clean(function(err){
process.nextTick(function() {
cb(null,bss.engine);
});
});
}else{
bss = BSSEngine.create({'context':self.context,'file' : filepath,'name' : bssname});
bss.open(function(err){
var bss_engine = BSSEngine.create(
{ 'context':self.context,
'repos_dir':self.repos_dir,
'file' : filepath,
'name' : bssname,
'newInstance':opt.newInstance
});
bss_engine.open(function(err){
if(!err){
self.pool.push({
'name' : name,
'engine':bss
'engine':bss_engine
});
}
self.clean(function(err){
cb(err,bss);
cb(err,bss_engine);
});
});
}
......@@ -64,16 +73,27 @@ BSSPool.prototype.search = function(name)
newpool.push(bssI)
}
});
if(ret){newpool.push(ret)}
this.pool = newpool;
// for(var i=0;i<this.pool.length;i++)
// {
// var bssI = this.pool[i];
// if(bssI.name == name){
// ret = bssI;
// break;
// }
// }
return ret;
}
BSSPool.prototype.detach = function(name)
{
var ret=null;
var newpool=[];
this.pool.forEach((bssI)=>{
if(bssI.name == name){
ret = bssI;
}else{
newpool.push(bssI)
}
});
this.pool = newpool;
return ret;
}
......
var fs = require('fs');
var BSSPool = require('./bsspool');
var StorageIndexstore = require('./storage-indexstore')
module.exports.create = function(cfg){
return new Db(cfg);
......@@ -8,8 +10,11 @@ function Db(cfg)
{
this.repos_dir = cfg.repos_dir;
this.context = cfg.context;
this.mem = cfg.redis;
this.idxstore = new StorageIndexstore({'mem':this.mem });
this.bsspool = new BSSPool({'repos_dir':this.repos_dir,'context':this.context});
}
Db.prototype.request = function(req,cb)
......@@ -24,6 +29,14 @@ Db.prototype.request = function(req,cb)
var prm = req.param
this.bsscmd_w(prm,cb)
break;
case 'delete':
var prm = req.param
this.bsscmd_del(prm,cb)
break;
case 'idxget':
var prm = req.param
this.cmd_idxget(prm,cb)
break;
default:
cb(null,result_error('invalid command'));
}
......@@ -31,11 +44,27 @@ Db.prototype.request = function(req,cb)
}
Db.prototype.cmd_idxget = function(prm,cb)
{
var self = this;
var storage_name = prm.storage_name;
var key = prm.key;
self.idxstore.getIndex(storage_name,key,(err,val)=>{
if(!err && val){
cb(null,result_ok({'found':true,'object_id':val}));
}else{
cb(null,result_ok({}))
}
});
}
Db.prototype.bsscmd_w = function(prm,cb)
{
var self = this;
var filepath = this.repos_dir + '/' + name2path(prm.storage_name) + '.bss';
var bssname = prm.storage_name;
var metadata = prm.meta;
var w_cmd = {
'command' : 'write',
'param' : {
......@@ -44,11 +73,16 @@ Db.prototype.bsscmd_w = function(prm,cb)
}
}
this.bsspool.get(bssname,function(err,bss){
var bss_opt = {};
if(prm.opt && prm.opt.overwrite){bss_opt.newInstance=true;}
self.bsspool.get(bssname,bss_opt,function(err,bss){
if(!err){
bss.cmd(w_cmd,function(err,resp){
if(!err){
indexing(metadata,resp.resource_id,(err)=>{
cb(null,result_ok(resp));
})
}else{
cb(null,result_error('write error'));
}
......@@ -57,16 +91,66 @@ Db.prototype.bsscmd_w = function(prm,cb)
cb(null,result_error('bss error'));
}
});
function indexing(meta,objid,cb)
{
if(meta && meta._key && typeof meta._key == 'string'){
self.idxstore.setIndex(bssname,meta._key,objid,cb);
}else{
cb(null);
}
}
}
Db.prototype.bsscmd_del = function(prm,cb)
{
var self = this;
var filepath = this.repos_dir + '/' + name2path(prm.storage_name) + '.bss';
var bssname = prm.storage_name;
var bssI = self.bsspool.detach(bssname);
if(bssI){
bssI.engine.close((err)=>{
if(!err){
self.idxstore.flush(bssname);
unlink(filepath,cb);
}else{
cb(null,result_error('delete error'));
}
});
}else{
unlink(filepath,cb);
}
function unlink(fd,callback)
{
fs.exists(fd,function(exists){
if(exists){
fs.unlink(fd,function(err){
if(!err){
callback(null,result_ok());
}else{
callback(null,result_error('delete error'));
}
});
}else{
callback(null,result_error('storage not found'));
}
});
}
}
function name2path(name){
return name.split('.').join('/');
}
function result_error(msg)
function result_error(msg,code)
{
return {'status':'ERR','msg':msg}
return {'status':'ERR','msg':msg,'code':code}
}
function result_ok(resp)
......
......@@ -128,10 +128,9 @@ function bss_end(code)
function obj_out(obj,opt){
var ret = {
"_id" : (new ObjId(obj.header.ID)).toString()
}
var ret = {}
if(opt.id){ret._id = (new ObjId(obj.header.ID)).toString()}
if(opt.meta){ret.meta = obj.meta;}
if(opt.data){
if(obj.header.TY==BinStream.BINARY_TYPE)
......@@ -143,5 +142,13 @@ function obj_out(obj,opt){
}
}
if(opt.field=='_id'){
ret = ret._id;
}else if(opt.field=='_meta'){
ret = ret.meta;
}else if(opt.field=='_data'){
ret = ret.data;
}
return ret
}
var redis = require('redis');
const PREFIX = 'bs:storage:index';
function indexstore(conf){
this.prefix = PREFIX ;
if(conf.mem){
this.mem = conf.mem;
}else if(conf.conn){
this.mem = redis.createClient(conf.conn);
}
}
indexstore.prototype.setIndex = function(storage,keyname,value,cb){
var key = this.prefix + ":" + storage;
this.mem.hset(key,keyname,value,cb);
}
indexstore.prototype.getIndex = function(storage,keyname,cb)
{
var key = this.prefix + ":" + storage;
this.mem.hget(key,keyname,function(err,v){
var value = null;
if(!err && v){
//value = JSON.parse(v);
value = v;
}
cb(err,value);
});
}
indexstore.prototype.flush = function(storage,cb)
{
var key = this.prefix + ":" + storage;
this.mem.del(key);
if(typeof cb == 'function'){
cb();
}
}
module.exports = indexstore;
var path = require('path');
var fs = require('fs')
var fs = require('fs');
var bss_walk_sync = function(dir, filelist,cat) {
files = fs.readdirSync(dir);
......@@ -20,7 +20,25 @@ var bss_walk_sync = function(dir, filelist,cat) {
return filelist;
};
function name2path(name){
return name.split('.').join('/');
}
module.exports.list = function (repo)
{
return bss_walk_sync(repo)
}
/*
module.exports.mkBssId = function(repo,name,opt)
{
var prefix_dir = repo + '/' + name.split('.').slice(0,-1).join('/');
var dirlist = fs.readdirSync(dir);
var found_storage = [];
dirlist.forEach((file)=>{
});
}
*/
\ No newline at end of file
var ctx = require('../context');
var ConnCtx = ctx.getLib('lib/conn/connection-context');
var rpcserver = ctx.getLib('lib/amqp/rpcserver');
var SSServer = ctx.getLib('lib/axon/rpcserver');
var Db = ctx.getLib('storage-service/lib/db');
var WorkerPool = ctx.getLib('storage-service/lib/worker_pool');
var SSCaller = ctx.getLib('lib/axon/rpccaller');
var Tokenizer = ctx.getLib('lib/auth/tokenizer');
var ACLValidator = ctx.getLib('lib/auth/acl-validator');
var jwt = require('express-jwt');
var express = require('express');
var app = express();
var bodyParser = require('body-parser');
var EventPub = ctx.getLib('lib/amqp/event-pub');
var cfg = ctx.config;
var SS_LISTEN = ctx.getUnixSocketUrl('ss.sock');
var SS_URL = ctx.getUnixSocketUrl('ss.sock');
// var SS_LISTEN = ctx.getServiceUrl(19030);
// var SS_URL = ctx.getClientUrl(19030);
module.exports.create = function(cfg)
{
var ss = new SS(cfg);
......@@ -27,15 +40,22 @@ var SS = function StorageService(p_cfg)
'evp':new EventPub({'url':amqp_cfg.url,'name':'bs_storage'})
}
this.db = Db.create({'repos_dir':storage_cfg.repository,'context':this.context});
this.conn = ConnCtx.create(this.config);
this.mem = this.conn.getMemstore();
this.db = Db.create({'redis':this.mem,'repos_dir':storage_cfg.repository,'context':this.context});
this.worker_pool = WorkerPool.create({'size':2});
//this.storagecaller = new SSCaller({'url':SS_URL});
}
SS.prototype.start = function()
{
console.log('Starting Storage Service ...\n');
this.amqp_start();
this.ipc_start();
setTimeout(function(){
this.http_start();
},5000);
}
SS.prototype.amqp_start = function()
......@@ -74,26 +94,72 @@ SS.prototype.amqp_start = function()
});
}
SS.prototype.ipc_start = function()
{
var self = this;
if(this.ipc_server){return;}
this.ipc_server = new SSServer({
url : SS_LISTEN,
name : 'storage_request'
});
this.ipc_server.set_remote_function(function(req,callback){
//console.log("IPC Command");
self.db.request(req,function(err,res){
if(err){
console.log(err);
}
callback(err,res);
});
});
this.ipc_server.start(function(err){
if(!err){
console.log('SS:IPC START\t\t\t[OK]');
}else{
console.log('SS:IPC START\t\t\t[ERR]');
console.log('SS:IPC ERROR Restarting ...');
setTimeout(function(){
process.exit(1);
},5000);
}
});
}
SS.prototype.http_start = function()
{
var self = this;
var amqp_cfg = this.config.amqp;
var auth_cfg = this.config.auth;
var API_PORT = (this.config.storage.api_port)?this.config.storage.api_port:19080;
app.use(bodyParser.json({limit: '5mb'}));
app.use(bodyParser.json({limit: '64mb'}));
app.use(bodyParser.urlencoded({
extended: true
}));
var context = ctx.getLib('lib/ws/http-context');
this.storagecaller = new SSCaller({'url':SS_URL});
this.acl_validator = ACLValidator.create(auth_cfg);
this.worker_pool.initWorker();
app.use(context.middleware({
'worker_pool' : self.worker_pool
'acl_validator':self.acl_validator,
'worker_pool' : self.worker_pool,
'storagecaller':self.storagecaller
}));
app.use(require('./ws'));
var tokenizer = Tokenizer.create(auth_cfg);
app.use(tokenizer.middleware());
app.use(function (err, req, res, next) {
if (err.name === 'UnauthorizedError') {
res.status(401).send('invalid token');
}
});
app.use(require('./ws'));
app.listen(API_PORT, function () {
console.log('SS:DATA_API START\t\t[OK]');
......
......@@ -3,5 +3,7 @@ var express = require('express')
router.use('/v0.1',require('./v0.1'));
router.use('/v1',require('./v0.1'));
router.use('/v1.1',require('./v1.1'));
router.use('/v1.2',require('./v1.2'));
module.exports = router;
var express = require('express');
var router = express.Router();
router.use('/object',require('./service-object'));
router.use('/storage',require('./service-storage'));
module.exports = router;
var ctx = require('../../../context');
var express = require('express');
var router = express.Router();
var path = require('path');
var fs = require('fs');
var cfg = ctx.config;
var storage_cfg = cfg.storage
var response = ctx.getLib('lib/ws/response');
var request = ctx.getLib('lib/ws/request');
var BinStream = ctx.getLib('lib/bss/binarystream_v1_1');
var ObjId = ctx.getLib('lib/bss/objid');
var BSData = ctx.getLib('lib/model/bsdata');
router.get('/:id',function (req, res) {
var reqHelper = request.create(req);
var respHelper = response.create(res);
var oid = req.params.id;
var opt = {}
get_object(reqHelper,respHelper,{'oid':oid,'opt':opt});
});
router.get('/:id/data',function (req, res) {
var reqHelper = request.create(req);
var respHelper = response.create(res);
var query = reqHelper.getQuery();
var oid = req.params.id;
var opt = {
'field' : 'data'
}
opt.filetype = (query.filetype)?query.filetype:null
get_object(reqHelper,respHelper,{'oid':oid,'opt':opt});
});
function get_object(reqHelper,respHelper,prm)
{
prm=prm||{};
var oid = prm.oid;
var opt = prm.opt || {};
if(!oid){
return respHelper.response404();
}
var bss_ab_path = "default.bss";
var path_token= oid.split('.');
if(path_token.length >= 2){
var col = "";
var file = path_token[path_token.length-2] + ".bss";
var col = path_token.slice(0,path_token.length-2).join('/');
if(col.length>0){col = col + '/'}
bss_ab_path = col + file;
}else{
//Only Id no storage name
//Default Action
}
var tkoId = path_token[path_token.length-1];
var bss_full_path = storage_cfg.repository + "/" + bss_ab_path;
var obj_id = '';
try{
obj_id = new ObjId(tkoId);
}catch(err){
return respHelper.response404();
}
var seq = obj_id.extract().seq;
fs.exists(bss_full_path,function(exists){
if(exists){
BinStream.open(bss_full_path,function(err,bss){
var rd = bss.reader();
rd.objectAt(seq,function(err,obj){
bss.close(function(err){
if(obj && obj_id.toString() == (new ObjId(obj.header.ID)).toString()){
output(respHelper,obj,opt);
}else{respHelper.response404();}
});
});
});
}else{
respHelper.response404();
}
});
}
function output(resp,obj,opt)
{
if(opt.field=='data')
{
data_out(resp,obj,opt);
}else{
obj_out(resp,obj,opt);
}
}
function obj_out(resp,obj,opt)
{
var ret = {"_id" : (new ObjId(obj.header.ID)).toString(),
"meta" : obj.meta
}
if(obj.header.TY==BinStream.BINARY_TYPE)
{
var bs = BSData.create(obj.data);
ret.data = bs.serialize('object-encoded');
}else{
ret.data = obj.data;
}
resp.responseOK(ret);
}
function data_out(resp,obj,opt)
{
var objType = obj.header.TY;
if(objType == BinStream.BINARY_TYPE){
if(opt.filetype){
resp.response.type(opt.filetype);
}else{
}
resp.response.send(obj.data);
}else if(objType == BinStream.STRING_TYPE){
resp.response.send(obj.data);
}else{
resp.responseOK(obj.data);
}
}
module.exports = router;
var ctx = require('../../../context');
var express = require('express');
var router = express.Router();
var path = require('path');
var fs = require('fs');
var async = require('async');
//var Worker = require("tiny-worker");
var cfg = ctx.config;
var storage_cfg = cfg.storage;
var response = ctx.getLib('lib/ws/response');
var request = ctx.getLib('lib/ws/request');
var BinStream = ctx.getLib('lib/bss/binarystream_v1_1');
var ObjId = ctx.getLib('lib/bss/objid');
var BSData = ctx.getLib('lib/model/bsdata');
var StorageUtils = ctx.getLib('storage-service/lib/storage-utils');
const AGENT_NAME = "Storage API";
router.put('/:id',function (req, res) {
var reqHelper = request.create(req);
var respHelper = response.create(res);
var q = reqHelper.getQuery();
var sname = req.params.id;
var json_body = req.body;
var storagecaller = req.context.storagecaller;
if(!json_body){
return respHelper.response400();
}
if(!sname)
{
return respHelper.response400();
}
var databody = Array.isArray(json_body)?json_body:[json_body];
var idx = 0;
async.whilst(
function() { return idx < databody.length; },
function(callback) {
//var el_data = databody[idx].data;
var el_data = (BSData.parse(databody[idx].data)==null)?BSData.create(databody[idx].data).serialize('object-encoded'):databody[idx].data;
var meta = databody[idx].meta;
var dc_meta = {
"_agent" : AGENT_NAME,
"_ts" : Math.round((new Date).getTime() / 1000)
}
if(meta && typeof meta == 'object')
{
Object.keys(meta).forEach((item)=>{
//if(!item.startsWith('_') || item=='_key'){
dc_meta[item] = meta[item];
//}
});
}
send_storage(storagecaller,dc_meta,el_data,sname,function(err){
idx++;
if(!err){
callback(null);
}else{
callback(new Error('storage error'));
}
});
},
function (err) {
if(!err){
respHelper.response200();
}else{
respHelper.response400();
}
}
);
});
function send_storage(caller,dc_meta,dc_data,storage_name,cb)
{
var req = {
'object_type' : 'storage_request',
'command' : 'write',
'param' : {
'storage_name' : storage_name,
'meta' : dc_meta,
'data' : {
'type' : 'bsdata',
'value' : dc_data
}
}
}
caller.call(req,function(err,resp){
if(!err && resp.status=='OK'){
cb(null);
}else{
cb("error");
}
});
}
router.delete('/:id',function (req, res) {
var reqHelper = request.create(req);
var respHelper = response.create(res);
var q = reqHelper.getQuery();
var sname = req.params.id;
var caller = req.context.storagecaller;
var req = {
'object_type' : 'storage_request',
'command' : 'delete',
'param' : {
'storage_name' : sname
}
}
caller.call(req,function(err,resp){
if(!err && resp.status=='OK'){
respHelper.response200();
}else{
respHelper.response400(resp.msg);
}
});
});
router.get('/',function (req, res) {
var reqHelper = request.create(req);
var respHelper = response.create(res);
respHelper.responseOK(StorageUtils.list(storage_cfg.repository));
});
router.get('/:id/stats',function (req, res) {
var reqHelper = request.create(req);
var respHelper = response.create(res);
var sid = req.params.id;
if(!sid){
return respHelper.response404();
}
var storage_path = sid.split('.').join('/');
var bss_full_path = storage_cfg.repository + "/" + storage_path + ".bss";
fs.exists(bss_full_path,function(exists){
if(exists){
var fstat = stats = fs.statSync(bss_full_path);
BinStream.open(bss_full_path,function(err,bss){
var rd = bss.reader();
var obj_stat = {
"storagename" : sid,
"count" : rd.count(),
"filename" : storage_path + ".bss",
"filesize" : fstat.size
}
bss.close(function(err){
respHelper.responseOK(obj_stat);
});
});
}else{
respHelper.response404();
}
});
});
router.get('/:id/objects',function (req, res) {
var reqHelper = request.create(req);
var respHelper = response.create(res);
var sid = req.params.id;
var query = reqHelper.getQuery();
if(!query){query={};}
if(!sid){
return respHelper.response404();
}
var storage_path = sid.split('.').join('/');
var bss_full_path = storage_cfg.repository + "/" + storage_path + ".bss";
var from_seq = 1;
var limit = 0;
var sizelimit = 64 * 1000 * 1000;
//compat with v1
// param => offset
// param => obj_after
if(query.offset){query.obj_after=query.offset;}
if(query.obj_after){
var o_seq;
try{
var obj_id = new ObjId(query.obj_after);
o_seq = obj_id.extract().seq;
}catch(err){
return respHelper.response404();
}
from_seq = o_seq+1;
}
// param => limit
if(query.limit){
limit = Number(query.limit);
}
// param => sizelimit
if(query.sizelimit){
sizelimit = Number(query.sizelimit) * 1000 * 1000;
}
// param => output = [object],stream
var output_type = (query.output)?query.output:'object';
//compat with v1
// param => from
// param => seq_from
if(query.from){query.seq_from = query.from;}
if(query.seq_from){
from_seq = Number(query.seq_from);
}
// param => field = id,meta,[data]
var objOpt = {'meta':true,'data':true}
if(query.field == 'id'){
objOpt.meta = false;
objOpt.data = false;
}else if(query.field == 'meta'){
objOpt.data = false;
}
// param => last
var tail_no = query.last;
var rd_prm = {
'bss_full_path' : bss_full_path,
'tail_no' : tail_no,
'from_seq' : from_seq,
'limit' : limit,
'output_type' : output_type,
'objOpt' : objOpt,
'sizelimit' : sizelimit
}
var worker_pool = req.context.worker_pool;
var worker = worker_pool.get();
worker.resp = respHelper;
worker.output_type = output_type;
worker.execute({'cmd':'read','prm':rd_prm});
worker.on('start',function(data){
stream_start();
});
var firstline=true;
worker.on('data',function(data){
if(!firstline){
stream_newrec();
}else{firstline = false;}
stream_data(data);
});
worker.on('end',function(code){
if(code == '404')
{
end(404);
}else if(code == '200'){
stream_end();
end(200);
}
worker.shutdown();
//worker_pool.push(worker);
});
function stream_start()
{
var resp = worker.resp;
var type = worker.output_type;
if(type=='stream')
{
resp.type('text');
}else{
resp.type('application/json');
resp.write('[');
}
}
function stream_newrec()
{
var resp = worker.resp;
var type = worker.output_type;
if(type=='stream')
{
resp.write('\n');
}else{
resp.write(',');
}
}
function stream_data(data)
{
var resp = worker.resp;
resp.write(data);
}
function stream_end()
{
var resp = worker.resp;
var type = worker.output_type;
if(type=='stream')
{
resp.write('');
}else{
resp.write(']');
}
}
function end(code)
{
var resp = worker.resp;
if(code==404){
resp.response404()
}else{
resp.status(code).end();
}
}
});
module.exports = router;
var express = require('express');
var router = express.Router();
router.use('/object',require('./service-object'));
router.use('/storage',require('./service-storage'));
module.exports = router;
var ctx = require('../../../context');
var express = require('express');
var router = express.Router();
var path = require('path');
var fs = require('fs');
var cfg = ctx.config;
var storage_cfg = cfg.storage
var response = ctx.getLib('lib/ws/response');
var request = ctx.getLib('lib/ws/request');
var BinStream = ctx.getLib('lib/bss/binarystream_v1_1');
var ObjId = ctx.getLib('lib/bss/objid');
var BSData = ctx.getLib('lib/model/bsdata');
var Tokenizer = ctx.getLib('lib/auth/tokenizer');
const ACL_SERVICE_NAME = "storage";
router.get('/:id',function (req, res) {
var reqHelper = request.create(req);
var respHelper = response.create(res);
var oid = req.params.id;
var opt = {}
get_object(reqHelper,respHelper,{'oid':oid,'opt':opt});
});
router.get('/:id/data',function (req, res) {
var reqHelper = request.create(req);
var respHelper = response.create(res);
var query = reqHelper.getQuery();
var oid = req.params.id;
var opt = {
'field' : 'data'
}
opt.filetype = (query.filetype)?query.filetype:null
get_object(reqHelper,respHelper,{'oid':oid,'opt':opt});
});
function oid_parse(oid,caller,cb)
{
var ret = {'valid':true}
if(!oid)
{
return cb(null,{'valid':false});
}
var storage_and_addr = oid.split('$');
ret.storage_name = storage_and_addr[0];
if(storage_and_addr.length == 1)
{
ret.by = "seq"
ret.seq = -1;
return cb(null,ret);
}else if(storage_and_addr.length==2){
var str_addr = storage_and_addr[1];
if(str_addr.startsWith('{') && str_addr.endsWith('}')){
ret.by = "key";
ret.key = str_addr.substr(1,str_addr.length-2);
if(ret.key.length<=0){
return cb(null,{'valid':false});
}
var callreq = {
'object_type' : 'storage_request',
'command' : 'idxget',
'param' : {
'storage_name' : ret.storage_name,
'key' : ret.key
}
}
caller.call(callreq,function(err,resp){
if(!err && resp.status=='OK' && resp.resp.found){
ret.by = "obj";
ret.obj_id = resp.resp.object_id;
ret.seq = (new ObjId(ret.obj_id)).extract().seq;
cb(null,ret);
}else{
cb(null,{'valid':false});
}
});
}else if(str_addr.startsWith('[') && str_addr.endsWith(']')){
ret.by = "seq";
var str_num = str_addr.substr(1,str_addr.length-2);
if(isNaN(str_num) || !Number.isInteger(Number(str_num))){
ret.valid = false;
}else{
ret.seq = Number(str_num);
}
return cb(null,ret);
}else{
ret.by = "obj";
ret.obj_id = storage_and_addr[1];
try{
var obj_id = new ObjId(ret.obj_id);
ret.seq = obj_id.extract().seq;
}catch(err){
ret.valid=false;
}
return cb(null,ret);
}
}else{
ret.valid=false;
return cb(null,ret);
}
}
function get_object(reqHelper,respHelper,prm)
{
prm=prm||{};
var oid = prm.oid;
var opt = prm.opt || {};
var storagecaller = reqHelper.request.context.storagecaller;
oid_parse(oid,storagecaller,(err,oid_result)=>{
if(!oid_result.valid){
respHelper.response404();
}else{
var acl_validator = reqHelper.request.context.acl_validator;
var tInfo = Tokenizer.info(reqHelper.request.auth);
var acp = acl_validator.isAccept(tInfo.acl,{
"vo":tInfo.vo,"service":ACL_SERVICE_NAME,"resource":oid_result.storage_name,"mode":"r"
});
if(!acp){
respHelper.response403();
}else{
var bss_full_path = storage_cfg.repository + "/" + oid_result.storage_name.split('.').join('/') + ".bss";
fs.exists(bss_full_path,function(exists){
if(exists){
BinStream.open(bss_full_path,function(err,bss){
var rd = bss.reader();
var rec_count = rd.count();
var seq = (oid_result.seq>=0)?oid_result.seq:rec_count + oid_result.seq + 1;
rd.objectAt(seq,function(err,obj){
bss.close(function(err){
if(obj && (oid_result.by == 'seq' || oid_result.obj_id == (new ObjId(obj.header.ID)).toString()) ){
output(respHelper,obj,opt);
}else{respHelper.response404();}
});
});
});
}else{
respHelper.response404();
}
});
}
}
});
}
function output(resp,obj,opt)
{
if(opt.field=='data')
{
data_out(resp,obj,opt);
}else{
obj_out(resp,obj,opt);
}
}
function obj_out(resp,obj,opt)
{
var ret = {"_id" : (new ObjId(obj.header.ID)).toString(),
"meta" : obj.meta
}
if(obj.header.TY==BinStream.BINARY_TYPE)
{
var bs = BSData.create(obj.data);
ret.data = bs.serialize('object-encoded');
}else{
ret.data = obj.data;
}
resp.responseOK(ret);
}
function data_out(resp,obj,opt)
{
var objType = obj.header.TY;
if(objType == BinStream.BINARY_TYPE){
if(opt.filetype){
resp.response.type(opt.filetype);
}else{
}
resp.response.send(obj.data);
}else if(objType == BinStream.STRING_TYPE){
resp.response.send(obj.data);
}else{
resp.responseOK(obj.data);
}
}
module.exports = router;
This diff is collapsed.
//NB-UDP
var ctx = require('../../context');
var cfg = ctx.config;
var amqp_cfg = ctx.config.amqp;
var ConnCtx = ctx.getLib('lib/conn/connection-context');
var QueueCaller = ctx.getLib('lib/amqp/queuecaller');
var EvenSub = ctx.getLib('lib/amqp/event-sub');
var TriggerRegis = require('./lib/triggerregis');
var dgram = require('dgram');
var server = dgram.createSocket('udp4');
var TRIGGER_TYPE = "nbudp";
var PORT = 19150;
var HOST = '0.0.0.0';
module.exports.create = function (cfg)
{
return new NBUdpTrigger(cfg);
}
function NBUdpTrigger(cfg)
{
this.config = cfg;
this.conn = ConnCtx.create(this.config);
this.mem = this.conn.getMemstore();
this.jobcaller = new QueueCaller({'url':amqp_cfg.url,'name':'bs_jobs_cmd'});
this.evs = new EvenSub({'url':amqp_cfg.url,'name':'bs_trigger_cmd'});
this.regis = TriggerRegis.create({'conn':this.config.memstore.url});
}
NBUdpTrigger.prototype.start = function ()
{
console.log('NBUDP_TRIGGER:Starting\t\t[OK]');
this._start_listener();
this._start_controller();
}
NBUdpTrigger.prototype._start_listener = function ()
{
console.log('NBUDP_TRIGGER:Starting Listener\t[OK]');
var self = this;
self.reset();
self.reload();
server.on('listening', function () {
console.log('NBUDP_TRIGGER:Listening\t[OK]');
});
server.on('message', function (message, remote) {
if(!message){return;}
var udpdata = message.toString();
var datachunk = udpdata.split(',');
if(datachunk.length>1)
{
var kname = datachunk[0];
var jobs = self.regis.findJob(kname);
jobs.forEach(function(item){
self._callJob(item.jobid,udpdata);
});
}
});
server.bind(PORT, HOST);
}
NBUdpTrigger.prototype.reload = function ()
{
this.regis.update(function(err){
if(!err){
console.log('NBUDP_TRIGGER:REG Update\t\t[OK]');
}else{
console.log('NBUDP_TRIGGER:REG Update\t\t[ERR]');
}
});
}
NBUdpTrigger.prototype.reset = function ()
{
this.regis.clean();
}
NBUdpTrigger.prototype._callJob = function(jobid,udpdata)
{
var udpmsg = udpdata.substring(udpdata.indexOf(',')+1);
var trigger_data = {
'object_type' : 'nbudp_data',
'udpdata' : udpdata,
'message' : udpmsg
}
var cmd = {
'object_type':'job_execute',
'source' : 'nbudp_trigger',
'jobId' : jobid,
'option' : {'exe_level':'secondary'},
'input_data' : {
'type' : 'bsdata',
'value' : {
'object_type':'bsdata',
'data_type' : 'object',
'data' : trigger_data
}
}
}
this.jobcaller.send(cmd);
}
NBUdpTrigger.prototype._start_controller = function ()
{
var self=this;
var topic = 'ctl.trigger.#';
self.evs.sub(topic,function(err,msg){
if(err){
console.log('NBUDP_TRIGGER:AMQP ERROR Restarting ...');
setTimeout(function(){
process.exit(1);
},5000);
}
if(!msg){return;}
var ctl = msg.data;
if(ctl.trigger_type != TRIGGER_TYPE && ctl.trigger_type != 'all')
{
return;
}
if(ctl.cmd == 'reload')
{
console.log('NBUDP_TRIGGER:CMD Reload\t\t[OK]');
self.reload();
}
});
}
var Redis = require('redis');
const KEYS = 'bs:regis:triggers';
const TRIGGER_TYPE = "nbudp";
module.exports.create = function(cfg)
{
return new TriggerRegister(cfg);
}
module.exports.mkRegis = mkRegis
function mkRegis(trigger,opt)
{
if(!trigger.keyname)
{
return null;
}
var vo = trigger.vo || '';
if(vo=='$'){vo=''}
var a = {
'vo':vo,
'keyname' : trigger.keyname,
'jobid' : trigger.job_id
}
if(opt){a.opt = opt}
return a;
}
function TriggerRegister(cfg)
{
this.config = cfg;
if(cfg.conn){
this.mem = Redis.createClient(cfg.conn);
}else if(cfg.redis){
this.mem = cfg.redis;
}else{
this.mem = null;
}
this.regis = [];
}
TriggerRegister.prototype.add = function(rg)
{
if(!rg){return;}
var found = false;
this.regis.forEach( function (val) {
if(val.vo == rg.vo && val.keyname == rg.keyname && val.jobid == rg.jobid){
found = true;
}
});
if(!found){
this.regis.push(rg);
}
}
TriggerRegister.prototype.clean = function()
{
this.regis = [];
}
TriggerRegister.prototype.update = function(cb)
{
var self=this;
self.clean()
self.mem.hgetall(KEYS,function (err,res){
if(!err && res){
var ks = Object.keys(res);
for(var i=0;i<ks.length;i++)
{
var k = ks[i];
var trigger = JSON.parse(res[k]);
if(trigger.type == TRIGGER_TYPE)
{
var reg = mkRegis(trigger);
self.add(reg);
}
}
}
cb(err);
});
}
TriggerRegister.prototype.findJob= function(kname)
{
var jobs = [];
this.regis.forEach( function (val) {
var fullkey = (val.vo == '')?val.keyname:val.vo + ':' + val.keyname;
if(fullkey == kname){
jobs.push(val);
}
});
return jobs;
}
{
"version":"1.2",
"build":"201808021100"
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment