Commit def4692d authored by project's avatar project

--no commit message

--no commit message
parent a9ef971b
...@@ -16,3 +16,7 @@ module.exports.getPlugins = function(type,name) ...@@ -16,3 +16,7 @@ module.exports.getPlugins = function(type,name)
return require(path); return require(path);
} }
module.exports.sysenv = {
}
var util = require('util'); var util = require('util');
var domain = require('domain');
var async = require('async'); var async = require('async');
var domain = require('domain'); var domain = require('domain');
var crypto = require("crypto");
var EventEmitter = require('events').EventEmitter; var EventEmitter = require('events').EventEmitter;
var ctx = require('../../context'); var ctx = require('../../context');
var memstore = ctx.getLib('jobexecutor/lib/memstore'); var memstore = require('./memstore');
var bsdata = ctx.getLib('lib/model/bsdata'); var bsdata = ctx.getLib('lib/model/bsdata');
module.exports = JobTask; module.exports = JobTask;
...@@ -12,17 +14,31 @@ function JobTask (prm) ...@@ -12,17 +14,31 @@ function JobTask (prm)
{ {
EventEmitter.call(this); EventEmitter.call(this);
if(!prm.opt){prm.opt={};}
this.handle = prm.handle; this.handle = prm.handle;
this.mem = prm.handle.mem; this.mem = prm.handle.mem;
this.jobcfg = prm.job_config; this.jobcfg = prm.job_config;
this.input_data = prm.input_data; this.input_data = prm.input_data;
this.transaction_id = prm.transaction_id; this.transaction_id = prm.transaction_id;
this.job_timeout = prm.opt.job_timeout || 3000;
//0=>IDLE,1=>RUNNING,2=>DONE
this.state = 0;
}; };
util.inherits(JobTask, EventEmitter); util.inherits(JobTask, EventEmitter);
//handle.emit('done',{'status':'error','data':err}); //handle.emit('done',{'status':'error','data':err});
JobTask.prototype.stop = function (status)
{
if(this.state==1){
this.state = 2;
this.emit('done', status);
}
}
JobTask.prototype.run = function () JobTask.prototype.run = function ()
{ {
var self=this; var self=this;
...@@ -31,6 +47,7 @@ JobTask.prototype.run = function () ...@@ -31,6 +47,7 @@ JobTask.prototype.run = function ()
var job_tr_config = this.jobcfg; var job_tr_config = this.jobcfg;
var job_id = job_tr_config.job_id; var job_id = job_tr_config.job_id;
self.state = 1;
var ctx_transaction = { var ctx_transaction = {
"id" : transaction_id "id" : transaction_id
...@@ -80,14 +97,26 @@ JobTask.prototype.run = function () ...@@ -80,14 +97,26 @@ JobTask.prototype.run = function ()
}); });
} }
var jtimeout = setTimeout(function(){
self.stop({'status':'error','data':'job execution timeout'});
//self.emit('error',new Error('job execution timeout'))
},self.job_timeout);
async.waterfall([task_di,task_dt,task_do],function (err,resp) { async.waterfall([task_di,task_dt,task_do],function (err,resp) {
clearTimeout(jtimeout);
if(!err){ if(!err){
console.log('***** JOB SUCCESSFULLY DONE *****'); self.stop(resp)
//console.log('***** JOB SUCCESSFULLY DONE *****');
}else{ }else{
console.log('***** JOB UNSUCCESSFULLY DONE *****'); self.stop(err)
//console.log('***** JOB UNSUCCESSFULLY DONE *****');
} }
}); });
} }
function perform_di(prm,cb) function perform_di(prm,cb)
...@@ -147,6 +176,7 @@ function perform_do(prm,cb) ...@@ -147,6 +176,7 @@ function perform_do(prm,cb)
var dout = new DOTask(do_context,prm.request); var dout = new DOTask(do_context,prm.request);
dout.run(); dout.run();
dout.on('done',function(resp){ dout.on('done',function(resp){
cb(null,resp); cb(null,resp);
}); });
} }
...@@ -159,5 +189,6 @@ function getPlugins(type,name) ...@@ -159,5 +189,6 @@ function getPlugins(type,name)
function genTransactionId() function genTransactionId()
{ {
return "TR" + (new Date).getTime(); var id = crypto.randomBytes(3).toString("hex");
return "TR" + (new Date).getTime() + id;
} }
...@@ -7,18 +7,19 @@ function perform_function(context,response){ ...@@ -7,18 +7,19 @@ function perform_function(context,response){
var output_type = 'text' var output_type = 'text'
var data = 'hello world ' + transaction_id; var data = 'hello world ' + transaction_id;
// memstore.setItem('lasttransaction',transaction_id,function(err){ // memstore.setItem('lasttransaction',transaction_id,function(err){
// response.success(data); // response.success(data);
// }); // });
memstore.getItem('lasttransaction2',function(err,value){ // memstore.getItem('lasttransaction2',function(err,value){
console.log('key'); // console.log('key');
console.log(value); // console.log(value);
response.success(value); // response.success(value);
}); // });
setTimeout(function(){
response.success(data,output_type);
},2000)
//response.success(data,output_type); //response.success(data,output_type);
//response.reject(); //response.reject();
//response.error("error message") //response.error("error message")
......
...@@ -122,26 +122,26 @@ const crypto = require("crypto"); ...@@ -122,26 +122,26 @@ const crypto = require("crypto");
// console.log(val.t); // console.log(val.t);
// }); // });
// var redis = require('redis'); var redis = require('redis');
// var handle = {'mem' : redis.createClient('redis://:@bigmaster.igridproject.info:6379/1')} var handle = {'mem' : redis.createClient('redis://bigmaster.igridproject.info:6379/1')}
// var input_data = {}; var input_data = {};
// var job_config = { var job_config = {
// "job_id" : "example", "job_id" : "example",
// "active" : true, "active" : true,
// "trigger" : { "trigger" : {
// "type": "cron", "type": "cron",
// "cmd": "29,59 * * * * *" "cmd": "29,59 * * * * *"
// }, },
// "data_in" : { "data_in" : {
// "type": "example" "type": "example"
// }, },
// "data_transform" : { "data_transform" : {
// "type": "noop" "type": "noop"
// }, },
// "data_out" : { "data_out" : {
// "type": "console" "type": "console"
// } }
// } }
var ag = { var ag = {
"job_id" : "agritronics-gistda-01", "job_id" : "agritronics-gistda-01",
...@@ -191,8 +191,12 @@ var JobTask = ctx.getLib('jobworker/lib/jobtask'); ...@@ -191,8 +191,12 @@ var JobTask = ctx.getLib('jobworker/lib/jobtask');
var job = new JobTask({ var job = new JobTask({
'handle' : handle, 'handle' : handle,
'job_config' : job_config, 'job_config' : job_config,
'input_data' : input_data 'input_data' : input_data,
'opt' : {'job_timeout' :30000}
}); });
job.on('done',function(res){
console.log(res);
});
job.run(); job.run();
// async.reduce([1,2,3], 0, function(memo, item, callback) { // async.reduce([1,2,3], 0, function(memo, item, callback) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment