Commit c5766091 authored by Kamron Aroonrua's avatar Kamron Aroonrua 💬

storage list

parent 4c843232
...@@ -23,27 +23,29 @@ var process_get = function(req, res) { ...@@ -23,27 +23,29 @@ var process_get = function(req, res) {
var j = httpacl.findJob(appkey,'get'); var j = httpacl.findJob(appkey,'get');
var topic_prex = 'cmd.execute.'; var topic_prex = 'cmd.execute.';
var httpdata = {
'object_type' : 'httpdata',
'method' : 'get',
'data' : reqHelper.getQuery()
}
var job_execute_msg = {
'object_type':'job_execute', j.forEach(function(item){
'source' : 'http_listener', var httpdata = {
'jobId' : '', 'object_type' : 'httpdata',
'option' : {}, 'method' : 'get',
'input_data' : { 'data' : reqHelper.getQuery()
'type' : 'bsdata', }
'value' : {
'data_type' : 'object', var job_execute_msg = {
'data' : httpdata 'object_type':'job_execute',
'source' : 'http_listener',
'jobId' : '',
'option' : {},
'input_data' : {
'type' : 'bsdata',
'value' : {
'data_type' : 'object',
'data' : httpdata
}
} }
} }
}
j.forEach(function(item){
var topic = topic_prex + item.jobid; var topic = topic_prex + item.jobid;
var msg = job_execute_msg; var msg = job_execute_msg;
msg.jobId = item.jobid; msg.jobId = item.jobid;
......
...@@ -35,7 +35,9 @@ JT.prototype.run = function (done) ...@@ -35,7 +35,9 @@ JT.prototype.run = function (done)
//red jobconfig //red jobconfig
job_registry.getJob(jobId,function(err,data){ job_registry.getJob(jobId,function(err,data){
if(!data){ if(!data){
callback('job ' + jobId + ' does not exits'); callback('job ' + jobId + ' :: does not exits');
}else if(!data.active){
callback('job ' + jobId + ' :: unactive');
}else{ }else{
callback(err,data); callback(err,data);
} }
...@@ -48,7 +50,7 @@ JT.prototype.run = function (done) ...@@ -48,7 +50,7 @@ JT.prototype.run = function (done)
'job_config' : jobCfg, 'job_config' : jobCfg,
'input_meta' : command.input_meta, 'input_meta' : command.input_meta,
'input_data' : command.input_data, 'input_data' : command.input_data,
'opt' : {'job_timeout' :60000} 'opt' : {'job_timeout' :90000}
} }
if(jobCfg.job_timeout){ if(jobCfg.job_timeout){
task_prm.opt.job_timeout = jobCfg.job_timeout; task_prm.opt.job_timeout = jobCfg.job_timeout;
......
var vm = require('vm');
module.exports.parse_script_param = function(param) module.exports.parse_script_param = function(param)
{ {
if(!param) if(!param)
...@@ -8,3 +10,15 @@ module.exports.parse_script_param = function(param) ...@@ -8,3 +10,15 @@ module.exports.parse_script_param = function(param)
return scrp; return scrp;
} }
} }
module.exports.vm_execute_text = function(env,param)
{
if(!param){return null}
var sandbox = env;
sandbox.vm_text_parameter = null;
var script = new vm.Script("vm_text_parameter=`" + param + "`");
var context = new vm.createContext(sandbox);
script.runInContext(context);
return sandbox.vm_text_parameter;
}
var ctx = require('../../../context'); var ctx = require('../../../context');
var Utils = ctx.getLib('lib/util/plugin-utils');
var bsdata = ctx.getLib('lib/model/bsdata'); var bsdata = ctx.getLib('lib/model/bsdata');
function perform_function(context,request,response){ function perform_function(context,request,response){
...@@ -9,22 +10,27 @@ function perform_function(context,request,response){ ...@@ -9,22 +10,27 @@ function perform_function(context,request,response){
var memstore = context.task.memstore; var memstore = context.task.memstore;
var jobcaller = context.task.jobcaller; var jobcaller = context.task.jobcaller;
var data = request.data; var in_type = request.type;
var data = (Array.isArray(request.data))?request.data:[request.data];
var meta = request.meta; var meta = request.meta;
var prm_to = param.to; var prm_to = (Array.isArray(param.to))?param.to:[param.to];
if(Array.isArray(prm_to))
{ data.forEach((dat)=>{
prm_to.forEach(function(job){ prm_to.forEach((jobprm)=>{
call_to(job); var ev = {
'type' : in_type,
'meta' : meta,
'data' : dat
}
var job=Utils.vm_execute_text(ev,jobprm)
call_to(dat,job);
}); });
}else{ });
call_to(prm_to);
}
function call_to(target) function call_to(obj,target)
{ {
var job_id = null; var job_id = null;
if(typeof target == 'string') if(typeof target == 'string')
...@@ -34,13 +40,13 @@ function perform_function(context,request,response){ ...@@ -34,13 +40,13 @@ function perform_function(context,request,response){
var cmd = { var cmd = {
'object_type':'job_execute', 'object_type':'job_execute',
'source' : 'scheduler', 'source' : 'do',
'jobId' : job_id, 'jobId' : job_id,
'option' : {'exe_level':'secondary'}, 'option' : {'exe_level':'secondary'},
'input_meta' : meta, 'input_meta' : meta,
'input_data' : { 'input_data' : {
'type' : 'bsdata', 'type' : 'bsdata',
'value' : bsdata.create(data).serialize('object-encoded') 'value' : bsdata.create(obj).serialize('object-encoded')
} }
} }
......
var ctx = require('../../../context'); var ctx = require('../../../context');
var Utils = ctx.getLib('lib/util/plugin-utils');
var RPCCaller = ctx.getLib('lib/amqp/rpccaller'); var RPCCaller = ctx.getLib('lib/amqp/rpccaller');
var BinStream = ctx.getLib('lib/bss/binarystream_v1_1'); var BinStream = ctx.getLib('lib/bss/binarystream_v1_1');
...@@ -13,7 +14,8 @@ function perform_function(context,request,response){ ...@@ -13,7 +14,8 @@ function perform_function(context,request,response){
var memstore = context.task.memstore var memstore = context.task.memstore
var output_type = request.input_type; var output_type = request.input_type;
var data = request.data; var data = (Array.isArray(request.data))?request.data:[request.data];
var meta = request.meta;
var amqp_cfg = ctx.config.amqp; var amqp_cfg = ctx.config.amqp;
var storage_name = param.storage_name; var storage_name = param.storage_name;
...@@ -29,45 +31,44 @@ function perform_function(context,request,response){ ...@@ -29,45 +31,44 @@ function perform_function(context,request,response){
"_ts" : Math.round((new Date).getTime() / 1000) "_ts" : Math.round((new Date).getTime() / 1000)
} }
if(Array.isArray(data)){ var idx = 0;
var idx = 0; async.whilst(
async.whilst( function() { return idx < data.length; },
function() { return idx < data.length; }, function(callback) {
function(callback) { var el_data = bsdata.create(data[idx]).serialize('object-encoded');
var el_data = bsdata.create(data[idx]).serialize('object-encoded'); var ev = {
send_storage(caller,dc_meta,el_data,storage_name,function(err){ 'type' : output_type,
idx++; 'meta' : meta,
'data' : data[idx]
}
var sname=Utils.vm_execute_text(ev,storage_name)
if(sname){
send_storage(caller,dc_meta,el_data,sname,function(err){
if(!err){ if(!err){
idx++;
callback(null); callback(null);
}else{ }else{
callback(err); callback(new Error('storage error'));
} }
}); });
}, }else{
function (err) { callback(new Error('invalid storage'));
if(!err){ }
response.success();
}else{ },
response.error("storage error"); function (err) {
} if(!err){
response.success();
}else{
response.error("storage error");
} }
);
}else{
var dc_data = bsdata.create(data).serialize('object-encoded');
send_storage(caller,dc_meta,dc_data,storage_name,function(err){
if(!err){
response.success();
}else{
response.error("storage error");
} }
});
} );
} }
function send_storage(caller,dc_meta,dc_data,storage_name,cb) function send_storage(caller,dc_meta,dc_data,storage_name,cb)
{ {
var req = { var req = {
...@@ -83,7 +84,6 @@ function send_storage(caller,dc_meta,dc_data,storage_name,cb) ...@@ -83,7 +84,6 @@ function send_storage(caller,dc_meta,dc_data,storage_name,cb)
} }
} }
caller.call(req,function(err,resp){ caller.call(req,function(err,resp){
if(!err && resp.status=='OK'){ if(!err && resp.status=='OK'){
cb(null); cb(null);
......
...@@ -10,6 +10,9 @@ function perform_function(context,request,response){ ...@@ -10,6 +10,9 @@ function perform_function(context,request,response){
var in_meta = request.meta; var in_meta = request.meta;
//parameter //parameter
//prm_size :: int
//prm_reject :: bool
//prm_name :: text
var prm_size = (param.size && Number(param.size)>0)?Number(param.size):1; var prm_size = (param.size && Number(param.size)>0)?Number(param.size):1;
var prm_reject = (param.reject==false)?false:true; var prm_reject = (param.reject==false)?false:true;
var prm_name = (param.name)?'windw-'+param.name:'windw'; var prm_name = (param.name)?'windw-'+param.name:'windw';
......
...@@ -15,6 +15,16 @@ var BinStream = ctx.getLib('lib/bss/binarystream_v1_1'); ...@@ -15,6 +15,16 @@ var BinStream = ctx.getLib('lib/bss/binarystream_v1_1');
var ObjId = ctx.getLib('lib/bss/objid'); var ObjId = ctx.getLib('lib/bss/objid');
var BSData = ctx.getLib('lib/model/bsdata'); var BSData = ctx.getLib('lib/model/bsdata');
var StorageUtils = ctx.getLib('storage-service/lib/storage-utils');
router.get('/',function (req, res) {
var reqHelper = request.create(req);
var respHelper = response.create(res);
respHelper.responseOK(StorageUtils.list(storage_cfg.repository));
});
router.get('/:id/stats',function (req, res) { router.get('/:id/stats',function (req, res) {
var reqHelper = request.create(req); var reqHelper = request.create(req);
var respHelper = response.create(res); var respHelper = response.create(res);
......
...@@ -237,8 +237,31 @@ var job_config = { ...@@ -237,8 +237,31 @@ var job_config = {
// console.log(crons.list); // console.log(crons.list);
// }); // });
var hash = require('object-hash'); // var hash = require('object-hash');
var dat = {'a':'hello','b':10}; // var dat = {'a':'hello','b':10};
var dat2 = new Buffer(10); // var dat2 = new Buffer(10);
console.log(dat2); // console.log(dat2);
console.log(hash(dat2)); // console.log(hash(dat2));
var path = require('path');
var fs = fs || require('fs')
var walkSync = function(dir, filelist,cat) {
files = fs.readdirSync(dir);
filelist = filelist || [];
cat = cat || '';
files.forEach(function(file) {
if (fs.statSync(path.join(dir, file)).isDirectory()) {
var base_cat = cat + file + '.'
filelist = walkSync(path.join(dir, file), filelist,base_cat);
}
else {
if(path.extname(file) == '.bss'){
var storage = cat + path.basename(file,'.bss');
filelist.push(storage);
}
}
});
return filelist;
};
console.log(walkSync('D:/testfile/BSDATA'));
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment