Commit 3bb41af4 authored by project's avatar project

--no commit message

--no commit message
parent e1374e09
var async = require('async');
var schedule = require('node-schedule');
var storage = require('node-persist');
storage.initSync({
......@@ -6,21 +7,45 @@ storage.initSync({
var memstore = require('./lib/memstore');
var CFG_FILE = "./jobs/example.json";
var TRACKING = 2;
var ONTRIGGER = true;
var args = process.argv.slice(2);
if(args.length > 0){
CFG_FILE = "./jobs/" + args[0];
if(args.length==1){
CFG_FILE = "./jobs/" + args[0];
}else{
for(var i=0;i<args.length-1;i++){
switch (args[i]) {
case '-log0':
TRACKING = 0;
break;
case '-log1' :
TRACKING = 1;
break;
case '-log2' :
TRACKING = 2;
break;
case '-notrigger' :
ONTRIGGER=false;
break;
}
}
}
}
var jobcfg = require(CFG_FILE);
if(jobcfg.trigger && jobcfg.trigger.type == 'cron')
track('[SETTING UP JOB] : ' + CFG_FILE,TRACKING>0);
if(jobcfg.trigger && jobcfg.trigger.type == 'cron' && ONTRIGGER)
{
var triggercfg = jobcfg.trigger;
var cron = jobcfg.trigger.cmd;
console.log('[SCHEDULE MODE]');
console.log('[CRON]\t\t: ' + cron);
track('[SCHEDULE MODE]',TRACKING>0);
track('[CRON]\t\t: ' + cron,TRACKING>0);
var j = schedule.scheduleJob(cron, function(){
run_job(jobcfg);
......@@ -44,32 +69,58 @@ function run_job(cfg)
"transaction" : transaction
}
console.log('***** JOB RUNNING *****\n[TRANSACTION ID]\t: ' + transaction.id + '\n');
//process di
perform_di(context,function(err,resp){
if(resp.status == 'success' && context.jobconfig.data_transform){
var dt_request = {'type':resp.type,'data':resp.data}
perform_dt(context,dt_request,function(err,dt_resp){
console.log('***** JOB DONE *****\n\n');
});
}else{
console.log('***** JOB DONE *****\n\n');
}
track('***** JOB RUNNING *****\n[TRANSACTION ID]\t: ' + transaction.id + '\n',TRACKING>0);
async.waterfall([
function(callback){
perform_di(context,function(err,resp){
if(resp.status == 'success'){
callback(null,resp);
}else{
callback(resp);
}
});
},
function(request,callback){
var dt_request = {'type':request.type,'data':request.data}
perform_dt(context,dt_request,function(err,dt_resp){
if(dt_resp.status == 'success'){
callback(null,dt_resp);
}else {
callback(dt_resp);
}
});
},
function(request,callback){
var do_request = {'type':request.type,'data':request.data}
perform_do(context,do_request,function(err,do_resp){
if(do_resp.status == 'success'){
callback(null,do_resp);
}else {
callback(dt_resp);
}
});
}
],
function(err,resp){
if(err){
track('\n***** JOB UNSUCCESSFULLY DONE *****\n\n',TRACKING>0);
}else{
track('\n***** JOB SUCCESSFULLY DONE *****\n\n',TRACKING>0);
}
});
}
function perform_di(context,cb)
{
console.log('[RUNNING DI]');
track('[RUNNING DI]',TRACKING>1);
var di_context = context;
var jobId = di_context.jobconfig.job_id;
var di_cfg = di_context.jobconfig.data_in;
console.log('[DI_PLUGIN]\t\t: ' + di_cfg.type);
track('[DI_PLUGIN]\t\t: ' + di_cfg.type,TRACKING>1);
var DITask = getPlugins('di',di_cfg.type);
var mempref = "ms." + jobId + '.di';
var diMem = new memstore(mempref,storage);
......@@ -80,22 +131,22 @@ function perform_di(context,cb)
var di = new DITask(di_context);
di.run();
di.on('done',function(resp){
console.log('[DI_OUTPUT_TYPE]\t: ' + resp.type);
console.log('[DI_STATUS]\t\t: ' + resp.status);
console.log('>>' + resp.data);
track('[DI_OUTPUT_TYPE]\t: ' + resp.type,TRACKING>1);
track('[DI_STATUS]\t\t: ' + resp.status,TRACKING>1);
track('DATA>>' + resp.data,TRACKING>1);
cb(null,resp);
});
}
function perform_dt(context,request,cb)
{
console.log('\n\n[RUNNING DT]');
track('\n[RUNNING DT]',TRACKING>1);
var dt_context = context
var jobId = dt_context.jobconfig.job_id;
var dt_cfg = dt_context.jobconfig.data_transform;
console.log('[DT_PLUGIN]\t\t: ' + dt_cfg.type);
track('[DT_PLUGIN]\t\t: ' + dt_cfg.type,TRACKING>1);
var DITask = getPlugins('dt',dt_cfg.type);
var mempref = "ms." + jobId + '.dt';
var dtMem = new memstore(mempref,storage);
......@@ -106,22 +157,22 @@ function perform_dt(context,request,cb)
var dt = new DITask(dt_context,request);
dt.run();
dt.on('done',function(resp){
console.log('[DT_OUTPUT_TYPE]\t: ' + resp.type);
console.log('[DT_STATUS]\t\t: ' + resp.status);
console.log('>>' + resp.data);
track('[DT_OUTPUT_TYPE]\t: ' + resp.type,TRACKING>1);
track('[DT_STATUS]\t\t: ' + resp.status,TRACKING>1);
track('DATA>>' + resp.data,TRACKING>1);
cb(null,resp);
});
}
function perform_do(context,request,cb)
{
console.log('\n\n[RUNNING DO]');
track('\n[RUNNING DO]',TRACKING>1);
var do_context = context
var jobId = do_context.jobconfig.job_id;
var do_cfg = do_context.jobconfig.data_out;
console.log('[DO_PLUGIN]\t\t: ' + do_cfg.type);
track('[DO_PLUGIN]\t\t: ' + do_cfg.type,TRACKING>1);
var DOTask = getPlugins('do',do_cfg.type);
var mempref = "ms." + jobId + '.do';
var doMem = new memstore(mempref,storage);
......@@ -132,12 +183,19 @@ function perform_do(context,request,cb)
var dout = new DOTask(do_context,request);
dout.run();
dout.on('done',function(resp){
console.log('[DO_STATUS]\t\t: ' + resp.status);
console.log('>>' + resp.data);
track('[DO_STATUS]\t\t: ' + resp.status,TRACKING>1);
//console.log('>>' + resp.data);
cb(null,resp);
});
}
function track(str,is_print)
{
if(is_print){
console.log(str);
}
}
function getPlugins(type,name)
{
var path = '../plugins/' + type + '/' + type + '-' +name;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment