Commit 3fdbcdae authored by project's avatar project

--no commit message

--no commit message
parent 09ac83f1
var util = require('util'); var util = require('util');
var async = require('async');
var domain = require('domain');
var EventEmitter = require('events').EventEmitter; var EventEmitter = require('events').EventEmitter;
var ctx = require('../../context'); var ctx = require('../../context');
var memstore = ctx.getLib('jobexecutor/lib/memstore'); var memstore = ctx.getLib('jobexecutor/lib/memstore');
var bsdata = ctx.getLib('lib/model/bsdata');
function JobTask (prm) function JobTask (prm)
{ {
EventEmitter.call(this); EventEmitter.call(this);
this.handle = prm.handle; this.handle = prm.handle;
this.mem = prm.handle.mem;
this.jobcfg = prm.job_config; this.jobcfg = prm.job_config;
this.input_data = prm.input_data;
this.transaction_id = prm.transaction_id;
}; };
util.inherits(JobTask, EventEmitter); util.inherits(JobTask, EventEmitter);
...@@ -17,12 +24,33 @@ util.inherits(JobTask, EventEmitter); ...@@ -17,12 +24,33 @@ util.inherits(JobTask, EventEmitter);
JobTask.prototype.run = function () JobTask.prototype.run = function ()
{ {
var self=this;
var transaction_id = this.transaction_id || genTransactionId();
var input_data = this.input_data;
var job_tr_config = this.jobcfg;
var job_id = job_tr_config.job_id;
var ctx_transaction = {
"id" : tranId
}
var jobMem = new memstore({'job_id':job_id,'cat':'global','mem':self.mem})
var ctx_job = {
"memstore" : jobMem
}
var context = {
"jobconfig" : job_tr_config,
"transaction" : ctx_transaction,
"input_data" : input_data,
"job" : ctx_job
}
} }
function perform_di(context,cb) function perform_di(prm,cb)
{ {
var di_context = context; var di_context = prm.context;
var jobId = di_context.jobconfig.job_id; var jobId = di_context.jobconfig.job_id;
var di_cfg = di_context.jobconfig.data_in; var di_cfg = di_context.jobconfig.data_in;
...@@ -41,9 +69,9 @@ function perform_di(context,cb) ...@@ -41,9 +69,9 @@ function perform_di(context,cb)
}); });
} }
function perform_dt(context,request,cb) function perform_dt(prm,cb)
{ {
var dt_context = context var dt_context = prm.context
var jobId = dt_context.jobconfig.job_id; var jobId = dt_context.jobconfig.job_id;
var dt_cfg = dt_context.jobconfig.data_transform; var dt_cfg = dt_context.jobconfig.data_transform;
...@@ -55,7 +83,7 @@ function perform_dt(context,request,cb) ...@@ -55,7 +83,7 @@ function perform_dt(context,request,cb)
"memstore" : dtMem "memstore" : dtMem
} }
var dt = new DITask(dt_context,request); var dt = new DITask(dt_context,prm.request);
dt.run(); dt.run();
dt.on('done',function(resp){ dt.on('done',function(resp){
...@@ -63,9 +91,9 @@ function perform_dt(context,request,cb) ...@@ -63,9 +91,9 @@ function perform_dt(context,request,cb)
}); });
} }
function perform_do(context,request,cb) function perform_do(prm,cb)
{ {
var do_context = context var do_context = prm.context
var jobId = do_context.jobconfig.job_id; var jobId = do_context.jobconfig.job_id;
var do_cfg = do_context.jobconfig.data_out; var do_cfg = do_context.jobconfig.data_out;
...@@ -77,7 +105,7 @@ function perform_do(context,request,cb) ...@@ -77,7 +105,7 @@ function perform_do(context,request,cb)
"memstore" : doMem "memstore" : doMem
} }
var dout = new DOTask(do_context,request); var dout = new DOTask(do_context,prm.request);
dout.run(); dout.run();
dout.on('done',function(resp){ dout.on('done',function(resp){
cb(null,resp); cb(null,resp);
...@@ -89,3 +117,8 @@ function getPlugins(type,name) ...@@ -89,3 +117,8 @@ function getPlugins(type,name)
var path = '../plugins/' + type + '/' + type + '-' +name; var path = '../plugins/' + type + '/' + type + '-' +name;
return require(path); return require(path);
} }
function genTransactionId()
{
return "TR" + (new Date).getTime();
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment