Commit 4c843232 authored by Kamron Aroonrua's avatar Kamron Aroonrua 💬

new plugin

parent 21cce236
......@@ -22,9 +22,14 @@ memstore.prototype.getItem = function(k,cb)
var key = this.prefix + ":" + k;
this.mem.get(key,function(err,v){
var value = null;
if(!err){
if(!err && v){
if(typeof v == 'object' && v.type == 'Buffer')
{
value = new Buffer(v.data);
}else{
value = JSON.parse(v);
}
}
cb(err,value);
});
}
......
module.exports.parse_script_param = function(param)
{
if(!param)
{
return "";
}else{
var scrp = (Array.isArray(param))?param.join(';'):param;
return scrp;
}
}
......@@ -11,6 +11,8 @@ function perform_function(context,request,response){
var data = request.data;
var meta = request.meta;
var prm_name = (param.name)?'dupkey-'+param.name:'dupkey';
var datakey = data;
if(param.key){
......@@ -32,12 +34,12 @@ function perform_function(context,request,response){
}
var hash_key = hash(datakey);
memstore.getItem('dupkey',function(err,value){
memstore.getItem(prm_name,function(err,value){
if(value && value==hash_key)
{
response.reject();
}else{
memstore.setItem('dupkey',hash_key,function(err){})
memstore.setItem(prm_name,hash_key,function(err){})
response.success(data,{'meta':meta,'output_type':output_type});
}
});
......
var util = require('util');
var DTPlugin = require('../dt-plugin');
function DTTask(context,request){
DTPlugin.call(this,context,request);
this.name = "map";
this.output_type = "";
}
util.inherits(DTTask,DTPlugin);
DTTask.prototype.perform = require('./perform');
module.exports = DTTask;
var vm = require('vm');
var ctx = require('../context');
var Utils = ctx.getLib('lib/util/plugin-utils');
function perform_function(context,request,response){
var job_id = context.jobconfig.job_id;
var transaction_id = context.transaction.id;
var param = context.task.config.param;
var memstore = context.task.memstore
var in_type = request.input_type;
var in_data = request.data;
var in_meta = request.meta;
var mapscr = Utils.parse_script_param(param.script);
var map_data = (Array.isArray(in_data))?in_data:[in_data];
var out_data = [];
var script = new vm.Script(mapscr);
map_data.forEach(function(dat){
var env = {
'src' : {
'data' : dat,
'meta' : in_meta,
'type' : in_type
},
'data' : dat
}
var context = new vm.createContext(env);
script.runInContext(context);
out_data.push(env.data)
});
response.success(out_data,{'meta':in_meta,'output_type':in_type});
}
module.exports = perform_function;
var util = require('util');
var DTPlugin = require('../dt-plugin');
function DTTask(context,request){
DTPlugin.call(this,context,request);
this.name = "reduce";
this.output_type = "";
}
util.inherits(DTTask,DTPlugin);
DTTask.prototype.perform = require('./perform');
module.exports = DTTask;
var vm = require('vm');
var ctx = require('../context');
var Utils = ctx.getLib('lib/util/plugin-utils');
function perform_function(context,request,response){
var job_id = context.jobconfig.job_id;
var transaction_id = context.transaction.id;
var param = context.task.config.param;
var memstore = context.task.memstore
var in_type = request.input_type;
var in_data = request.data;
var in_meta = request.meta;
var init_scr = Utils.parse_script_param(param.init);
var red_scr = Utils.parse_script_param(param.script);
var red_data = (Array.isArray(in_data))?in_data:[in_data];
var out_data = {};
var initenv = {
'src' : {
'type' : in_type,
'data' : in_data,
'meta' : in_meta
},
'data' : {}
}
var init_script = new vm.Script(init_scr);
init_script.runInContext(new vm.createContext(initenv));
var script = new vm.Script(red_scr);
out_data = red_data.reduce(function(memo,item,index){
var env = {
'src' : {
'data' : item,
'meta' : in_meta,
'type' : in_type
},
'index':index,
'item' : item,
'data' : memo
}
var context = new vm.createContext(env);
script.runInContext(context);
return env.data;
},initenv.data);
response.success(out_data,{'meta':in_meta,'output_type':in_type});
}
module.exports = perform_function;
var util = require('util');
var DTPlugin = require('../dt-plugin');
function DTTask(context,request){
DTPlugin.call(this,context,request);
this.name = "transform";
this.output_type = "";
}
util.inherits(DTTask,DTPlugin);
DTTask.prototype.perform = require('./perform');
module.exports = DTTask;
var vm = require('vm');
var ctx = require('../context');
var Utils = ctx.getLib('lib/util/plugin-utils');
function perform_function(context,request,response){
var job_id = context.jobconfig.job_id;
var transaction_id = context.transaction.id;
var param = context.task.config.param;
var memstore = context.task.memstore
var in_type = request.input_type;
var in_data = request.data;
var in_meta = request.meta;
var mapscr = Utils.parse_script_param(param.script);
var datascr = param.text ;
if(param.text){
mapscr = mapscr + "; data=`" + param.text + "`";
}
var mapenv = {
'src' : {
'type' : in_type,
'data' : in_data,
'meta' : in_meta
},
'type' : in_type,
'data' : in_data,
'meta' : in_meta
}
var script = new vm.Script(mapscr);
var context = new vm.createContext(mapenv);
script.runInContext(context);
var data = mapenv.data;
var meta = mapenv.meta;
var output_type = mapenv.type;
response.success(data,{'meta':meta,'output_type':output_type});
}
module.exports = perform_function;
var util = require('util');
var DTPlugin = require('../dt-plugin');
function DTTask(context,request){
DTPlugin.call(this,context,request);
this.name = "window-event";
this.output_type = "";
}
util.inherits(DTTask,DTPlugin);
DTTask.prototype.perform = require('./perform');
module.exports = DTTask;
//di-window-event
function perform_function(context,request,response){
var job_id = context.jobconfig.job_id;
var transaction_id = context.transaction.id;
var param = context.task.config.param;
var memstore = context.task.memstore
var in_type = request.input_type;
var in_data = request.data;
var in_meta = request.meta;
//parameter
var prm_size = (param.size && Number(param.size)>0)?Number(param.size):1;
var prm_reject = (param.reject==false)?false:true;
var prm_name = (param.name)?'windw-'+param.name:'windw';
var obj = {
'type' : in_type,
'meta' : in_meta,
'data' : in_data
}
var ret = [];
memstore.getItem(prm_name,function(err,val){
if(val && Array.isArray(val))
{
val.forEach(function(v){
if(typeof v.data == 'object' && v.data.type == 'Buffer' && Array.isArray(v.data.data))
{
v.data = new Buffer(v.data.data);
}
ret.push(v);
});
}
ret.push(obj);
if(prm_reject && ret.length < prm_size)
{
memstore.setItem(prm_name,ret);
response.reject();
}else{
ret = ret.slice(prm_size * -1);
var meta = {'_size':ret.length};
memstore.setItem(prm_name,ret);
response.success(ret,{'meta':meta,'in_meta':in_type});
}
});
// memstore.setItem('lasttransaction',transaction_id,function(err){
// response.success(data);
// });
// memstore.getItem('lasttransaction',function(err,value){
// response.success(value);
// });
//response.reject();
//response.error("error message")
}
module.exports = perform_function;
<h2>Test Download</h2>
<a href="http://bigmaster.igridproject.info:19080/v0.1/storage2/igrid.env/objects?sizelimit=200&output=stream">bigmaster</a>
<a href="http://lab1.igridproject.info:19080/v0.1/storage/igrid.env/objects?sizelimit=200&output=stream">lab1</a>
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment