Commit 6d10a3e2 authored by project's avatar project

--no commit message

--no commit message
parent d3ed8814
{ {
"name": "node-bigstream", "name": "node-bigstream",
"description": "", "description": "",
"version": "0.0.1", "version": "0.0.1",
"main": "./bigstream.js", "main": "./bigstream.js",
"author": { "author": {
"name": "Kamron Aroonrua", "name": "Kamron Aroonrua",
"email": "kamron.aroonrua@nectec.or.th" "email": "kamron.aroonrua@nectec.or.th"
}, },
"keywords": [], "keywords": [],
"licenses": { "licenses": {
"type": "none" "type": "none"
}, },
"dependencies": { "dependencies": {
"amqplib": "^0.4.2", "amqplib": "^0.4.2",
"async": "^2.0.1", "async": "^2.0.1",
"body-parser": "^1.15.2", "body-parser": "^1.15.2",
...@@ -19,7 +19,8 @@ ...@@ -19,7 +19,8 @@
"buffalo": "^0.1.3", "buffalo": "^0.1.3",
"dateformat": "^1.0.12", "dateformat": "^1.0.12",
"express": "^4.14.0", "express": "^4.14.0",
"node-persist": "^2.0.7",
"query-string": "^4.2.3", "query-string": "^4.2.3",
"random-access-file": "^1.3.0" "random-access-file": "^1.3.0"
} }
} }
\ No newline at end of file
function execute_function(context,response){ function execute_function(context,response){
var job_id = context.jobconfig.job_id; var job_id = context.jobconfig.job_id;
var transaction_id = context.job.transaction_id; var transaction_id = context.transaction.id;
var param = context.jobconfig.data_in.param; var param = context.jobconfig.data_in.param;
var memstore = context.job.memstore var memstore = context.task.memstore
var data = 'hello world'; var output_type = 'text'
var data = 'hello world ' + transaction_id;
//memstore.setvalue('timestamp.xxx',ts) // memstore.setItem('lasttransaction',transaction_id,function(err){
// response.success(data);
// });
// memstore.getItem('lasttransaction',function(err,value){
// response.success(value);
// });
response.success(data,output_type);
//response.reject(); //response.reject();
//response.error("error message") //response.error("error message")
//response.success(data);
} }
module.exports = execute_function; module.exports = execute_function;
...@@ -31,20 +31,40 @@ module.exports = DIPlugin; ...@@ -31,20 +31,40 @@ module.exports = DIPlugin;
function DIResponse(handle){ function DIResponse(handle){
this.handle = handle; this.handle = handle;
this.status = null;
this.data = null;
this.output_type = '*';
} }
DIResponse.prototype.success = function(data){ DIResponse.prototype.success = function(data,type){
this.handle.emit('done',response('success',data)); var handle = this.handle;
process.nextTick(function(){
handle.emit('done',response('success',data,type));
});
} }
DIResponse.prototype.error = function(err){ DIResponse.prototype.error = function(err){
this.handle.emit('done',response('error',err)); var handle = this.handle;
process.nextTick(function(){
handle.emit('done',response('error',err));
});
} }
DIResponse.prototype.reject = function(){ DIResponse.prototype.reject = function(){
this.handle.emit('done',response('reject',null)); var handle = this.handle;
process.nextTick(function(){
handle.emit('done',response('reject',null));
});
} }
function response(status,data){ function response(status,data,type){
return {'status':status,'data':data} var resp = {'status':status,'data':data};
if(type){
resp.type = type;
this.output_type = type;
this.data = data;
this.status = status;
}
return resp;
} }
{
"job_id" : "001",
"active" : true,
"trigger" : {
"type": "cron",
"cmd": "*/10 * * * *"
},
"data_in" : {
"type": "agritronics",
"profile": {
"station_id": "GISDA-01",
"latitude": "",
"longitude": ""
},
"param": {
"url": "http://agritronics.nstda.or.th/ws/get.php",
"appkey": "0c5a295bd8c07a081f4f0061eee6665c38",
"station_id": "GISTDA-01",
"data_types": ["1", "2", "4", "5", "6", "7", "8", "10", "2021", "2022"],
"node_id": "4096",
"node_id_camera": "7328",
"init_date_observed": "2016-11-23",
"init_time_observed": "09:00:05"
}
},
"data_transform" : {
"type": "agritronics"
},
"data_out" : {
"type": "bss-storage",
"param" : {
"name": "sds.agritronics"
}
}
}
{
"job_id" : "001",
"active" : true,
"trigger" : {
"type": "cron",
"cmd": "*/10 * * * *"
},
"data_in" : {
"type": "http-request",
"param": {
"url": "http://agritronics.nstda.or.th/ws/get.php"
}
},
"data_out" : {
"type": "bss-storage",
"param" : {
"name": "sds.agritronics"
}
}
}
function memstore(pref,storage){
this.prefix = pref;
this.storage = storage;
}
memstore.prototype.setItem = function(k,v,cb){
var key = this.prefix + "." + k;
this.storage.setItem(key,v,cb);
}
memstore.prototype.getItem = function(k,cb)
{
var key = this.prefix + "." + k;
this.storage.getItem(key,cb);
}
module.exports = memstore;
var storage = require('node-persist');
storage.initSync({
dir:'db'
});
var memstore = require('./lib/memstore');
var CFG_FILE = "./jobs/testhttp.json";
var jobcfg = require(CFG_FILE);
run_job(jobcfg);
function run_job(cfg)
{
var jobconfig = cfg;
var tranId = "TR" + (new Date).getTime();
var transaction = {
"id" : tranId
}
var context = {
"jobconfig" : jobconfig,
"transaction" : transaction
}
//process di
perform_di(context,function(err,resp){
console.log('>> ' + resp.data);
});
}
function perform_di(context,cb)
{
var di_context = context;
var jobId = di_context.jobconfig.job_id;
var di_cfg = di_context.jobconfig.data_in;
var DITask = getPlugins('di',di_cfg.type);
var mempref = "ms." + jobId + '.di';
var diMem = new memstore(mempref,storage);
di_context.task = {
"memstore" : diMem
}
var di = new DITask(di_context);
di.run();
di.on('done',function(response){
cb(null,response);
});
}
function getPlugins(type,name)
{
var path = '../plugins/' + type + '/' + type + '-' +name;
return require(path);
}
...@@ -6,3 +6,7 @@ di.run(); ...@@ -6,3 +6,7 @@ di.run();
di.on('done',function(response){ di.on('done',function(response){
console.log('>> ' + response.data); console.log('>> ' + response.data);
}); });
di.on('start',function(){
console.log('>> start');
});
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment