Commit 9a1d011b authored by Kamron Aroonrua's avatar Kamron Aroonrua 💬

new plugins

parent bd55f70d
{
"api_hostname" : "http://localhost:19080",
"repository" : "/tmp/data"
"repository" : "/var/bigstream/data"
}
......@@ -89,7 +89,6 @@ JobTask.prototype.run = function ()
}
var task_dt = function (request,callback) {
var dt_request = {'input_type':request.type,'meta':request.meta,'data':request.data}
var dm_t = domain.create();
dm_t.on('error', function(err) {
......@@ -99,14 +98,40 @@ JobTask.prototype.run = function ()
});
dm_t.run(function() {
perform_dt({'context':context,'request':dt_request,'handle':self},function(err,dt_resp){
if(dt_resp){console.log('[DT STATUS]\t\t: ' + dt_resp.status);}
var dt_cfg = context.jobconfig.data_transform;
if(!Array.isArray(dt_cfg)){
dt_cfg = [dt_cfg];
}
idx=0;
async.reduce(dt_cfg, request, function(req, cur_cfg, cb) {
var dt_name = (cur_cfg.tag)?cur_cfg.tag:String(idx);
var dt_request = {'input_type':req.type,'meta':req.meta,'data':req.data};
context.jobconfig.data_transform = cur_cfg;
perform_dt({'cfg':cur_cfg,'name':dt_name,'context':context,'request':dt_request,'handle':self},function(err,dt_resp){
if(dt_resp){console.log('[DT:' + dt_name + ' STATUS]\t\t: ' + dt_resp.status);}
idx++;
if(dt_resp.status == 'success'){
callback(null,dt_resp);
cb(null,dt_resp);
}else {
callback(dt_resp);
cb(dt_resp);
}
});
}, function(err, result) {
if(!err && result.status == 'success'){
callback(null,result);
}else {
callback(err);
}
});
//end dm_t run
});
}
......@@ -182,11 +207,14 @@ function perform_dt(prm,cb)
var dt_context = prm.context
var job_id = dt_context.jobconfig.job_id;
var dt_cfg = dt_context.jobconfig.data_transform;
var dt_cfg = prm.cfg;
var dt_name=prm.name;
//var dt_cfg = dt_context.jobconfig.data_transform;
var DTTask = getPlugins('dt',dt_cfg.type);
var dtMem = new memstore({'job_id':job_id,'cat':'dt','mem':prm.handle.mem})
var dtMem = new memstore({'job_id':job_id,'cat':'dt:'+ dt_name,'mem':prm.handle.mem})
dt_context.task = {
"config" : dt_cfg,
"memstore" : dtMem
}
......@@ -196,6 +224,7 @@ function perform_dt(prm,cb)
dt.on('done',function(resp){
cb(null,resp);
});
}
function perform_do(prm,cb)
......
......@@ -26,6 +26,7 @@
"node-persist": "^2.0.7",
"node-schedule": "^1.2.0",
"node-uuid": "^1.4.7",
"object-hash": "^1.1.8",
"pm2": "^2.4.0",
"query-string": "^4.2.3",
"quickq": "^0.8.1",
......
......@@ -15,9 +15,17 @@ function execute_function(context,response){
output_type = 'binary'
}
request({'url':url, 'encoding':null}, function (error, resp, body) {
request({'url':url, 'encoding':encode}, function (error, resp, body) {
if (!error && resp.statusCode == 200) {
response.success(body);
if(param.encoding=='json'){
try{
var j = JSON.parse(body);
response.success(j);
}catch(err){
response.error(new Error('JSON Parsing Error'));
}
}
}else{
response.error(error);
}
......
var util = require('util');
var DOPlugin = require('../do-plugin');
function DOTask(context,request){
DOPlugin.call(this,context,request);
this.name = "dweet";
}
util.inherits(DOTask,DOPlugin);
DOTask.prototype.perform = require('./perform');
module.exports = DOTask;
var request = require("request");
function perform_function(context,request,response){
var job_id = context.jobconfig.job_id;
var transaction_id = context.transaction.id;
var param = context.jobconfig.data_out.param;
var memstore = context.task.memstore
var output_type = request.input_type;
var data = request.data;
var meta = request.meta;
var thing = param.thing;
post_to_dweet({'thing':thing,'data':data},function(err){
if(!err){
response.success();
}else{
response.error(err);
}
})
//response.success();
//response.reject();
//response.error("error message")
}
function post_to_dweet(prm,cb)
{
var thing = prm.thing;
var data = prm.data;
var key = prm.key;
var options = {
'method': 'POST',
'url': 'https://dweet.io/dweet/for/' + thing,
'headers':
{ 'cache-control': 'no-cache',
'content-type': 'application/json' },
'body': data,
'json':true
};
request(options, function (err, resp, body) {
if (!err && resp.statusCode==200) {
var r = body;
if(r.this=='succeeded'){
cb();
}else{
cb(new Error("dweet send error"));
}
}else{
cb(new Error("dweet error"));
}
});
}
module.exports = perform_function;
var util = require('util');
var DOPlugin = require('../do-plugin');
function DOTask(context,request){
DOPlugin.call(this,context,request);
this.name = "line-notify";
}
util.inherits(DOTask,DOPlugin);
DOTask.prototype.perform = require('./perform');
module.exports = DOTask;
var vm = require('vm');
var request = require("request");
function perform_function(context,request,response){
var job_id = context.jobconfig.job_id;
var transaction_id = context.transaction.id;
var param = context.jobconfig.data_out.param;
var memstore = context.task.memstore
var output_type = request.input_type;
var data = request.data;
var meta = request.meta;
var token = param.token;
var message = data;
if(param.message){
var env = {
'type' : output_type,
'data' : data,
'meta' : meta,
'msg' : data
}
var script = new vm.Script("msg=`" + param.message + "`");
var context = new vm.createContext(env);
script.runInContext(context);
message = env.msg;
}
post_to_line(token,message,function(err){
if(!err){
response.success();
}else{
response.error(err);
}
})
//response.success();
//response.reject();
//response.error("error message")
}
function post_to_line(token,msg,cb)
{
var options = { method: 'POST',
url: 'https://notify-api.line.me/api/notify',
headers:
{ 'cache-control': 'no-cache',
'authorization' : 'Bearer ' + token,
'content-type': 'multipart/form-data' },
formData: { message: String(msg) } };
request(options, function (err, resp, body) {
if (!err && resp.statusCode==200) {
var r = JSON.parse(body);
if(r.status==200){
cb();
}else{
cb(new Error("line send error"));
}
}else{
cb(new Error("line error"));
}
});
}
module.exports = perform_function;
var util = require('util');
var DTPlugin = require('../dt-plugin');
function DTTask(context,request){
DTPlugin.call(this,context,request);
this.name = "filter-duplicate";
this.output_type = "";
}
util.inherits(DTTask,DTPlugin);
DTTask.prototype.perform = require('./perform');
module.exports = DTTask;
var vm = require('vm');
var hash = require('object-hash');
function perform_function(context,request,response){
var job_id = context.jobconfig.job_id;
var transaction_id = context.transaction.id;
var param = context.task.config.param;
var memstore = context.task.memstore
var output_type = request.input_type;
var data = request.data;
var meta = request.meta;
var datakey = data;
if(param.key){
var env = {
'type' : output_type,
'data' : data,
'meta' : meta,
'key' : data
}
var script = new vm.Script("key=`" + param.key + "`");
var context = new vm.createContext(env);
script.runInContext(context);
datakey = env.key;
data = env.data;
meta = env.meta;
output_type = env.type;
}
var hash_key = hash(datakey);
memstore.getItem('dupkey',function(err,value){
if(value && value==hash_key)
{
response.reject();
}else{
memstore.setItem('dupkey',hash_key,function(err){})
response.success(data,{'meta':meta,'output_type':output_type});
}
});
// memstore.setItem('lasttransaction',transaction_id,function(err){
// response.success(data);
// });
// memstore.getItem('lasttransaction',function(err,value){
// response.success(value);
// });
//response.reject();
//response.error("error message")
}
module.exports = perform_function;
......@@ -3,7 +3,7 @@ var vm = require('vm');
function perform_function(context,request,response){
var job_id = context.jobconfig.job_id;
var transaction_id = context.transaction.id;
var param = context.jobconfig.data_transform.param;
var param = context.task.config.param;
var memstore = context.task.memstore
var in_type = request.input_type;
......
function perform_function(context,request,response){
var job_id = context.jobconfig.job_id;
var transaction_id = context.transaction.id;
var param = context.jobconfig.data_transform.param;
var param = context.task.config.param;
var memstore = context.task.memstore
var output_type = request.input_type;
......@@ -16,7 +16,6 @@ function perform_function(context,request,response){
// memstore.getItem('lasttransaction',function(err,value){
// response.success(value);
// });
//data = data.a.b + "--DT--";
response.success(data,{'meta':meta,'output_type':output_type});
//response.reject();
......
......@@ -202,12 +202,8 @@ var job_config = {
// async.reduce([1,2,3], 0, function(memo, item, callback) {
// // pointless async:
// process.nextTick(function() {
// console.log(item);
// if(item==8){
// callback('err')
// }else{
// callback(null,item)
// }
// console.log(String(memo) + ' ' + String(item));
// callback(null, memo + item)
// });
// }, function(err, result) {
//
......@@ -241,4 +237,8 @@ var job_config = {
// console.log(crons.list);
// });
var dat = 'hello';
var hash = require('object-hash');
var dat = {'a':'hello','b':10};
var dat2 = new Buffer(10);
console.log(dat2);
console.log(hash(dat2));
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment