Commit 6d57dd7e authored by project's avatar project

--no commit message

--no commit message
parent 492e6d41
......@@ -8,7 +8,7 @@ var cfg = ctx.config;
var HttpACL = ctx.getLib('lib/mems/http-acl');
var EvenPub = ctx.getLib('lib/amqp/event-pub');
const JOBCHANEL = 'job_trigger';
const JOBCHANEL = 'bs_job_cmd';
module.exports.create = function(cfg)
{
......
var ctx = require('../../context');
var async = require('async');
var express = require('express');
var router = express.Router();
......@@ -7,17 +8,55 @@ var cfg = ctx.config;
var response = ctx.getLib('lib/ws/response');
var request = ctx.getLib('lib/ws/request');
var process_get = function(req, res) {
var reqHelper = request.create(req);
var respHelper = response.create(res);
var appkey = req.params.akey;
var ctx = req.context;
router.get('/:akey',function (req, res) {
var reqHelper = request.create(req);
var respHelper = response.create(res);
var appkey = req.params.akey;
var ctx = req.context;
var httpacl = req.context.httpacl;
var evp = req.context.evp;
var j = httpacl.findJob(appkey,'get');
var topic_prex = 'cmd.execute.';
var httpdata = {
'object_type' : 'httpdata',
'method' : 'get',
'data' : reqHelper.getQuery()
}
var job_execute_msg = {
'object_type':'job_execute',
'source' : 'http_listener',
'jobId' : '',
'option' : {},
'input_data' : {
'type' : 'bsdata',
'value' : {
'data_type' : 'object',
'data' : httpdata
}
}
}
j.forEach(function(item){
var topic = topic_prex + item.jobid;
var msg = job_execute_msg;
msg.jobId = item.jobid;
evp.send(topic,JSON.stringify(msg));
});
if(j.length > 0)
{
respHelper.responseOK({'status':'OK'});
}else{
respHelper.response403();
}
}
router.get('/:akey',process_get);
var httpacl = req.context.httpacl;
var evp = req.context.evp;
var j = httpacl.findJob(appkey,'get');
respHelper.responseOK({'status':'OK','appkey':appkey,'res':j});
});
module.exports = router;
var amqp = require('amqplib/callback_api');
var thunky = require('thunky');
function EventSub(config)
{
this.config = config;
this.url = config.url;
this.name = config.name || "event_group";
this.conn = null;
this.ch = null;
var self = this;
this.opened = false;
this.open = thunky(open);
this.open();
function open (cb) {
//console.log('connecting');
amqp.connect(self.url, function(err, conn) {
if (err){return cb(err)}
conn.createChannel(function(err, ch) {
if (err){return cb(err)}
ch.assertExchange(self.name, 'topic', {durable: false});
self.opened = true;
self.conn = conn;
self.ch = ch;
cb();
});
});
}
}
EventSub.prototype.sub = function(topic,cb)
{
var self=this;
this.open(function(err){
if(err){return cb(err);}
self.ch.assertQueue('', {exclusive: true}, function(err, q) {
//console.log(' [*] Waiting for logs. To exit press CTRL+C');
if(err){return cb(err);}
self.ch.bindQueue(q.queue, self.name, topic);
self.ch.consume(q.queue, function(msg) {
var data = JSON.parse(msg.content.toString())
var topp = msg.fields.routingKey
cb(null,{'topic':topp,'data':data});
}, {noAck: true});
});
});
}
EventSub.prototype.close = function(cb)
{
var self=this;
self.conn.close(cb);
}
module.exports = EventSub;
......@@ -35,8 +35,8 @@ var httpdata = {
'data' : {}
}
var job_trigger = {
'object_type':'job_trigger',
var job_execute = {
'object_type':'job_execute',
'source' : 'http_listener',
'jobId' : 'jobid',
'option' : {},
......
......@@ -55,6 +55,14 @@ responseHelper.prototype.response400 = function(msg){
}
}
responseHelper.prototype.response403 = function(msg){
if(msg){
this.response.status(403).json({response:'ERROR',message:msg});
}else{
this.response.status(403).send('forbidden');
}
}
responseHelper.prototype.response404 = function(msg){
if(msg){
this.response.status(404).json({response:'ERROR',message:msg});
......
var util = require('util');
var DIPlugin = require('../di-plugin');
function DITask(context){
DIPlugin.call(this,context);
this.name = "input";
this.output_type = "object";
}
util.inherits(DITask,DIPlugin);
DITask.prototype.perform = require('./perform');
module.exports = DITask;
function perform_function(context,response){
var job_id = context.jobconfig.job_id;
var transaction_id = context.transaction.id;
var param = context.jobconfig.data_in.param || {};
var memstore = context.task.memstore;
var input_data = context.input_data;
var output_type = 'object'
var data = input_data;
if(param.object == 'httpdata')
{
data = extract_httpdata(input_data);
}
// memstore.setItem('lasttransaction',transaction_id,function(err){
// response.success(data);
// });
// memstore.getItem('lasttransaction',function(err,value){
// response.success(value);
// });
response.success(data,output_type);
//response.reject();
//response.error("error message")
}
function extract_httpdata(dat)
{
if(dat.object_type == 'httpdata'){
return dat.data;
}else{
return dat;
}
}
module.exports = perform_function;
{
"job_id" : "job1",
"active" : true,
"trigger" : {
"type": "http",
"appkey": "app1",
"method": "get"
},
"data_in" : {
"type": "input",
"param":{
"object":"httpdata"
}
},
"data_transform" : {
"type": "noop"
},
"data_out" : {
"type": "console"
}
}
......@@ -2,6 +2,12 @@ var async = require('async');
var domain = require('domain');
var schedule = require('node-schedule');
var storage = require('node-persist');
var ctx = require('../context');
var bscfg = ctx.config;
var bsdata = ctx.getLib('lib/model/bsdata');
var EvenSub = ctx.getLib('lib/amqp/event-sub');
storage.initSync({
dir:'db'
});
......@@ -40,23 +46,57 @@ if(args.length > 0){
var jobcfg = require(CFG_FILE);
var no_input_data = {};
track('[SETTING UP JOB] : ' + CFG_FILE,TRACKING>0);
if(jobcfg.trigger && jobcfg.trigger.type == 'cron' && ONTRIGGER)
if(jobcfg.trigger && ONTRIGGER)
{
var triggercfg = jobcfg.trigger;
var triggertpy = jobcfg.trigger.type
if(triggertpy == 'cron'){
trigger_cron(jobcfg,no_input_data);
}else if(triggertpy == 'http'){
trigger_http(jobcfg);
}else{
run_job(jobcfg,no_input_data);
}
}else{
run_job(jobcfg,no_input_data);
}
function trigger_cron(jobcfg,input)
{
var cron = jobcfg.trigger.cmd;
track('[SCHEDULE MODE]',TRACKING>0);
track('[CRON]\t\t: ' + cron,TRACKING>0);
var j = schedule.scheduleJob(cron, function(){
run_job(jobcfg);
run_job(jobcfg,input);
});
}else{
run_job(jobcfg);
}
function trigger_http(jobcfg)
{
var jobId = jobcfg.job_id;
var appkey = jobcfg.trigger.appkey;
track('[HTTP MODE]',TRACKING>0);
track('[APPKEY]\t\t: ' + appkey,TRACKING>0);
var evs = new EvenSub({'url':bscfg.amqp.url,'name':'bs_job_cmd'});
evs.sub('cmd.execute.' + jobId ,function(err,msg){
var input_data = msg.data.input_data;
if(input_data.type == 'bsdata')
{
var inp = bsdata.parse(input_data.value);
run_job(jobcfg,inp.data);
}else{
run_job(jobcfg,no_input_data);
}
});
}
function run_job(cfg)
function run_job(cfg,input)
{
var jobconfig = cfg;
var tranId = "TR" + (new Date).getTime();
......@@ -74,6 +114,7 @@ function run_job(cfg)
var context = {
"jobconfig" : jobconfig,
"transaction" : transaction,
"input_data" : input,
"job" : ctxJob
}
......
......@@ -78,20 +78,20 @@ const crypto = require("crypto");
// atext = JSON.stringify(a);
// redis.set('a',atext)
var HttpACL = ctx.getLib('lib/mems/http-acl');
var httpacl = HttpACL.create({'conn':'redis://:@bigmaster.igridproject.info:6379/1'});
httpacl.add({'appkey':'app1','method':'get','jobid':'job1'})
httpacl.add({'appkey':'app2','method':'get','jobid':'job2'})
httpacl.add({'appkey':'app1','method':'get','jobid':'job3'})
httpacl.commit();
httpacl.update(function(err){
//console.log(httpacl.acl);
var j = httpacl.findJob('app1','get');
console.log(j);
});
// var HttpACL = ctx.getLib('lib/mems/http-acl');
//
// var httpacl = HttpACL.create({'conn':'redis://:@bigmaster.igridproject.info:6379/1'});
//
// httpacl.add({'appkey':'app1','method':'get','jobid':'job1'})
// httpacl.add({'appkey':'app2','method':'get','jobid':'job2'})
// httpacl.add({'appkey':'app1','method':'get','jobid':'job3'})
// httpacl.commit();
//
// httpacl.update(function(err){
// //console.log(httpacl.acl);
// var j = httpacl.findJob('app1','get');
// console.log(j);
// });
// var EvenPub = ctx.getLib('lib/amqp/event-pub');
//
......@@ -101,3 +101,11 @@ httpacl.update(function(err){
// evp.send('q.test.t1','kamron aroonrua aaa');
//
// setTimeout(function() { evp.close(function(err){console.log('close');}); }, 1500);
var EvenSub = ctx.getLib('lib/amqp/event-sub');
var evs = new EvenSub({'url':'amqp://bigmaster.igridproject.info','name':'bs_job_cmd'});
evs.sub('#',function(err,msg){
console.log(msg);
});
......@@ -8,18 +8,21 @@ var amqp = require('amqplib/callback_api');
amqp.connect('amqp://bigmaster.igridproject.info', function(err, conn) {
conn.createChannel(function(err, ch) {
var ex = 'topic_logs';
var ex = 'bs_job_cmd';
ch.assertExchange(ex, 'topic', {durable: false});
ch.assertQueue('', {exclusive: true}, function(err, q) {
console.log(' [*] Waiting for logs. To exit press CTRL+C');
ch.bindQueue(q.queue, ex, 'q.test.t1');
ch.bindQueue(q.queue, ex, 'cmd.#');
ch.consume(q.queue, function(msg) {
console.log(" [x] %s:'%s'", msg.fields.routingKey, msg.content.toString());
console.log(JSON.stringify(msg));
console.log(msg.fields.routingKey + '\n');
console.log(msg.content.toString() + '\n');
console.log('----------------------------------');
}, {noAck: true});
});
});
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment