Commit 23641578 authored by Kamron Aroonrua's avatar Kamron Aroonrua 💬

Merge branch 'covid' into 'dev'

Covid

See merge request !23
parents 4db7e525 fd830903
......@@ -4,7 +4,7 @@
- BS :: AMQP RPC Singleton Connection ,auto ack
- STORAGE :: 50x speedup
### Added
- BS :: env configuration context
- PLUGIN :: dt-transform fn extension
- PLUGIN :: dt-transform register
- BS :: Configuration Context with ENV
- PLUGIN :: do-http
......
{
"name": "node-bigstream",
"description": "",
"version": "0.0.1",
"version": "1.2.2",
"main": "./bigstream.js",
"author": {
"name": "Kamron Aroonrua",
......
var util = require('util');
var DTPlugin = require('../dt-plugin');
function DTTask(context,request){
DTPlugin.call(this,context,request);
this.name = "http";
this.output_type = "";
}
util.inherits(DTTask,DTPlugin);
DTTask.prototype.perform = require('./perform');
module.exports = DTTask;
\ No newline at end of file
var ctx = require('../../../context');
var Utils = ctx.getLib('lib/util/plugin-utils');
var request = require("request").defaults({ encoding: null });
function perform_function(context,request,response){
var job_id = context.jobconfig.job_id;
var transaction_id = context.transaction.id;
var param = context.task.config.param || {};
var memstore = context.task.memstore
var output_type = request.input_type;
var data = request.data;
var meta = request.meta || {};
var req_url = param.url || "";
var req_method = param.method || "GET";
var req_headers = param.headers || {};
var req_body_type = param.body_type || "json";
var resp_encode = param.encoding || "text";
var env = {
'type' : output_type,
'data' : data,
'meta' : meta
}
req_url = Utils.vm_execute_text(env,req_url);
//parsing param from meta
if(typeof meta._param == 'object')
{
var _prm = meta._param;
req_url = (_prm.url)?_prm.url:req_url;
req_method = (_prm.method)?_prm.method:req_method;
req_headers = (_prm.headers)?_prm.headers:req_headers;
req_body_type = (_prm.body_type)?_prm.body_type:req_body_type;
resp_encode = (_prm.encoding)?_prm.encoding:resp_encode;
}
send_request({'url':req_url,
'method':req_method,
'headers':req_headers,
'body_type':req_body_type,
'body':data,
'resp_encode':resp_encode},function(err,resp,body){
var respmeta = meta;
Object.keys(respmeta).forEach((k)=>{
if(k.startsWith('_')){delete respmeta[k];}
});
respmeta['_status_code'] = (err)?0:resp.statusCode;
respmeta['_error'] = (err)?true:false;
response.meta = respmeta;
if(!err){
if(resp_encode=='json'){
try{
var j = JSON.parse(body);
response.success(j,output_type);
}catch(err){
response.success({},output_type);
}
}else{
response.success(body,output_type);
}
}else{
response.success(null,output_type);
}
});
//response.success();
//response.reject();
//response.error("error message")
}
function send_request(prm,cb)
{
var options = { method: 'GET',
url: prm.url,
headers:
{ 'cache-control': 'no-cache' }
};
if(prm.method.toLowerCase()=='post' || prm.method.toLowerCase()=='put')
{
options.method = prm.method.toUpperCase();
if(prm.body_type=='json' && typeof prm.body == 'object'){
options.headers['content-type'] = 'application/json';
options.json = prm.body;
}else if(prm.body_type=='text' || typeof prm.body == 'string'){
options.headers['content-type'] = 'text/plain';
options.body = prm.body;
}else{
options.body = prm.body;
}
}
if(typeof prm.headers == 'object')
{
options.headers = Object.assign(options.headers,prm.headers)
}
options.encoding = (prm.resp_encode == 'binary')?null:'utf8';
request(options, function (err, resp, body) {
if (!err) {
cb(err, resp, body);
}else{
cb(new Error("request error"));
}
});
}
module.exports = perform_function;
var crypto = require("crypto");
var publicEncrypt = function(data, publicKey) {
var buffer = Buffer.from(data);
var encrypted = crypto.publicEncrypt(publicKey, buffer);
return encrypted.toString("base64");
};
var privateDecrypt = function(crypted_data, privateKey) {
var buffer = Buffer.from(crypted_data, "base64");
var decrypted = crypto.privateDecrypt(privateKey, buffer);
return decrypted.toString("utf8");
};
module.exports = {
publicEncrypt: publicEncrypt,
privateDecrypt: privateDecrypt
}
\ No newline at end of file
var DomParser = require('dom-parser');
var parser = new DomParser();
module.exports = function (html) {
return parser.parseFromString(html);
}
\ No newline at end of file
var crypto = require('crypto');
module.exports.sha256 = function (text) {
var dat = (typeof text == 'string')?text:String(text);
return crypto.createHash('sha256').update(dat).digest('hex');
}
\ No newline at end of file
{
"name": "dt-transform",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"author": "",
"license": "ISC",
"dependencies": {
"dom-parser": "^0.1.6"
}
}
......@@ -11,7 +11,7 @@ function perform_function(context,request,response){
var in_type = request.input_type;
var in_data = request.data;
var in_meta = request.meta;
var in_meta = request.meta || {};
var mapenv = {
'src' : {
......@@ -20,11 +20,21 @@ function perform_function(context,request,response){
'meta' : in_meta
},
'_env':{},
'_fn':{},
'type' : in_type,
'data' : in_data,
'meta' : in_meta
}
if(param.use_function){
var fns = (Array.isArray(param.use_function))?param.use_function:Array.of(param.use_function);
fns.forEach((fname)=>{
if(typeof fname == 'string' && fname.length>0){
mapenv._fn[fname] = _loadfunc(fname);
}
});
}
if(param.use_register){
memstore.getItem('register',function(err,value){
if(err){return response.error("memstore error");}
......@@ -84,4 +94,15 @@ function _compile(mape,param)
return mapenv;
}
function _loadfunc(name)
{
var f = null;
try {
f = require('./fn/' + name);
} catch (error) {
}
return f;
}
module.exports = perform_function;
......@@ -4,6 +4,7 @@ var ctx = require('../../context');
var BinStream = ctx.getLib('lib/bss/binarystream_v1_1');
var ObjId = ctx.getLib('lib/bss/objid');
var bsdata = ctx.getLib('lib/model/bsdata');
var thunky = require('thunky');
var importer = require('./importer');
var dataevent = require('./dataevent');
......@@ -21,40 +22,29 @@ module.exports.create = function(prm)
function BSSEngine(prm)
{
var self = this;
if(typeof prm == 'string'){
prm = {'file':prm,'context':null};
}
// this.repos_dir = prm.repos_dir;
// this.name = prm.name;
this.file = prm.file;
this.name = prm.name;
this.context = (prm.context)?prm.context:null;
this.concurrent = 0;
this.serial=prm.serial||'';
this.outdate=false;
}
BSSEngine.prototype.filepath = function()
{
//return this.repos_dir + '/' + name2path(this.name) + '.bss';
return this.file;
}
this.bss=null;
this.open = thunky(openbss);
this.open();
BSSEngine.prototype.exists = function()
{
var fp = this.filepath();
return fs.existsSync(fp);
}
BSSEngine.prototype.open = function(cb)
{
var self = this;
function openbss (cb) {
if(self.exists())
if(fs.existsSync(self.file))
{
open()
}else{
BinStream.format(self.filepath(),function(err){
BinStream.format(self.file,function(err){
if(!err){
open()
}else{
......@@ -64,30 +54,45 @@ BSSEngine.prototype.open = function(cb)
}
function open(){
BinStream.open(self.filepath(),function(err,bss){
BinStream.open(self.file,function(err,bss){
if(!err){
self.bss = bss;
}
cb(err);
cb(err,bss);
});
}
}
}
BSSEngine.prototype.filepath = function()
{
return this.file;
}
BSSEngine.prototype.exists = function()
{
var fp = this.filepath();
return fs.existsSync(fp);
}
BSSEngine.prototype.close = function(cb)
{
this.bss.close(cb);
var self = this;
self.open((err,bss)=>{
bss.close(cb);
});
}
BSSEngine.prototype.cmd = function(cmd,cb)
{
var self = this
var command = cmd.command;
var param = cmd.param;
var self=this;
switch (command) {
case 'write':
self.cmd_write(param,cb);
......@@ -105,7 +110,8 @@ BSSEngine.prototype.cmd_write = function(prm,cb)
if(!data){return cb("null data")}
this.bss.write(data,{'meta':meta},function(err,obj){
self.open((err,bss)=>{
bss.write(data,{'meta':meta},function(err,obj){
if(!err){
var head = obj.getHeader();
var obj_id = new ObjId(head.ID);
......@@ -124,6 +130,9 @@ BSSEngine.prototype.cmd_write = function(prm,cb)
cb("write error");
}
});
});
}
function newdata_event(ctx,prm)
......
......@@ -32,18 +32,14 @@ BSSPool.prototype.get = function(name,opt,cb)
'name' : bssname,
'newInstance':opt.newInstance
});
bss_engine.open(function(err){
if(!err){
self.pool.push({
'name' : name,
'engine':bss_engine
});
}
self.clean(function(err){
cb(err,bss_engine);
});
});
}
}
......
......@@ -36,7 +36,24 @@ router.get('/:id/data',function (req, res) {
var opt = {
'field' : 'data'
}
opt.filetype = (query.filetype)?query.filetype:null
opt.filetype = (query.file_type || query.filetype)?query.file_type || query.filetype:null;
get_object(reqHelper,respHelper,{'oid':oid,'opt':opt});
});
router.get('/:id/file',function (req, res) {
var reqHelper = request.create(req);
var respHelper = response.create(res);
var query = reqHelper.getQuery();
var oid = req.params.id;
var opt = {
'field' : 'file'
}
opt.filetype = (query.file_type || query.filetype)?query.file_type || query.filetype:null;
opt.filename = (query.file_name || query.filename)?query.file_name || query.filename:null;
opt.download = (query.download)?true:null;
get_object(reqHelper,respHelper,{'oid':oid,'opt':opt});
});
......@@ -177,6 +194,9 @@ function output(resp,obj,opt)
if(opt.field=='data')
{
data_out(resp,obj,opt);
}else if(opt.field=='file')
{
file_out(resp,obj,opt);
}else{
obj_out(resp,obj,opt);
}
......@@ -218,5 +238,39 @@ function data_out(resp,obj,opt)
}
function file_out(resp,obj,opt)
{
var objType = obj.header.TY;
var objId = (new ObjId(obj.header.ID)).toString();
var meta = obj.meta || {};
var defName=null;
var defType=null;
if(objType == BinStream.BINARY_TYPE){
defType = "application/octet-stream";
defName = (opt.filetype)?objId + "." + opt.filetype:objId + ".out";
}else if(objType == BinStream.STRING_TYPE){
defType = "text";
defName = (opt.filetype)?objId + "." + opt.filetype:objId + ".out";
}else{
defType = "json";
defName = (opt.filetype)?objId + "." + opt.filetype:objId + ".json";
}
var file_name = opt.filename || meta.file_name || defName;
var file_type = opt.filetype || meta.file_type || defType;
resp.response.type(file_type);
if(opt.download){
resp.response.set('Content-Disposition', 'attachment; filename="' + file_name + '"');
}else{
resp.response.set('Content-Disposition', 'filename="' + file_name + '"');
}
resp.response.send(obj.data);
}
module.exports = router;
var thunky = require('thunky')
var _self = null;
function TestThunk()
{
_self = this;
this.rnumber = 0;
}
TestThunk.prototype.init = function()
{
console.log('waiting 1s and returning random number');
}
TestThunk.prototype.open = thunky(function (callback) { // the inner function should only accept a callback
_self.init();
setTimeout(function () {
var ran = Math.random();
_self.rnumber = ran;
callback(null,ran)
}, 1000)
})
TestThunk.prototype.test = function(x){
_self.open((err,num)=>{
console.log(_self.rnumber + ' ' + x)
});
}
var tt = new TestThunk();
tt.test(1);
tt.test(2);
tt.test(3);
tt.test(4);
tt.test(5);
\ No newline at end of file
{
"version":"1.2.3",
"build":"202004111200"
"build":"202005181700"
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment