Commit 3ceed7b3 authored by Kamron Aroonrua's avatar Kamron Aroonrua 💬

v1.1

parent 2c3fddf5
#Changelog
## [UR] - 2018-04-xx
### Added
- SS :: Ipc Channel
- SS :: REST API (v1.1ur) method PUT
- PLUGIN :: do-storage with ipc channel supported
### Fixed
- WORKER :: active/unactive job bugs
### Changed
### Removed
\ No newline at end of file
......@@ -20,3 +20,20 @@ module.exports.getPlugins = function(type,name)
module.exports.sysenv = {
}
module.exports.getServiceUrl = function(port,opt)
{
return 'tcp://0.0.0.0:' + String(port);
}
module.exports.getClientUrl = function(port,opt)
{
return 'tcp://127.0.0.1:' + String(port);
}
module.exports.getUnixSocketUrl = function(name){
var sockname = name || 'test.sock';
return 'unix://' + __dirname + '/tmp/' + sockname;
}
//module.exports.socket_dir = __dirname + '/tmp';
//module.exports.tmp_dir = __dirname + '/tmp';
var schedule = require('node-schedule');
var ctx = require('../context');
var cfg = ctx.config;
var amqp_cfg = ctx.config.amqp;
var ConnCtx = ctx.getLib('lib/conn/connection-context');
var StorageEventList = ctx.getLib('lib/mems/storage-eventlist');
var CronList = ctx.getLib('lib/mems/cronlist');
var QueueCaller = ctx.getLib('lib/amqp/queuecaller');
var EvenSub = ctx.getLib('lib/amqp/event-sub');
var TRIGGER_TYPE = "storage";
module.exports.create = function (cfg)
{
return new StorageTrigger(cfg);
}
function StorageTrigger(cfg)
{
this.config = cfg;
this.conn = ConnCtx.create(this.config);
this.mem = this.conn.getMemstore();
this.jobcaller = new QueueCaller({'url':amqp_cfg.url,'name':'bs_jobs_cmd'});
this.evs = new EvenSub({'url':amqp_cfg.url,'name':'bs_trigger_cmd'});
this.storage_ev = new EvenSub({'url':amqp_cfg.url,'name':'bs_storage'});
this.sel = StorageEventList.create({'conn':this.config.memstore.url});
}
StorageTrigger.prototype.start = function ()
{
console.log('STORAGE_TRIGGER:Starting\t\t[OK]');
this._start_listener();
this._start_controller();
}
StorageTrigger.prototype._start_listener = function ()
{
console.log('STORAGE_TRIGGER:Starting Listener\t[OK]');
var self = this;
self.reset();
self.reload();
self.storage_ev.sub('storage.#.dataevent.newdata',(err,msg)=>{
if(!err){
var topic = msg.topic;
var storage_name = topic.substr(8,topic.length-26);
var sdata = msg.data;
sdata.storage_name = storage_name;
var jobs = self.sel.findJob(storage_name);
jobs.forEach(function(item){
self._callJob(item.jobid,sdata);
});
}
});
}
StorageTrigger.prototype.reload = function ()
{
this.sel.update(function(err){
if(!err){
console.log('STORAGE_TRIGGER:SEL Update\t\t[OK]');
}else{
console.log('STORAGE_TRIGGER:SEL Update\t\t[ERR]');
}
});
}
StorageTrigger.prototype.reset = function ()
{
this.sel.clean();
}
StorageTrigger.prototype._callJob = function(jobid,sl)
{
var storage_ev_data = {
'object_type' : 'storage_event_data',
'event':sl.event,
'storage_name' : sl.storage_name,
'resource_id' : sl.resource_id,
'resource_location':sl.resource_location
}
var cmd = {
'object_type':'job_execute',
'source' : 'storage_trigger',
'jobId' : jobid,
'option' : {'exe_level':'secondary'},
'input_data' : {
'type' : 'bsdata',
'value' : {
'object_type':'bsdata',
'data_type' : 'object',
'data' : storage_ev_data
}
}
}
this.jobcaller.send(cmd);
}
StorageTrigger.prototype._start_controller = function ()
{
var self=this;
var topic = 'ctl.trigger.#';
self.evs.sub(topic,function(err,msg){
if(err){
console.log('STORAGE_TRIGGER:AMQP ERROR Restarting ...');
setTimeout(function(){
process.exit(1);
},5000);
}
if(!msg){return;}
var ctl = msg.data;
if(ctl.trigger_type != TRIGGER_TYPE && ctl.trigger_type != 'all')
{
return;
}
if(ctl.cmd == 'reload')
{
console.log('STORAGE_TRIGGER:CMD Reload\t\t[OK]');
self.reload();
}
});
}
......@@ -10,7 +10,7 @@ var response = ctx.getLib('lib/ws/response');
var request = ctx.getLib('lib/ws/request');
var process_get = function(req, res) {
var process_req = function(req, res ,method) {
var reqHelper = request.create(req);
var respHelper = response.create(res);
var appkey = req.params.akey;
......@@ -20,7 +20,7 @@ var process_get = function(req, res) {
//var evp = req.context.evp;
var jobcaller = req.context.jobcaller;
var j = httpacl.findJob(appkey,'get');
var j = httpacl.findJob(appkey,method);
var topic_prex = 'cmd.execute.';
......@@ -28,8 +28,14 @@ var process_get = function(req, res) {
j.forEach(function(item){
var httpdata = {
'object_type' : 'httpdata',
'method' : 'get',
'data' : reqHelper.getQuery()
'method' : method,
'data' : {}
}
if(method=='get'){
httpdata.data = reqHelper.getQuery();
}else if(method=='post'){
httpdata.data = req.body;
}
var job_execute_msg = {
......@@ -40,6 +46,7 @@ var process_get = function(req, res) {
'input_data' : {
'type' : 'bsdata',
'value' : {
'object_type':'bsdata',
'data_type' : 'object',
'data' : httpdata
}
......@@ -62,7 +69,8 @@ var process_get = function(req, res) {
}
}
router.get('/:akey',process_get);
router.get('/:akey',function(req, res){process_req(req,res,'get')});
router.post('/:akey',function(req, res){process_req(req,res,'post')});
module.exports = router;
......@@ -18,6 +18,7 @@ function JobTask (prm)
this.handle = prm.handle;
this.mem = prm.handle.mem;
this.jobcaller = prm.handle.jobcaller;
this.storagecaller = prm.handle.storagecaller;
this.jobcfg = prm.job_config;
this.input_meta = prm.input_meta;
......@@ -290,9 +291,11 @@ function perform_do(prm,cb)
var DOTask = getPlugins('do',do_cfg.type);
var doMem = new memstore({'job_id':job_id,'cat':'do','mem':prm.handle.mem});
var jobcaller = prm.handle.jobcaller;
var storagecaller = prm.handle.storagecaller;
do_context.task = {
"memstore" : doMem,
"jobcaller" : jobcaller
"jobcaller" : jobcaller,
"storagecaller" : storagecaller
}
var dout = new DOTask(do_context,prm.request);
......@@ -324,6 +327,7 @@ function getInputData(obj)
if(obj.type == 'bsdata')
{
var inp = bsdata.parse(obj.value);
if(!inp){ return {}}
return inp.data;
}else{
return {};
......
......@@ -36,7 +36,7 @@ JT.prototype.run = function (done)
job_registry.getJob(jobId,function(err,data){
if(!data){
callback('job ' + jobId + ' :: does not exits');
}else if(!data.active){
}else if(data.active.toString()=='false'){
callback('job ' + jobId + ' :: unactive');
}else{
callback(err,data);
......
......@@ -5,8 +5,12 @@ var QueueCaller = ctx.getLib('lib/amqp/queuecaller');
var QueueReceiver = ctx.getLib('lib/amqp/queuereceiver');
var ConnCtx = ctx.getLib('lib/conn/connection-context');
var JobRegistry = ctx.getLib('lib/mems/job-registry');
var SSCaller = ctx.getLib('lib/axon/rpccaller');
var JobTransaction = require('./lib/jobtransaction')
//const SS_URL = 'tcp://127.0.0.1:19030'
var SS_URL = ctx.getUnixSocketUrl('ss.sock');
module.exports.create = function(prm)
{
var jw = new JW(prm);
......@@ -24,6 +28,8 @@ var JW = function JobWorker (prm)
this.jobcaller = new QueueCaller({'url':this.conn.getAmqpUrl(),'name':'bs_jobs_cmd'});
this.job_registry = JobRegistry.create({'redis':this.mem});
this.storagecaller = new SSCaller({'url':SS_URL});
}
JW.prototype.start = function ()
......
var axon = require('axon');
var thunky = require('thunky');
function RPCCaller(config)
{
this.config = config;
this.url = config.url;
this.name = config.name || "rpc_queue";
this.sock = axon.socket('req');
var self = this;
this.opened = false;
this.open = thunky(open);
this.open();
function open (cb) {
//console.log('OPEN IPC >>>> ' + self.url);
self.sock.connect(self.url,function(){
//console.log('OPENED');
self.opened = true;
cb();
});
}
}
RPCCaller.prototype.call = function(req,cb)
{
var self = this;
//console.log('CALLING>>>');
this.open(()=>{
self.sock.send(req,(res)=>{
//console.log(req);
cb(null,res);
})
});
}
RPCCaller.prototype.close = function()
{
this.sock.close();
}
module.exports = RPCCaller;
var axon = require('axon');
var fs = require('fs');
function RPCServer(config)
{
this.config = config;
this.url = config.url;
this.name = config.name || "rpc_queue";
this.remote_function = null;
this.sock = axon.socket('rep');
}
RPCServer.prototype.start = function(cb)
{
var self = this;
self.sock.bind(self.url,function(){
if(typeof cb == 'function'){cb();}
});
self.sock.on('message', function(req, reply){
self.remote_function(req,function(err,resp){
reply(resp);
});
});
}
RPCServer.prototype.set_remote_function = function(func){
this.remote_function = func;
}
module.exports = RPCServer;
......@@ -216,5 +216,6 @@ Storage.prototype.reader = function(prm){
}
Storage.prototype.close = function(cb){
this.write_queue.pause( )
this.file.close(cb);
};
......@@ -75,15 +75,6 @@ HttpACL.prototype.update = function(cb)
cb(err);
});
// this.mem.get(PREFIX, function (err, result) {
// if(!err && result){
//
// self.acl = JSON.parse(result);
// }
// cb(err);
// });
}
// HttpACL.prototype.commit = function(cb)
......
var Redis = require('redis');
const KEYS = 'bs:regis:triggers';
module.exports.create = function(cfg)
{
return new StorageEventList(cfg);
}
module.exports.mkSEL = mkSEL;
function mkSEL(storage_name,jobid,opt)
{
var a = {
'storage_name' : storage_name,
'jobid' : jobid
}
if(opt){a.opt = opt}
return a;
}
function StorageEventList(cfg)
{
this.config = cfg;
if(cfg.conn){
this.mem = Redis.createClient(cfg.conn);
}else if(cfg.redis){
this.mem = cfg.redis;
}else{
this.mem = null;
}
this.sel = [];
}
StorageEventList.prototype.add = function(sel)
{
var found = false;
this.sel.forEach( function (val) {
if(val.storage_name == sel.storage_name && val.jobid == sel.jobid){
found = true;
}
});
if(!found){
this.sel.push(sel);
}
}
StorageEventList.prototype.clean = function()
{
this.sel = [];
}
StorageEventList.prototype.update = function(cb)
{
var self=this;
self.clean()
self.mem.hgetall(KEYS,function (err,res){
if(!err && res){
var ks = Object.keys(res);
for(var i=0;i<ks.length;i++)
{
var k = ks[i];
var trigger = JSON.parse(res[k]);
if(trigger.type == 'storage')
{
var sel = mkSEL(trigger.storage_name,trigger.job_id);
self.add(sel);
}
}
}
cb(err);
});
}
StorageEventList.prototype.findJob= function(sname)
{
var jobs = [];
this.sel.forEach( function (val) {
if(val.storage_name == sname){
jobs.push(val);
}
});
return jobs;
}
......@@ -18,7 +18,7 @@ module.exports.parse = function(obj)
}else if(obj instanceof Buffer){
var jsonobj = BSON.parse(obj);
return new BSData(jsonobj.data,jsonobj.data_type);
}else if(obj.data_type && obj.data){
}else if( ((obj.object_type && obj.object_type == 'bsdata')||(obj.type && obj.type == 'bsdata')) && obj.data_type && obj.data){
var oData = obj.data;
if(obj.encoding == 'base64'){
oData = new Buffer(obj.data,'base64');
......
var vm = require('vm');
var request = require('request').defaults({ encoding: null });
function execute_function(context,response){
......@@ -10,9 +11,23 @@ function execute_function(context,response){
var output_type = 'text';
var url = param.url;
var env = {
'input' : {
'meta' : input_meta,
'data' : input_data
},
'url' : ''
}
var script = new vm.Script("url=`" + url + "`");
var context = new vm.createContext(env);
script.runInContext(context);
url = env.url;
var reject = true;
if(param.reject==false){reject=false;}
if(typeof param.reject != 'undefined' && param.reject.toString()=="false"){reject=false;}
var encode = 'utf8';
if(param.encoding == 'binary'){
......@@ -22,7 +37,21 @@ function execute_function(context,response){
if(param.encoding=='json'){output_type='object'}
request({'url':url, 'encoding':encode}, function (error, resp, body) {
//Http Header
var http_headers = {};
if(param.auth){
if(param.auth.type == 'basic'){
var auth_header = "Basic " + new Buffer(param.auth.username + ":" + param.auth.password).toString("base64");
http_headers.Authorization = auth_header;
}
}
if(typeof param.headers == 'object')
{
http_headers = Object.assign(http_headers,param.headers)
}
request({'method': 'GET','url':url,'headers':http_headers ,'encoding':encode}, function (error, resp, body) {
response.meta = {'_status_code':(error)?0:resp.statusCode,'_error':(error)?true:false}
if (!error && resp.statusCode == 200) {
if(param.encoding=='json'){
......
......@@ -23,8 +23,14 @@ function perform_function(context,response){
var last_mod = {'fname':'','tts':0};
var fs_continue = false;
var buff_out = new Buffer(0);
if(param.last_modify_ts)
{
last_mod.tts = param.last_modify_ts*1000;
}
memstore.getItem('lastmodify',function(err,value){
if(value){
if(value && value.tts > last_mod.tts){
last_mod=value;
}
getData();
......
......@@ -12,23 +12,31 @@ function perform_function(context,request,response){
var token = param.token;
var message = data;
var image_url = null;
if(param.message){
var env = {
'type' : output_type,
'data' : data,
'meta' : meta,
'msg' : data
'msg' : data,
'img' : null
}
var txt_script = ""
var script = new vm.Script("msg=`" + param.message + "`");
if(param.image_url){
env.msg="";
txt_script += "img=`" + param.image_url + "`; "
}
if(param.message){txt_script += "msg=`" + param.message + "`; "}
var script = new vm.Script(txt_script);
var context = new vm.createContext(env);
script.runInContext(context);
message = env.msg;
}
message = (typeof env.msg == 'string')?env.msg:JSON.stringify(env.msg);
image_url = env.img;
post_to_line(token,message,function(err){
post_to_line(token,message,{'imgurl':image_url},function(err){
if(!err){
response.success();
}else{
......@@ -40,7 +48,7 @@ function perform_function(context,request,response){
//response.error("error message")
}
function post_to_line(token,msg,cb)
function post_to_line(token,msg,resource,cb)
{
var options = { method: 'POST',
......@@ -49,7 +57,20 @@ function post_to_line(token,msg,cb)
{ 'cache-control': 'no-cache',
'authorization' : 'Bearer ' + token,
'content-type': 'multipart/form-data' },
formData: { message: String(msg) } };
formData: {
message: String(msg)
}
};
if(typeof resource == 'function')
{
cb=resource;
}else {
if(resource.imgurl){
options.formData.imageThumbnail = resource.imgurl;
options.formData.imageFullsize = resource.imgurl;
}
}
request(options, function (err, resp, body) {
if (!err && resp.statusCode==200) {
......
......@@ -11,7 +11,8 @@ function perform_function(context,request,response){
var job_id = context.jobconfig.job_id;
var transaction_id = context.transaction.id;
var param = context.jobconfig.data_out.param;
var memstore = context.task.memstore
var memstore = context.task.memstore;
var storagecaller = context.task.storagecaller;
var output_type = request.input_type;
var data = (Array.isArray(request.data))?request.data:[request.data];
......@@ -20,10 +21,14 @@ function perform_function(context,request,response){
var amqp_cfg = ctx.config.amqp;
var storage_name = param.storage_name;
var caller = new RPCCaller({
var caller = storagecaller;
if(param.channel!='ipc'){
caller = new RPCCaller({
url : amqp_cfg.url,
name :'storage_request'
});
}
var dc_meta = {
"_jid" : job_id,
......@@ -93,7 +98,7 @@ function send_storage(caller,dc_meta,dc_data,storage_name,cb)
}
}
}
//console.log(req);
caller.call(req,function(err,resp){
if(!err && resp.status=='OK'){
cb(null);
......
......@@ -38,6 +38,10 @@ function perform_function(context,request,response){
var meta = mapenv.meta;
var output_type = mapenv.type;
if(param.to_binary && typeof data == 'string'){
data = Buffer.from(data, 'base64');
output_type = 'binary';
}
response.success(data,{'meta':meta,'output_type':output_type});
......
......@@ -14,7 +14,7 @@ function perform_function(context,request,response){
//prm_reject :: bool
//prm_name :: text
var prm_size = (param.size && Number(param.size)>0)?Number(param.size):1;
var prm_reject = (param.reject==false)?false:true;
var prm_reject = (typeof param.reject != 'undefined' && param.reject.toString()=="false")?false:true;
var prm_name = (param.name)?'windw-'+param.name:'windw';
var obj = {
......
var Jimp = require("jimp");
var async = require('async');
function avg_point(prm,cb)
{
var bg = prm.bg;
var fg = prm.fg;
var dpoint = prm.point;
var radius = prm.radius || 10;
var bg_threshold = prm.bg_threshold || 20;
var mapping_threshold = prm.mapping_threshold || 128;
var table = prm.table || [];
async.waterfall([
p_readbg,
p_readfg,
p_process,
], function (err, result) {
cb(err,result);
});
function p_readbg(callback) {
Jimp.read(bg, function (err, img) {
callback(null, img);
});
}
function p_readfg(ibg, callback) {
Jimp.read(fg, function (err, img2) {
callback(null, ibg,img2);
});
}
function p_process(ibg,ifg,callback)
{
var w = ifg.bitmap.width;
var h = ifg.bitmap.height;
if(!dpoint){dpoint=[Math.floor(w/2),Math.floor(h/2)]}
var x0 = (dpoint[0]-radius>0)?dpoint[0]-radius:0;
var y0 = (dpoint[1]-radius>0)?dpoint[1]-radius:0;
var x1 = (dpoint[0]+radius<w)?dpoint[0]+radius:w;
var y1 = (dpoint[1]+radius<h)?dpoint[1]+radius:h;
var sum_point = [];
var sum=0;
for(var i=x0;i<=x1;i++){
for(var j=y0;j<=y1;j++){
if(pointInCircle(i,j,dpoint[0],dpoint[1],radius)){
var bpx = Jimp.intToRGBA(ibg.getPixelColor(i, j));
var fpx = Jimp.intToRGBA(ifg.getPixelColor(i, j));
//Channel threshold
// if(bpx.r-fpx.r > bg_threshold || bpx.g-fpx.g > bg_threshold || bpx.b-fpx.b > bg_threshold){
// newimg.setPixelColor(Jimp.rgbaToInt(fpx.r, fpx.g, fpx.b, 255), i, j);
// }
//Color Distance threshold
if(distance([bpx.r,bpx.g,bpx.b],[fpx.r,fpx.g,fpx.b])>bg_threshold){
var mv = mapping(table,[fpx.r,fpx.g,fpx.b])
if(mv.value>0 && mv.distance<mapping_threshold){
sum_point.push(mv.value);
sum+=mv.value;
}
}else{
sum_point.push(0);
}
}
}
}
var avg = (sum_point.length>0)?sum/sum_point.length:0;
var mdn = (sum_point.length>0)?median(sum_point):0;
callback(null,{'avg':avg,'mdn':mdn});
}
function p_process_image(ibg,ifg,callback)
{
var w = ibg.bitmap.width;
var h = ibg.bitmap.height;
if(!dpoint){dpoint=[Math.floor(w/2),Math.floor(h/2)]}
var x0 = (dpoint[0]-radius>0)?dpoint[0]-radius:0;
var y0 = (dpoint[1]-radius>0)?dpoint[1]-radius:0;
var x1 = (dpoint[0]+radius<w)?dpoint[0]+radius:w;
var y1 = (dpoint[1]+radius<h)?dpoint[1]+radius:h;
var newimage = new Jimp(w,h,0x0000FFFF,(err,newimg)=>{
var sum_point = [];
for(var i=x0;i<=x1;i++){
for(var j=y0;j<=y1;j++){
if(pointInCircle(i,j,dpoint[0],dpoint[1],radius)){
var bpx = Jimp.intToRGBA(ibg.getPixelColor(i, j));
var fpx = Jimp.intToRGBA(ifg.getPixelColor(i, j));
//Channel threshold
// if(bpx.r-fpx.r > bg_threshold || bpx.g-fpx.g > bg_threshold || bpx.b-fpx.b > bg_threshold){
// newimg.setPixelColor(Jimp.rgbaToInt(fpx.r, fpx.g, fpx.b, 255), i, j);
// }
//Color Distance threshold
if(distance([bpx.r,bpx.g,bpx.b],[fpx.r,fpx.g,fpx.b])>bg_threshold){
newimg.setPixelColor(Jimp.rgbaToInt(fpx.r, fpx.g, fpx.b, 255), i, j);
}
}
}
}
callback(null,newimg);
});
}
}
function mapping(map_table,color)
{
var dist = distance([0,0,0],[255,255,255]);
var out={'value':-1,'distance':dist}
if(!Array.isArray(map_table)){map_table=[];}
map_table.forEach((itm)=>{
if(itm.color && itm.value){
var d = distance(itm.color,color)
if(d<out.distance){
out.value = itm.value;
out.distance = d;
}
}
});
return out;
}
function median(values) {
values.sort( function(a,b) {return a - b;} );
var half = Math.floor(values.length/2);
if(values.length % 2)
return values[half];
else
return (values[half-1] + values[half]) / 2.0;
}
function pointInCircle(x, y, cx, cy, radius) {
var distancesquared = Math.pow(x - cx,2) + Math.pow(y - cy,2);
return Math.sqrt(distancesquared) <= radius;
}
function distance(a,b)
{
sum = 0;
sum = Math.pow(a[0] - b[0],2) + Math.pow(a[1] - b[1],2) + Math.pow(a[2] - b[2],2)
return Math.sqrt(sum)
}
module.exports.avg_point = avg_point;
var util = require('util');
var DTPlugin = require('../../dt-plugin');
function DTTask(context,request){
DTPlugin.call(this,context,request);
this.name = "radar-digitizer";
this.output_type = "";
}
util.inherits(DTTask,DTPlugin);
DTTask.prototype.perform = require('./perform');
module.exports = DTTask;
{
"name": "dt-radar-digitizer",
"version": "1.0.0",
"description": "",
"main": "run.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"author": "",
"license": "ISC",
"dependencies": {
"jimp": "^0.2.28"
}
}
var vm = require('vm');
var hash = require('object-hash');
var digitizer = require("./digitizer.js");
function perform_function(context,request,response){
var job_id = context.jobconfig.job_id;
var transaction_id = context.transaction.id;
var param = context.task.config.param;
var memstore = context.task.memstore
var output_type = request.input_type;
var data = request.data;
var meta = request.meta || {};
//Parameters
var prm_backgroud = param.backgroud;
var prm_point = param.point;
var prm_radius = param.radius;
var prm_table = param.color_mapping || def_table();
var mapping_threshold = param.mapping_threshold;
var bg_threshold = param.backgroud_threshold;
var prefix = param.prefix || 'radar_';
var bg=null;
if(prm_backgroud)
{
if(typeof prm_backgroud == 'string')
{
bg = __dirname + '/img/' + prm_backgroud
}else if(typeof prm_backgroud == 'object' && prm_backgroud.base64){
bg = Buffer.from(prm_backgroud, 'base64');
}
}
var fg = data;
if(typeof data == 'object' && data.base64_data)
{
fg = Buffer.from(data, 'base64');
}
digitizer.avg_point({
'bg':bg,
'fg': fg,
'point':prm_point,
'table':prm_table,
'radius':prm_radius,
'mapping_threshold':mapping_threshold,
'bg_threshold ':bg_threshold
},function(err,res){
if(!err){
meta[prefix+'avg'] = res.avg;
meta[prefix+'median'] = res.mdn;
response.success(data,{'meta':meta,'output_type':output_type});
}else{
response.error(err)
}
})
// memstore.setItem('lasttransaction',transaction_id,function(err){
// response.success(data);
// });
// memstore.getItem('lasttransaction',function(err,value){
// response.success(value);
// });
//response.reject();
//response.error("error message")
}
function def_table()
{
var col_mapping = [
{"color":[0,0,0],"value":-1},
{"color":[255,255,255],"value":-1},
{"color":[0,254,130],"value":5.5},
{"color":[0,255,0],"value":10},
{"color":[0,173,0],"value":15},
{"color":[0,150,50],"value":20},
{"color":[255,255,0],"value":25},
{"color":[255,200,3],"value":30},
{"color":[255,170,0],"value":35},
{"color":[255,85,0],"value":41},
{"color":[255,0,0],"value":45},
{"color":[255,0,100],"value":50},
{"color":[255,0,255],"value":55},
{"color":[255,128,255],"value":60},
]
return col_mapping;
}
module.exports = perform_function;
//var Jimp = require("jimp");
var async = require('async');
var digitizer = require("./digitizer.js");
var Jimp = require("jimp");
var col_mapping = [
{"color":[0,0,0],"value":-1},
{"color":[255,255,255],"value":-1},
{"color":[0,254,130],"value":5.5},
{"color":[0,255,0],"value":10},
{"color":[0,173,0],"value":15},
{"color":[0,150,50],"value":20},
{"color":[255,255,0],"value":25},
{"color":[255,200,3],"value":30},
{"color":[255,170,0],"value":35},
{"color":[255,85,0],"value":41},
{"color":[255,0,0],"value":45},
{"color":[255,0,100],"value":50},
{"color":[255,0,255],"value":55},
{"color":[255,128,255],"value":60},
]
digitizer.avg_point({
'bg':'img/nck.jpg',
'fg':'D:\\Project\\radar_nck_process\\obj.jpg',
'point':[555,492],
'table':col_mapping,
'radius':10
},function(err,res){
console.log(res);
console.log(__dirname);
})
// var newimage = new Jimp(1600,1600,0x00000000,(err,newimg)=>{
// newimg.write( "out.png", (err,res)=>{console.log("OK");} )
// });
// Jimp.read("D:\\Project\\radar_nck_process\\base_bg.jpg", function (err, img) {
// // do stuff with the image (if no exception)
// img.write( "out.png", (err,res)=>{console.log("OK");} )
// });
//
// Jimp.read("D:\\Project\\radar_nck_process\\obj.jpg", function (err, img) {
// // do stuff with the image (if no exception)
// img.write( "out2.png", (err,res)=>{console.log("OK2");} )
// });
// var img = new Jimp("D:\\Project\\radar_nck_process\\base_bg.jpg");
// img.write( "out.png", (err,res)=>{console.log("OK");} )
......@@ -17,8 +17,8 @@
"instances" : 0
},
{
"name" : "bs.trigger.scheduler",
"script" : "./serv-scheduler.js"
"name" : "bs.trigger.core",
"script" : "./serv-coretrigger.js"
},
{
"name" : "bs.trigger.httplistener",
......
var ctx = require('./context');
var SchedulerService = ctx.getLib('coreservice/scheduler');
var StorageEventService = ctx.getLib('coreservice/storage-trigger');
var ss = SchedulerService.create(ctx.config);
ss.start();
var ses = StorageEventService.create(ctx.config);
ses.start();
......@@ -5,18 +5,21 @@ var argv = require('minimist')(process.argv.slice(2));
//ss.start();
var m = {'read':false,'write':false}
if(argv['process-read']){m.read = true;}
if(argv['process-write']){m.write = true;}
var m = {'read':"false",'write':"false"}
if(argv['process-read']){m.read = "true";}
if(argv['process-write']){m.write = "true";}
if(argv['api-port']){ctx.config.storage.api_port = Number(argv['api-port']);}
//console.log(argv);
var ss = StorageService.create(ctx.config);
if(!m.read && !m.write)
if(m.read=="false" && m.write=="false")
{
ss.start();
}else{
if(m.read){ss.http_start();}
if(m.write){ss.amqp_start();}
if(m.read=="true"){ss.http_start();}
if(m.write=="true"){
ss.amqp_start();
ss.ipc_start();
}
}
......@@ -7,10 +7,16 @@ var bsdata = ctx.getLib('lib/model/bsdata');
var importer = require('./importer');
var dataevent = require('./dataevent');
var sutils = require('./storage-utils');
module.exports.create = function(prm)
{
return new BSSEngine(prm);
var ins = prm;
// var bssId = sutils.mkBssId(ins.repos_dir,ins.name,{'newInstance':ins.newInstance});
// ins.file = bssId.file;
// ins.serial = bssId.serial;
return new BSSEngine(ins);
}
function BSSEngine(prm)
......@@ -24,6 +30,8 @@ function BSSEngine(prm)
this.name = prm.name;
this.context = (prm.context)?prm.context:null;
this.concurrent = 0;
this.serial=prm.serial||'';
this.outdate=false;
}
BSSEngine.prototype.filepath = function()
......@@ -78,9 +86,11 @@ BSSEngine.prototype.cmd = function(cmd,cb)
var command = cmd.command;
var param = cmd.param;
var self=this;
switch (command) {
case 'write':
this.cmd_write(param,cb);
self.cmd_write(param,cb);
break;
default:
cb('invalid cmd');
......
......@@ -9,30 +9,39 @@ function BSSPool(prm)
this.size = 32;
}
BSSPool.prototype.get = function(name,cb)
BSSPool.prototype.get = function(name,opt,cb)
{
if(!cb){cb=opt;opt={};}
var self=this;
var filepath = this.repos_dir + '/' + name2path(name) + '.bss'
var bssname = name;
var bss = this.search(name);
if(bss){
if(bss && !opt.newInstance){
self.clean(function(err){
process.nextTick(function() {
cb(null,bss.engine);
});
});
}else{
bss = BSSEngine.create({'context':self.context,'file' : filepath,'name' : bssname});
bss.open(function(err){
var bss_engine = BSSEngine.create(
{ 'context':self.context,
'repos_dir':self.repos_dir,
'file' : filepath,
'name' : bssname,
'newInstance':opt.newInstance
});
bss_engine.open(function(err){
if(!err){
self.pool.push({
'name' : name,
'engine':bss
'engine':bss_engine
});
}
self.clean(function(err){
cb(err,bss);
cb(err,bss_engine);
});
});
}
......@@ -64,16 +73,27 @@ BSSPool.prototype.search = function(name)
newpool.push(bssI)
}
});
if(ret){newpool.push(ret)}
this.pool = newpool;
// for(var i=0;i<this.pool.length;i++)
// {
// var bssI = this.pool[i];
// if(bssI.name == name){
// ret = bssI;
// break;
// }
// }
return ret;
}
BSSPool.prototype.detach = function(name)
{
var ret=null;
var newpool=[];
this.pool.forEach((bssI)=>{
if(bssI.name == name){
ret = bssI;
}else{
newpool.push(bssI)
}
});
this.pool = newpool;
return ret;
}
......
var fs = require('fs');
var BSSPool = require('./bsspool');
module.exports.create = function(cfg){
......@@ -24,6 +25,10 @@ Db.prototype.request = function(req,cb)
var prm = req.param
this.bsscmd_w(prm,cb)
break;
case 'delete':
var prm = req.param
this.bsscmd_del(prm,cb)
break;
default:
cb(null,result_error('invalid command'));
}
......@@ -44,7 +49,10 @@ Db.prototype.bsscmd_w = function(prm,cb)
}
}
this.bsspool.get(bssname,function(err,bss){
var bss_opt = {};
if(prm.opt && prm.opt.overwrite){bss_opt.newInstance=true;}
self.bsspool.get(bssname,bss_opt,function(err,bss){
if(!err){
bss.cmd(w_cmd,function(err,resp){
if(!err){
......@@ -59,6 +67,46 @@ Db.prototype.bsscmd_w = function(prm,cb)
});
}
Db.prototype.bsscmd_del = function(prm,cb)
{
var self = this;
var filepath = this.repos_dir + '/' + name2path(prm.storage_name) + '.bss';
var bssname = prm.storage_name;
var engine = self.bsspool.detach(bssname);
if(engine){
engine.close((err)=>{
if(!err){
unlink(filepath,cb);
}else{
cb(null,result_error('delete error'));
}
});
}else{
unlink(filepath,cb);
}
function unlink(fd,callback)
{
fs.exists(fd,function(exists){
if(exists){
fs.unlink(fd,function(err){
if(!err){
callback(null,result_ok());
}else{
callback(null,result_error('delete error'));
}
});
}else{
callback(null,result_error('file not found'));
}
});
}
}
function name2path(name){
return name.split('.').join('/');
......
var path = require('path');
var fs = require('fs')
var fs = require('fs');
var bss_walk_sync = function(dir, filelist,cat) {
files = fs.readdirSync(dir);
......@@ -20,7 +20,25 @@ var bss_walk_sync = function(dir, filelist,cat) {
return filelist;
};
function name2path(name){
return name.split('.').join('/');
}
module.exports.list = function (repo)
{
return bss_walk_sync(repo)
}
/*
module.exports.mkBssId = function(repo,name,opt)
{
var prefix_dir = repo + '/' + name.split('.').slice(0,-1).join('/');
var dirlist = fs.readdirSync(dir);
var found_storage = [];
dirlist.forEach((file)=>{
});
}
*/
\ No newline at end of file
var ctx = require('../context');
var rpcserver = ctx.getLib('lib/amqp/rpcserver');
var SSServer = ctx.getLib('lib/axon/rpcserver');
var Db = ctx.getLib('storage-service/lib/db');
var WorkerPool = ctx.getLib('storage-service/lib/worker_pool');
var SSCaller = ctx.getLib('lib/axon/rpccaller');
var express = require('express');
var app = express();
......@@ -10,6 +12,11 @@ var bodyParser = require('body-parser');
var EventPub = ctx.getLib('lib/amqp/event-pub');
var cfg = ctx.config;
var SS_LISTEN = ctx.getUnixSocketUrl('ss.sock');
var SS_URL = ctx.getUnixSocketUrl('ss.sock');
// var SS_LISTEN = ctx.getServiceUrl(19030);
// var SS_URL = ctx.getClientUrl(19030);
module.exports.create = function(cfg)
{
var ss = new SS(cfg);
......@@ -29,13 +36,17 @@ var SS = function StorageService(p_cfg)
this.db = Db.create({'repos_dir':storage_cfg.repository,'context':this.context});
this.worker_pool = WorkerPool.create({'size':2});
//this.storagecaller = new SSCaller({'url':SS_URL});
}
SS.prototype.start = function()
{
console.log('Starting Storage Service ...\n');
this.amqp_start();
this.ipc_start();
setTimeout(function(){
this.http_start();
},5000);
}
SS.prototype.amqp_start = function()
......@@ -74,6 +85,40 @@ SS.prototype.amqp_start = function()
});
}
SS.prototype.ipc_start = function()
{
var self = this;
if(this.ipc_server){return;}
this.ipc_server = new SSServer({
url : SS_LISTEN,
name : 'storage_request'
});
this.ipc_server.set_remote_function(function(req,callback){
//console.log("IPC Command");
self.db.request(req,function(err,res){
if(err){
console.log(err);
}
callback(err,res);
});
});
this.ipc_server.start(function(err){
if(!err){
console.log('SS:IPC START\t\t\t[OK]');
}else{
console.log('SS:IPC START\t\t\t[ERR]');
console.log('SS:IPC ERROR Restarting ...');
setTimeout(function(){
process.exit(1);
},5000);
}
});
}
SS.prototype.http_start = function()
{
var self = this;
......@@ -81,15 +126,17 @@ SS.prototype.http_start = function()
var API_PORT = (this.config.storage.api_port)?this.config.storage.api_port:19080;
app.use(bodyParser.json({limit: '5mb'}));
app.use(bodyParser.json({limit: '64mb'}));
app.use(bodyParser.urlencoded({
extended: true
}));
var context = ctx.getLib('lib/ws/http-context');
this.storagecaller = new SSCaller({'url':SS_URL});
this.worker_pool.initWorker();
app.use(context.middleware({
'worker_pool' : self.worker_pool
'worker_pool' : self.worker_pool,
'storagecaller':self.storagecaller
}));
app.use(require('./ws'));
......
......@@ -3,5 +3,6 @@ var express = require('express')
router.use('/v0.1',require('./v0.1'));
router.use('/v1',require('./v0.1'));
router.use('/v1.1',require('./v1.1'));
module.exports = router;
var express = require('express');
var router = express.Router();
router.use('/object',require('./service-object'));
router.use('/storage',require('./service-storage'));
module.exports = router;
var ctx = require('../../../context');
var express = require('express');
var router = express.Router();
var path = require('path');
var fs = require('fs');
var cfg = ctx.config;
var storage_cfg = cfg.storage
var response = ctx.getLib('lib/ws/response');
var request = ctx.getLib('lib/ws/request');
var BinStream = ctx.getLib('lib/bss/binarystream_v1_1');
var ObjId = ctx.getLib('lib/bss/objid');
var BSData = ctx.getLib('lib/model/bsdata');
router.get('/:id',function (req, res) {
var reqHelper = request.create(req);
var respHelper = response.create(res);
var oid = req.params.id;
var opt = {}
get_object(reqHelper,respHelper,{'oid':oid,'opt':opt});
});
router.get('/:id/data',function (req, res) {
var reqHelper = request.create(req);
var respHelper = response.create(res);
var query = reqHelper.getQuery();
var oid = req.params.id;
var opt = {
'field' : 'data'
}
opt.filetype = (query.filetype)?query.filetype:null
get_object(reqHelper,respHelper,{'oid':oid,'opt':opt});
});
function get_object(reqHelper,respHelper,prm)
{
prm=prm||{};
var oid = prm.oid;
var opt = prm.opt || {};
if(!oid){
return respHelper.response404();
}
var bss_ab_path = "default.bss";
var path_token= oid.split('.');
if(path_token.length >= 2){
var col = "";
var file = path_token[path_token.length-2] + ".bss";
var col = path_token.slice(0,path_token.length-2).join('/');
if(col.length>0){col = col + '/'}
bss_ab_path = col + file;
}else{
//Only Id no storage name
//Default Action
}
var tkoId = path_token[path_token.length-1];
var bss_full_path = storage_cfg.repository + "/" + bss_ab_path;
var obj_id = '';
try{
obj_id = new ObjId(tkoId);
}catch(err){
return respHelper.response404();
}
var seq = obj_id.extract().seq;
fs.exists(bss_full_path,function(exists){
if(exists){
BinStream.open(bss_full_path,function(err,bss){
var rd = bss.reader();
rd.objectAt(seq,function(err,obj){
bss.close(function(err){
if(obj && obj_id.toString() == (new ObjId(obj.header.ID)).toString()){
output(respHelper,obj,opt);
}else{respHelper.response404();}
});
});
});
}else{
respHelper.response404();
}
});
}
function output(resp,obj,opt)
{
if(opt.field=='data')
{
data_out(resp,obj,opt);
}else{
obj_out(resp,obj,opt);
}
}
function obj_out(resp,obj,opt)
{
var ret = {"_id" : (new ObjId(obj.header.ID)).toString(),
"meta" : obj.meta
}
if(obj.header.TY==BinStream.BINARY_TYPE)
{
var bs = BSData.create(obj.data);
ret.data = bs.serialize('object-encoded');
}else{
ret.data = obj.data;
}
resp.responseOK(ret);
}
function data_out(resp,obj,opt)
{
var objType = obj.header.TY;
if(objType == BinStream.BINARY_TYPE){
if(opt.filetype){
resp.response.type(opt.filetype);
}else{
}
resp.response.send(obj.data);
}else if(objType == BinStream.STRING_TYPE){
resp.response.send(obj.data);
}else{
resp.responseOK(obj.data);
}
}
module.exports = router;
var ctx = require('../../../context');
var express = require('express');
var router = express.Router();
var path = require('path');
var fs = require('fs');
var async = require('async');
//var Worker = require("tiny-worker");
var cfg = ctx.config;
var storage_cfg = cfg.storage;
var response = ctx.getLib('lib/ws/response');
var request = ctx.getLib('lib/ws/request');
var BinStream = ctx.getLib('lib/bss/binarystream_v1_1');
var ObjId = ctx.getLib('lib/bss/objid');
var BSData = ctx.getLib('lib/model/bsdata');
var StorageUtils = ctx.getLib('storage-service/lib/storage-utils');
const AGENT_NAME = "Storage API";
router.put('/:id',function (req, res) {
var reqHelper = request.create(req);
var respHelper = response.create(res);
var q = reqHelper.getQuery();
var sname = req.params.id;
var json_body = req.body;
var storagecaller = req.context.storagecaller;
if(!json_body){
return respHelper.response400();
}
if(!sname)
{
return respHelper.response400();
}
var databody = Array.isArray(json_body)?json_body:[json_body];
var idx = 0;
async.whilst(
function() { return idx < databody.length; },
function(callback) {
//var el_data = databody[idx].data;
var el_data = (BSData.parse(databody[idx].data)==null)?BSData.create(databody[idx].data).serialize('object-encoded'):databody[idx].data;
var meta = databody[idx].meta;
var dc_meta = {
"_agent" : AGENT_NAME,
"_ts" : Math.round((new Date).getTime() / 1000)
}
if(meta && typeof meta == 'object')
{
Object.keys(meta).forEach((item)=>{
if(!item.startsWith('_')){
dc_meta[item] = meta[item];
}
});
}
send_storage(storagecaller,dc_meta,el_data,sname,function(err){
idx++;
if(!err){
callback(null);
}else{
callback(new Error('storage error'));
}
});
},
function (err) {
if(!err){
respHelper.response200();
}else{
respHelper.response400();
}
}
);
});
function send_storage(caller,dc_meta,dc_data,storage_name,cb)
{
var req = {
'object_type' : 'storage_request',
'command' : 'write',
'param' : {
'storage_name' : storage_name,
'meta' : dc_meta,
'data' : {
'type' : 'bsdata',
'value' : dc_data
}
}
}
caller.call(req,function(err,resp){
if(!err && resp.status=='OK'){
cb(null);
}else{
cb("error");
}
});
}
router.delete('/:id',function (req, res) {
var reqHelper = request.create(req);
var respHelper = response.create(res);
var q = reqHelper.getQuery();
var sname = req.params.id;
var caller = req.context.storagecaller;
var req = {
'object_type' : 'storage_request',
'command' : 'delete',
'param' : {
'storage_name' : sname
}
}
caller.call(req,function(err,resp){
if(!err && resp.status=='OK'){
respHelper.response200();
}else{
respHelper.response400(resp.msg);
}
});
});
router.get('/',function (req, res) {
var reqHelper = request.create(req);
var respHelper = response.create(res);
respHelper.responseOK(StorageUtils.list(storage_cfg.repository));
});
router.get('/:id/stats',function (req, res) {
var reqHelper = request.create(req);
var respHelper = response.create(res);
var sid = req.params.id;
if(!sid){
return respHelper.response404();
}
var storage_path = sid.split('.').join('/');
var bss_full_path = storage_cfg.repository + "/" + storage_path + ".bss";
fs.exists(bss_full_path,function(exists){
if(exists){
var fstat = stats = fs.statSync(bss_full_path);
BinStream.open(bss_full_path,function(err,bss){
var rd = bss.reader();
var obj_stat = {
"storagename" : sid,
"count" : rd.count(),
"filename" : storage_path + ".bss",
"filesize" : fstat.size
}
bss.close(function(err){
respHelper.responseOK(obj_stat);
});
});
}else{
respHelper.response404();
}
});
});
router.get('/:id/objects',function (req, res) {
var reqHelper = request.create(req);
var respHelper = response.create(res);
var sid = req.params.id;
var query = reqHelper.getQuery();
if(!query){query={};}
if(!sid){
return respHelper.response404();
}
var storage_path = sid.split('.').join('/');
var bss_full_path = storage_cfg.repository + "/" + storage_path + ".bss";
var from_seq = 1;
var limit = 0;
var sizelimit = 64 * 1000 * 1000;
//compat with v1
// param => offset
// param => obj_after
if(query.offset){query.obj_after=query.offset;}
if(query.obj_after){
var o_seq;
try{
var obj_id = new ObjId(query.obj_after);
o_seq = obj_id.extract().seq;
}catch(err){
return respHelper.response404();
}
from_seq = o_seq+1;
}
// param => limit
if(query.limit){
limit = Number(query.limit);
}
// param => sizelimit
if(query.sizelimit){
sizelimit = Number(query.sizelimit) * 1000 * 1000;
}
// param => output = [object],stream
var output_type = (query.output)?query.output:'object';
//compat with v1
// param => from
// param => seq_from
if(query.from){query.seq_from = query.from;}
if(query.seq_from){
from_seq = Number(query.seq_from);
}
// param => field = id,meta,[data]
var objOpt = {'meta':true,'data':true}
if(query.field == 'id'){
objOpt.meta = false;
objOpt.data = false;
}else if(query.field == 'meta'){
objOpt.data = false;
}
// param => last
var tail_no = query.last;
var rd_prm = {
'bss_full_path' : bss_full_path,
'tail_no' : tail_no,
'from_seq' : from_seq,
'limit' : limit,
'output_type' : output_type,
'objOpt' : objOpt,
'sizelimit' : sizelimit
}
var worker_pool = req.context.worker_pool;
var worker = worker_pool.get();
worker.resp = respHelper;
worker.output_type = output_type;
worker.execute({'cmd':'read','prm':rd_prm});
worker.on('start',function(data){
stream_start();
});
var firstline=true;
worker.on('data',function(data){
if(!firstline){
stream_newrec();
}else{firstline = false;}
stream_data(data);
});
worker.on('end',function(code){
if(code == '404')
{
end(404);
}else if(code == '200'){
stream_end();
end(200);
}
worker.shutdown();
//worker_pool.push(worker);
});
function stream_start()
{
var resp = worker.resp;
var type = worker.output_type;
if(type=='stream')
{
resp.type('text');
}else{
resp.type('application/json');
resp.write('[');
}
}
function stream_newrec()
{
var resp = worker.resp;
var type = worker.output_type;
if(type=='stream')
{
resp.write('\n');
}else{
resp.write(',');
}
}
function stream_data(data)
{
var resp = worker.resp;
resp.write(data);
}
function stream_end()
{
var resp = worker.resp;
var type = worker.output_type;
if(type=='stream')
{
resp.write('');
}else{
resp.write(']');
}
}
function end(code)
{
var resp = worker.resp;
if(code==404){
resp.response404()
}else{
resp.status(code).end();
}
}
});
module.exports = router;
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment