Commit 1b860f3a authored by project's avatar project

--no commit message

--no commit message
parent 78a1b2ff
...@@ -71,7 +71,7 @@ SchedulerService.prototype._callJob = function(cron) ...@@ -71,7 +71,7 @@ SchedulerService.prototype._callJob = function(cron)
'object_type':'job_execute', 'object_type':'job_execute',
'source' : 'scheduler', 'source' : 'scheduler',
'jobId' : cron.jobid, 'jobId' : cron.jobid,
'option' : {}, 'option' : {'exe_level':'secondary'},
'input_data' : { 'input_data' : {
'type' : 'bsdata', 'type' : 'bsdata',
'value' : { 'value' : {
......
...@@ -5,8 +5,11 @@ var app = express(); ...@@ -5,8 +5,11 @@ var app = express();
var bodyParser = require('body-parser'); var bodyParser = require('body-parser');
var cfg = ctx.config; var cfg = ctx.config;
var ConnCtx = ctx.getLib('lib/conn/connection-context');
var HttpACL = ctx.getLib('lib/mems/http-acl'); var HttpACL = ctx.getLib('lib/mems/http-acl');
var EvenPub = ctx.getLib('lib/amqp/event-pub'); var EvenPub = ctx.getLib('lib/amqp/event-pub');
var QueueCaller = ctx.getLib('lib/amqp/queuecaller');
var EvenSub = ctx.getLib('lib/amqp/event-sub');
const JOBCHANEL = 'bs_job_cmd'; const JOBCHANEL = 'bs_job_cmd';
...@@ -20,13 +23,16 @@ function HTTPListener(cfg) ...@@ -20,13 +23,16 @@ function HTTPListener(cfg)
{ {
this.config = cfg; this.config = cfg;
this.httpacl = HttpACL.create({'conn':this.config.memstore.url}); this.httpacl = HttpACL.create({'conn':this.config.memstore.url});
this.evp = new EvenPub({'url':this.config.amqp.url,'name':JOBCHANEL}); this.jobcaller = new QueueCaller({'url':this.config.amqp.url,'name':'bs_jobs_cmd'});
this.evs = new EvenSub({'url':this.config.amqp.url,'name':'bs_trigger_cmd'});
//this.evp = new EvenPub({'url':this.config.amqp.url,'name':JOBCHANEL});
} }
HTTPListener.prototype.start = function() HTTPListener.prototype.start = function()
{ {
console.log('Starting HTTP Listener ...\n'); console.log('Starting HTTP Listener ...\n');
this.http_start(); this.http_start();
this._controller_start();
} }
HTTPListener.prototype.http_start = function() HTTPListener.prototype.http_start = function()
...@@ -51,7 +57,7 @@ HTTPListener.prototype.http_start = function() ...@@ -51,7 +57,7 @@ HTTPListener.prototype.http_start = function()
var context = require('./lib/http-context'); var context = require('./lib/http-context');
app.use(context.middleware({ app.use(context.middleware({
'httpacl' : self.httpacl, 'httpacl' : self.httpacl,
'evp' : self.evp 'jobcaller' : self.jobcaller
})); }));
app.use(require('./ws')); app.use(require('./ws'));
...@@ -62,5 +68,39 @@ HTTPListener.prototype.http_start = function() ...@@ -62,5 +68,39 @@ HTTPListener.prototype.http_start = function()
console.log('WWW:HTTP START\t\t[OK]'); console.log('WWW:HTTP START\t\t[OK]');
}); });
}
HTTPListener.prototype._controller_start = function ()
{
var self=this;
var topic = 'ctl.trigger.#';
self.evs.sub(topic,function(err,msg){
if(!msg){return;}
var ctl = msg.data;
if(ctl.trigger_type != 'http' && ctl.trigger_type != 'all')
{
return;
}
if(ctl.cmd == 'reload')
{
console.log('WWW:Reloading ACL\t[OK]');
self.reload();
}
});
}
HTTPListener.prototype.reload = function ()
{
var self = this;
self.httpacl.update(function(err){
if(!err){
console.log('WWW:ACL Update\t\t[OK]');
}else{
console.log('WWW:ACL Update\t\t[ERR]');
}
});
} }
...@@ -5,9 +5,11 @@ var express = require('express'); ...@@ -5,9 +5,11 @@ var express = require('express');
var router = express.Router(); var router = express.Router();
var cfg = ctx.config; var cfg = ctx.config;
var response = ctx.getLib('lib/ws/response'); var response = ctx.getLib('lib/ws/response');
var request = ctx.getLib('lib/ws/request'); var request = ctx.getLib('lib/ws/request');
var process_get = function(req, res) { var process_get = function(req, res) {
var reqHelper = request.create(req); var reqHelper = request.create(req);
var respHelper = response.create(res); var respHelper = response.create(res);
...@@ -15,7 +17,8 @@ var process_get = function(req, res) { ...@@ -15,7 +17,8 @@ var process_get = function(req, res) {
var ctx = req.context; var ctx = req.context;
var httpacl = req.context.httpacl; var httpacl = req.context.httpacl;
var evp = req.context.evp; //var evp = req.context.evp;
var jobcaller = req.context.jobcaller;
var j = httpacl.findJob(appkey,'get'); var j = httpacl.findJob(appkey,'get');
...@@ -45,7 +48,8 @@ var process_get = function(req, res) { ...@@ -45,7 +48,8 @@ var process_get = function(req, res) {
var msg = job_execute_msg; var msg = job_execute_msg;
msg.jobId = item.jobid; msg.jobId = item.jobid;
evp.send(topic,msg); jobcaller.send(msg);
//evp.send(topic,msg);
}); });
if(j.length > 0) if(j.length > 0)
......
var ctx = require('../context'); var ctx = require('../context');
var cfg = ctx.config; var cfg = ctx.config;
var QueueCaller = ctx.getLib('lib/amqp/queuecaller');
var QueueReceiver = ctx.getLib('lib/amqp/queuereceiver'); var QueueReceiver = ctx.getLib('lib/amqp/queuereceiver');
var ConnCtx = ctx.getLib('lib/conn/connection-context'); var ConnCtx = ctx.getLib('lib/conn/connection-context');
var JobRegistry = ctx.getLib('lib/mems/job-registry'); var JobRegistry = ctx.getLib('lib/mems/job-registry');
...@@ -25,32 +26,71 @@ var JW = function JobWorker (prm) ...@@ -25,32 +26,71 @@ var JW = function JobWorker (prm)
JW.prototype.start = function () JW.prototype.start = function ()
{ {
this.amqp_job_start(); this.amqp_pmr_start();
this.amqp_snd_start();
} }
JW.prototype.amqp_job_start = function () JW.prototype.amqp_pmr_start = function ()
{ {
var self=this; var self=this;
if(self.amqp_server){return;} if(self.amqp_server_pmr){return;}
self.amqp_server = new QueueReceiver({ if(!self.QCaller){
self.QCaller = new QueueCaller({'url':self.conn.getAmqpUrl(),'name':'bs_jobs_queue'});
}
self.amqp_server_pmr = new QueueReceiver({
url : self.conn.getAmqpUrl(), url : self.conn.getAmqpUrl(),
name : 'bs_jobs_queue' name : 'bs_jobs_cmd'
}); });
self.amqp_server.set_execute_function(function(data,callback){ self.amqp_server_pmr.set_execute_function(function(data,callback){
var jt = new JobTransaction({'handle':self,'cmd':data}); if(data.option && data.option.exe_level && data.option.exe_level=='secondary')
jt.run(function(err){ {
if(err){ console.log('WORKER:Forword job[' + data.jobId + '] to SJW');
console.log(err); self.QCaller.send(data);
} }else{
callback(); self._execute_job(data,function (err) {
});
});
}
callback();
});
self.amqp_server_pmr.start(function(err){
console.log('WORKER:Primary Start\t\t[OK]');
})
}
JW.prototype.amqp_snd_start = function ()
{
var self=this;
if(self.amqp_server_snd){return;}
self.amqp_server_snd = new QueueReceiver({
url : self.conn.getAmqpUrl(),
name : 'bs_jobs_queue'
});
self.amqp_server_snd.set_execute_function(function(data,callback){
self._execute_job(data,callback);
}); });
self.amqp_server.start(function(err){ self.amqp_server_snd.start(function(err){
console.log('worker start'); console.log('WORKER:Secondary Start\t\t[OK]');
}) })
} }
JW.prototype._execute_job = function (data,callback)
{
var self=this;
var jt = new JobTransaction({'handle':self,'cmd':data});
jt.run(function(err){
if(err){
console.log(err);
}
callback();
});
}
var Redis = require('ioredis'); var Redis = require('redis');
const PREFIX = 'bs:http:acl'; const PREFIX = 'bs:http:acl';
module.exports.create = function(cfg) module.exports.create = function(cfg)
...@@ -22,7 +22,7 @@ function HttpACL(cfg) ...@@ -22,7 +22,7 @@ function HttpACL(cfg)
this.config = cfg; this.config = cfg;
if(cfg.conn){ if(cfg.conn){
this.mem = new Redis(cfg.conn); this.mem = Redis.createClient(cfg.conn);
}else if(cfg.redis){ }else if(cfg.redis){
this.mem = cfg.redis; this.mem = cfg.redis;
}else{ }else{
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment