Commit 2d01ad5d authored by project's avatar project

--no commit message

--no commit message
parent 3262a392
...@@ -2,17 +2,24 @@ var schedule = require('node-schedule'); ...@@ -2,17 +2,24 @@ var schedule = require('node-schedule');
var ctx = require('../context'); var ctx = require('../context');
var cfg = ctx.config; var cfg = ctx.config;
var amqp_cfg = ctx.config.amqp;
var ConnCtx = ctx.getLib('lib/conn/connection-context'); var ConnCtx = ctx.getLib('lib/conn/connection-context');
var CronList = ctx.getLib('lib/mems/cronlist'); var CronList = ctx.getLib('lib/mems/cronlist');
var QueueCaller = ctx.getLib('lib/amqp/queuecaller'); var QueueCaller = ctx.getLib('lib/amqp/queuecaller');
module.exports.create = function (cfg)
{
return new SchedulerService(cfg);
}
function SchedulerService(cfg) function SchedulerService(cfg)
{ {
this.config = cfg; this.config = cfg;
this.conn = ConnCtx.create(this.config); this.conn = ConnCtx.create(this.config);
this.mem = this.conn.getMemstore(); this.mem = this.conn.getMemstore();
this.jobcaller = new QueueCaller({'url':amqp_cfg.url,'name':'bs_jobs_queue'});
this.crons = CronList.create({'redis':this.mem}); this.crons = CronList.create({'redis':this.mem});
this.engine = []; this.engine = [];
...@@ -20,20 +27,24 @@ function SchedulerService(cfg) ...@@ -20,20 +27,24 @@ function SchedulerService(cfg)
SchedulerService.prototype.start = function () SchedulerService.prototype.start = function ()
{ {
console.log('SCHEDULER:Starting\t\t[OK]');
this.reload();
} }
SchedulerService.prototype.reload = function () SchedulerService.prototype.reload = function ()
{ {
console.log('SCHEDULER:Reloading CronList\t[OK]');
var self = this; var self = this;
self.clean();
self.crons.update(function(err){ self.crons.update(function(err){
var cl = self.crons.list; var cl = self.crons.list;
for(var i=0;i<cl.length;i++) for(var i=0;i<cl.length;i++)
{ {
var s = schedule.scheduleJob(cl[i].cmd, function(y){
self._callJob(y);
}.bind(null,cl[i]));
self.engine.push(s);
} }
}); });
} }
...@@ -47,3 +58,23 @@ SchedulerService.prototype.clean = function () ...@@ -47,3 +58,23 @@ SchedulerService.prototype.clean = function ()
} }
this.engine = []; this.engine = [];
} }
SchedulerService.prototype._callJob = function(cron)
{
var cmd = {
'object_type':'job_execute',
'source' : 'scheduler',
'jobId' : cron.jobid,
'option' : {},
'input_data' : {
'type' : 'bsdata',
'value' : {
'data_type' : 'object',
'data' : {}
}
}
}
this.jobcaller.send(cmd);
}
...@@ -43,7 +43,7 @@ JobTask.prototype.run = function () ...@@ -43,7 +43,7 @@ JobTask.prototype.run = function ()
{ {
var self=this; var self=this;
var transaction_id = this.transaction_id || genTransactionId(); var transaction_id = this.transaction_id || genTransactionId();
var input_data = this.input_data; var obj_input_data = getInputData(this.input_data);
var job_tr_config = this.jobcfg; var job_tr_config = this.jobcfg;
var job_id = job_tr_config.job_id; var job_id = job_tr_config.job_id;
...@@ -61,62 +61,95 @@ JobTask.prototype.run = function () ...@@ -61,62 +61,95 @@ JobTask.prototype.run = function ()
var context = { var context = {
"jobconfig" : job_tr_config, "jobconfig" : job_tr_config,
"transaction" : ctx_transaction, "transaction" : ctx_transaction,
"input_data" : input_data, "input_data" : obj_input_data,
"job" : ctx_job "job" : ctx_job
} }
var task_di = function (callback) { var task_di = function (callback) {
perform_di({'context':context,'handle':self} ,function(err,resp){ var dm_i = domain.create();
if(resp.status == 'success'){ dm_i.on('error', function(err) {
callback(null,resp); console.log('[di] plugins error');
}else{ console.log(err);
callback(resp); callback(err)
}
}); });
dm_i.run(function() {
perform_di({'context':context,'handle':self} ,function(err,resp){
if(resp.status == 'success'){
callback(null,resp);
}else{
callback(resp);
}
});
});
} }
var task_dt = function (request,callback) { var task_dt = function (request,callback) {
var dt_request = {'input_type':request.type,'data':request.data} var dt_request = {'input_type':request.type,'data':request.data}
perform_dt({'context':context,'request':dt_request,'handle':self},function(err,dt_resp){
if(dt_resp.status == 'success'){ var dm_t = domain.create();
callback(null,dt_resp); dm_t.on('error', function(err) {
}else { console.log('[dt] plugins error');
callback(dt_resp); console.log(err);
} callback(err)
});
dm_t.run(function() {
perform_dt({'context':context,'request':dt_request,'handle':self},function(err,dt_resp){
if(dt_resp.status == 'success'){
callback(null,dt_resp);
}else {
callback(dt_resp);
}
});
}); });
} }
var task_do = function (request,callback) { var task_do = function (request,callback) {
var do_request = {'input_type':request.type,'data':request.data} var do_request = {'input_type':request.type,'data':request.data}
perform_do({'context':context,'request':do_request,'handle':self},function(err,do_resp){
if(do_resp.status == 'success'){ var dm_o = domain.create();
callback(null,do_resp); dm_o.on('error', function(err) {
}else { console.log('[do] plugins error');
callback(do_resp); console.log(err);
} callback(err)
});
dm_o.run(function() {
perform_do({'context':context,'request':do_request,'handle':self},function(err,do_resp){
if(do_resp.status == 'success'){
callback(null,do_resp);
}else {
callback(do_resp);
}
});
}); });
} }
var jtimeout = setTimeout(function(){ var jtimeout = setTimeout(function(){
self.stop({'status':'error','data':'job execution timeout'}); self.stop({'status':'error','data':'job execution timeout'});
//self.emit('error',new Error('job execution timeout')) //self.emit('error',new Error('job execution timeout'))
},self.job_timeout); },self.job_timeout);
console.log('***** JOB RUNNING *****');
console.log('[JOB ID]\t\t: ' + job_id);
console.log('[TRANSACTION ID]\t: ' + transaction_id);
async.waterfall([task_di,task_dt,task_do],function (err,resp) { async.waterfall([task_di,task_dt,task_do],function (err,resp) {
clearTimeout(jtimeout); clearTimeout(jtimeout);
if(!err){ if(!err){
self.stop(resp) self.stop(resp)
//console.log('***** JOB SUCCESSFULLY DONE *****'); console.log('***** JOB SUCCESSFULLY DONE *****\n');
}else{ }else{
self.stop(err) self.stop(err)
//console.log('***** JOB UNSUCCESSFULLY DONE *****'); console.log('***** JOB UNSUCCESSFULLY DONE *****\n');
} }
}); });
} }
function perform_di(prm,cb) function perform_di(prm,cb)
...@@ -187,6 +220,18 @@ function getPlugins(type,name) ...@@ -187,6 +220,18 @@ function getPlugins(type,name)
return require(path); return require(path);
} }
function getInputData(obj)
{
if(obj.type == 'bsdata')
{
var inp = bsdata.parse(obj.value);
return inp.data;
}else{
return {};
}
}
function genTransactionId() function genTransactionId()
{ {
var id = crypto.randomBytes(3).toString("hex"); var id = crypto.randomBytes(3).toString("hex");
......
...@@ -46,7 +46,7 @@ JT.prototype.run = function (done) ...@@ -46,7 +46,7 @@ JT.prototype.run = function (done)
var task_prm = { var task_prm = {
'handle' : self.handle, 'handle' : self.handle,
'job_config' : jobCfg, 'job_config' : jobCfg,
'input_data' : self.input_data, 'input_data' : command.input_data,
'opt' : {'job_timeout' :60000} 'opt' : {'job_timeout' :60000}
} }
if(jobCfg.job_timeout){ if(jobCfg.job_timeout){
......
...@@ -39,7 +39,6 @@ JW.prototype.amqp_job_start = function () ...@@ -39,7 +39,6 @@ JW.prototype.amqp_job_start = function ()
}); });
self.amqp_server.set_execute_function(function(data,callback){ self.amqp_server.set_execute_function(function(data,callback){
var jt = new JobTransaction({'handle':self,'cmd':data}); var jt = new JobTransaction({'handle':self,'cmd':data});
jt.run(function(err){ jt.run(function(err){
if(err){ if(err){
......
...@@ -29,7 +29,7 @@ function perform_function(context,response){ ...@@ -29,7 +29,7 @@ function perform_function(context,response){
function extract_httpdata(dat) function extract_httpdata(dat)
{ {
if(dat.object_type == 'httpdata'){ if(dat.object_type && dat.object_type == 'httpdata'){
return dat.data; return dat.data;
}else{ }else{
return dat; return dat;
......
var ctx = require('./context');
var SchedulerService = ctx.getLib('coreservice/scheduler');
var ss = SchedulerService.create(ctx.config);
ss.start();
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment