Commit 3262a392 authored by project's avatar project

--no commit message

--no commit message
parent e223761a
var schedule = require('node-schedule');
var ctx = require('../context');
var cfg = ctx.config;
var ConnCtx = ctx.getLib('lib/conn/connection-context');
var CronList = ctx.getLib('lib/mems/cronlist');
var QueueCaller = ctx.getLib('lib/amqp/queuecaller');
function SchedulerService(cfg)
{
this.config = cfg;
this.conn = ConnCtx.create(this.config);
this.mem = this.conn.getMemstore();
this.crons = CronList.create({'redis':this.mem});
this.engine = [];
}
SchedulerService.prototype.start = function ()
{
}
SchedulerService.prototype.reload = function ()
{
var self = this;
self.crons.update(function(err){
var cl = self.crons.list;
for(var i=0;i<cl.length;i++)
{
}
});
}
SchedulerService.prototype.clean = function ()
{
var arrEngine = this.engine;
for(var i=0;i<arrEngine.length;i++)
{
arrEngine[i].cancel();
}
this.engine = [];
}
......@@ -22,14 +22,14 @@ function JobTask (prm)
this.jobcfg = prm.job_config;
this.input_data = prm.input_data;
this.transaction_id = prm.transaction_id;
this.job_timeout = prm.opt.job_timeout || 3000;
this.job_timeout = prm.opt.job_timeout || 60000;
//0=>IDLE,1=>RUNNING,2=>DONE
this.state = 0;
};
util.inherits(JobTask, EventEmitter);
//handle.emit('done',{'status':'error','data':err});
JobTask.prototype.stop = function (status)
{
......
var util = require('util');
var crypto = require("crypto");
var EventEmitter = require('events').EventEmitter;
var async = require('async');
var ctx = require('../../context');
var cfg = ctx.config;
var JobTask = ctx.getLib('jobworker/lib/jobtask');
var JT = function JobTransaction(prm)
{
EventEmitter.call(this);
this.handle = prm.handle;
this.transaction_id = genTransactionId();
this.cmd = prm.cmd;
}
module.exports = JT;
util.inherits(JT, EventEmitter);
JT.prototype.run = function (done)
{
var self = this;
var job_registry = self.handle.job_registry;
var command = self.cmd;
if(!validate_execute_cmd(command)){
return done("invalid command");
}
var jobId = command.jobId;
var getJobCfg = function(callback){
//red jobconfig
job_registry.getJob(jobId,function(err,data){
if(!data){
callback('job ' + jobId + ' does not exits');
}else{
callback(err,data);
}
})
}
var runJobTask = function(jobCfg,callback){
var task_prm = {
'handle' : self.handle,
'job_config' : jobCfg,
'input_data' : self.input_data,
'opt' : {'job_timeout' :60000}
}
if(jobCfg.job_timeout){
task_prm.opt.job_timeout = jobCfg.job_timeout;
}
var job = new JobTask(task_prm);
job.on('done',function(res){
callback(null)
});
job.run();
}
async.waterfall([getJobCfg,runJobTask]
, function (err) {
if(!err){
done(null);
}else{
done(err);
}
});
}
function validate_execute_cmd(cmd)
{
if(cmd.object_type && cmd.object_type == 'job_execute')
{
return true;
}
return false;
}
function genTransactionId()
{
var id = crypto.randomBytes(3).toString("hex");
return "TR" + (new Date).getTime() + id;
}
......@@ -2,7 +2,10 @@ var ctx = require('../context');
var cfg = ctx.config;
var QueueReceiver = ctx.getLib('lib/amqp/queuereceiver');
var ConnCtx = ctx.getLib('lib/conn/connection-context');
var JobRegistry = ctx.getLib('lib/mems/job-registry');
var JobTransaction = require('./lib/jobtransaction')
module.exports.create = function(prm)
{
var jw = new JW(prm);
......@@ -15,6 +18,9 @@ var JW = function JobWorker (prm)
this.config = param.config || cfg;
this.instance_name = param.name;
this.conn = ConnCtx.create(this.config);
this.mem = this.conn.getMemstore()
this.job_registry = JobRegistry.create({'redis':this.mem});
}
JW.prototype.start = function ()
......@@ -24,5 +30,28 @@ JW.prototype.start = function ()
JW.prototype.amqp_job_start = function ()
{
var self=this;
if(self.amqp_server){return;}
self.amqp_server = new QueueReceiver({
url : self.conn.getAmqpUrl(),
name : 'bs_jobs_queue'
});
self.amqp_server.set_execute_function(function(data,callback){
var jt = new JobTransaction({'handle':self,'cmd':data});
jt.run(function(err){
if(err){
console.log(err);
}
callback();
});
});
self.amqp_server.start(function(err){
console.log('worker start');
})
}
......@@ -34,10 +34,13 @@ function QueueCaller(config)
}
QueueCaller.prototype.send = function(msg,cb)
QueueCaller.prototype.send = function(msg)
{
var self=this;
this.open(function(err){
if(err){
console.log(err);
}
self.ch.sendToQueue(self.name, new Buffer(JSON.stringify(msg)), {persistent: true});
});
}
......@@ -45,7 +48,9 @@ QueueCaller.prototype.send = function(msg,cb)
QueueCaller.prototype.close = function(cb)
{
var self=this;
this.open(function(err){
self.conn.close(cb);
});
}
module.exports = QueueCaller;
var Redis = require('ioredis');
var Redis = require('redis');
module.exports.create = function(cfg,opt)
{
var conn = new ConnectionContext(cfg,opt);
var conn = new ConnCtx(cfg,opt);
return conn;
}
function ConnCtx(cfg,opt)
var ConnCtx = function ConnectionContext(cfg,opt)
{
this.config = cgf;
this.config = cfg;
}
ConnCtx.prototype.getMemstore = function(opt)
{
if(!this.redis){
this.redis = new Redis(this.config.memstore.url);
this.redis = Redis.createClient(this.config.memstore.url);
}
return this.redis;
}
ConnCtx.prototype.getAmqpUrl = function()
{
return this.config.amqp.url;
}
var redis = require('redis');
const PREFIX = 'bs:scheduler:cronlist';
module.exports.create = function(cfg)
{
return new CronList(cfg);
}
module.exports.mkCron = function(name,cmd,jobid,opt)
{
var a = {
'name' : name,
'cmd' : cmd,
'jobid' : jobid
}
if(opt){a.opt = opt}
return a;
}
function CronList(cfg)
{
this.config = cfg;
if(cfg.conn){
this.mem = redis.createClient(cfg.conn);
}else if(cfg.redis){
this.mem = cfg.redis;
}else{
this.mem = null;
}
this.list = [];
}
CronList.prototype.add = function(cron)
{
var found = false;
this.list.forEach( function (val) {
if(val.name == cron.name){
found = true;
}
});
if(!found){
this.list.push(cron);
}
}
CronList.prototype.clean = function()
{
this.list = [];
}
CronList.prototype.update = function(cb)
{
var self=this;
this.mem.get(PREFIX, function (err, result) {
if(!err && result){
self.list = JSON.parse(result);
}
cb(err);
});
}
CronList.prototype.commit = function(cb)
{
var strlist = JSON.stringify(this.list);
this.mem.set(PREFIX,strlist);
if(typeof cb == 'function'){
cb();
}
}
CronList.prototype.getList = function()
{
return this.list;
}
var redis = require('redis');
const PREFIX = 'bs:jobs';
module.exports.create = function(cfg)
{
return new JobRegistry(cfg);
}
function JobRegistry(cfg)
{
this.config = cfg;
if(cfg.conn){
this.mem = redis.createClient(cfg.conn);
}else if(cfg.redis){
this.mem = cfg.redis;
}else{
this.mem = null;
}
}
JobRegistry.prototype.getJob = function(jobid,cb)
{
var self = this;
var jobKey = PREFIX + ':' + jobid;
this.mem.get(jobKey,function(err,data){
cb(err,JSON.parse(data));
});
}
......@@ -48,3 +48,9 @@ var job_execute = {
}
}
}
var cron = {
'name':'job01',
'cron':'*/10 * * * * *',
'jobid':'job01'
}
......@@ -17,9 +17,16 @@ function perform_function(context,response){
// response.success(value);
// });
var ts = (new Date).getTime();
var delay = 0;
if(ts%9==0){
//delay = 10000;
}
setTimeout(function(){
response.success(data,output_type);
},2000)
},delay)
//response.success(data,output_type);
//response.reject();
//response.error("error message")
......
This diff is collapsed.
var ctx = require('../context');
var amqp_cfg = ctx.config.amqp;
var QueueCaller = ctx.getLib('lib/amqp/queuecaller');
var qc = new QueueCaller({'url':'amqp://bigmaster.igridproject.info','name':'bs_jobs_queue'});
var cmd = {
'object_type':'job_execute',
'source' : 'http_listener',
'jobId' : 'job01',
'option' : {},
'input_data' : {
'type' : 'bsdata',
'value' : {
'data_type' : 'object',
'data' : {'name':'gcs'}
}
}
}
qc.send(cmd);
qc.close(function(){
console.log('closed');
})
......@@ -186,18 +186,18 @@ var ag = {
}
var JobTask = ctx.getLib('jobworker/lib/jobtask');
var job = new JobTask({
'handle' : handle,
'job_config' : job_config,
'input_data' : input_data,
'opt' : {'job_timeout' :30000}
});
job.on('done',function(res){
console.log(res);
});
job.run();
// var JobTask = ctx.getLib('jobworker/lib/jobtask');
//
// var job = new JobTask({
// 'handle' : handle,
// 'job_config' : job_config,
// 'input_data' : input_data,
// 'opt' : {'job_timeout' :30000}
// });
// job.on('done',function(res){
// console.log(res);
// });
//job.run();
// async.reduce([1,2,3], 0, function(memo, item, callback) {
// // pointless async:
......@@ -217,3 +217,37 @@ job.run();
// console.log(result);
// }
// });
var client = redis.createClient('redis://bigmaster.igridproject.info:6379/1');
//
// client.keys('bs:jobs:*', function (err, keys) {
// if (err) return console.log(err);
//
// for(var i = 0, len = keys.length; i < len; i++) {
// console.log(keys[i]);
// client.get(keys[i], function(err, data){
// console.log(data);
// });
// }
// });
// var CronList = ctx.getLib('lib/mems/cronlist');
//
// var crons = CronList.create({'conn':'redis://:@bigmaster.igridproject.info:6379/1'});
//
// crons.add({'name':'job01','cmd':'*/10 * * * * *','jobid':'job01'})
// crons.commit();
//
// crons.update(function(err){
// console.log(crons.list);
// });
var schedule = require('node-schedule');
var cron = '*/10 * * * * *';
var x = 'Tada!';
var j = schedule.scheduleJob(cron, function(y){
console.log(y);
}.bind(null,x));
x = 'Changing Data';
console.log(x);
var ctx = require('../context');
var amqp_cfg = ctx.config.amqp;
var QueueCaller = ctx.getLib('lib/amqp/queuecaller');
var qc = new QueueCaller({'url':'amqp://bigmaster.igridproject.info','name':'bs_jobs_queue'});
var cmd = {
'object_type':'job_execute',
'source' : 'http_listener',
'jobId' : 'job02',
'option' : {},
'input_data' : {
'type' : 'bsdata',
'value' : {
'data_type' : 'object',
'data' : {'name':'gcs'}
}
}
}
qc.send(cmd);
setTimeout(function(){
qc.close(function(){
console.log('closed');
})
},500)
......@@ -3,6 +3,26 @@ var amqp_cfg = ctx.config.amqp;
var QueueCaller = ctx.getLib('lib/amqp/queuecaller');
var qc = new QueueCaller({'url':'amqp://bigmaster.igridproject.info'});
var qc = new QueueCaller({'url':'amqp://bigmaster.igridproject.info','name':'bs_jobs_queue'});
qc.send({'name':'kamron'});
var job_execute_cmd = {
'object_type':'job_execute',
'source' : 'http_listener',
'jobId' : 'job01',
'option' : {},
'input_data' : {
'type' : 'bsdata',
'value' : {
'data_type' : 'object',
'data' : {'name':'gcs'}
}
}
}
// setInterval(function(){
// console.log('send');
// qc.send(job_execute_cmd);
// },1000)
qc.send(job_execute_cmd);
var ctx = require('./context');
var JobWorker = ctx.getLib('jobworker/worker');
var worker = JobWorker.create({'config':ctx.config,'name':'worker'});
worker.start();
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment