Commit 0d26b470 authored by Kamron Aroonrua's avatar Kamron Aroonrua 💬

Merge branch 'covid' into 'ondev-gcs'

Covid

See merge request !24
parents 88573115 0db51b34
BIGSTREAM_TAG=dev
BS_SECRET=bigstream-server
REDIS_TAG=3.2.11
PREFIX_NO=19
VOLUME=../volume
\ No newline at end of file
#Changelog #Changelog
## [1.2.3] - 2019-11-19 ## [1.2.3-UR]
### Fixed
- BS :: AMQP RPC Singleton Connection ,auto ack
- STORAGE :: 50x speedup
### Added ### Added
- PLUGIN :: dt-transform register
- BS :: Configuration Context with ENV - BS :: Configuration Context with ENV
- BS :: keystore env
- BS :: httplistener headers
- PLUGIN :: dt-pgpcrypt
- PLUGIN :: dt-transform fn extension
- PLUGIN :: dt-transform register
- PLUGIN :: do-http - PLUGIN :: do-http
- PLUGIN :: do-bsspeak - PLUGIN :: do-bsspeak
- PLUGIN :: do-mqtt - PLUGIN :: do-mqtt
......
FROM node:lts-alpine
RUN apk add --no-cache make gcc g++ python linux-headers udev
COPY . /app/node-bigstream
WORKDIR /app/node-bigstream
RUN npm install
FROM node:lts-alpine
COPY --from=0 /app/node-bigstream /app/node-bigstream
RUN npm install -y pm2@latest -g
RUN mkdir -p /var/bigstream/data
EXPOSE 19980 19080 19180
# start server
WORKDIR /app/bigstream-edge
CMD ["pm2-runtime", "start", "pm2-default.json"]
\ No newline at end of file
...@@ -5,6 +5,7 @@ var cfg = { ...@@ -5,6 +5,7 @@ var cfg = {
'mqtt' : cfg_load('mqtt.json'), 'mqtt' : cfg_load('mqtt.json'),
'memstore' : cfg_load('memstore.json'), 'memstore' : cfg_load('memstore.json'),
'storage' : cfg_load('storage.json'), 'storage' : cfg_load('storage.json'),
'keystore': cfg_load('keystore.json'),
'auth' : { 'auth' : {
'secret': cfg_load('secret.json'), 'secret': cfg_load('secret.json'),
'acl' : cfg_load('acl.json') 'acl' : cfg_load('acl.json')
......
{
"dir":"./keys"
}
\ No newline at end of file
version: '3'
services:
bigstream:
image: "bigstream:dev"
build: ./
container_name: bs_bigstream
restart: always
networks:
- bsnet
ports:
- "19980:19980"
- "19080:19080"
- "19180:19180"
- "19150:19150/udp"
environment:
- "BS_SECRET=${BS_SECRET}"
volumes:
- ${VOLUME}/bigstream/data:/var/bigstream/data
redis:
image: "redis:${REDIS_TAG}"
command: redis-server --appendonly yes
container_name: bs_redis_server
restart: always
networks:
bsnet:
aliases:
- redis-server
ports:
- "6379:6379"
volumes:
- ${VOLUME}/redis/data:/data
rabbitmq:
image: "igridproject/rabbitmq"
command: rabbitmq-server --hostname rabbitmq-server
container_name: bs_rabbitmq_server
restart: always
networks:
bsnet:
aliases:
- rabbitmq-server
ports:
- "5672:5672"
networks:
bsnet:
driver: bridge
\ No newline at end of file
...@@ -6,5 +6,6 @@ ...@@ -6,5 +6,6 @@
{"env":"BSCONFIG_MEMSTORE_URL","conf":"memstore.url"}, {"env":"BSCONFIG_MEMSTORE_URL","conf":"memstore.url"},
{"env":"BSCONFIG_STORAGE_REPOSITORY","conf":"storage.repository"}, {"env":"BSCONFIG_STORAGE_REPOSITORY","conf":"storage.repository"},
{"env":"BSCONFIG_STORAGE_APIHOSTNAME","conf":"storage.api_hostname"}, {"env":"BSCONFIG_STORAGE_APIHOSTNAME","conf":"storage.api_hostname"},
{"env":"BSCONFIG_SECRET_TEXT","conf":"auth.secret.value"} {"env":"BSCONFIG_SECRET_TEXT","conf":"auth.secret.value"},
{"env":"BSCONFIG_KEYSTORE_DIR","conf":"keystore.dir"}
] ]
\ No newline at end of file
...@@ -48,13 +48,13 @@ HTTPListener.prototype._http_start = function() ...@@ -48,13 +48,13 @@ HTTPListener.prototype._http_start = function()
} }
}); });
app.use(bodyParser.json({limit: '128mb'})); app.use(bodyParser.json({limit: '128mb',type:"*/json"}));
app.use(bodyParser.urlencoded({ app.use(bodyParser.urlencoded({
extended: true, extended: true,
limit: '128mb' limit: '128mb'
})); }));
app.use(bodyParser.raw({limit: '128mb'}));
app.use(bodyParser.text({limit: '128mb',type:"text/*"})); app.use(bodyParser.text({limit: '128mb',type:"text/*"}));
app.use(bodyParser.raw({limit: '128mb',type:"*/*"}));
var context = require('./lib/http-context'); var context = require('./lib/http-context');
app.use(context.middleware({ app.use(context.middleware({
......
...@@ -28,6 +28,7 @@ var process_req = function(req, res ,method) { ...@@ -28,6 +28,7 @@ var process_req = function(req, res ,method) {
j.forEach(function(item){ j.forEach(function(item){
var httpdata = { var httpdata = {
'object_type' : 'httpdata', 'object_type' : 'httpdata',
'headers': req.headers,
'method' : method, 'method' : method,
'data' : {} 'data' : {}
} }
......
...@@ -108,7 +108,7 @@ JobTask.prototype.run = function () ...@@ -108,7 +108,7 @@ JobTask.prototype.run = function ()
perform_di({'context':context,'handle':self} ,function(err,resp){ perform_di({'context':context,'handle':self} ,function(err,resp){
if(resp){ if(resp){
console.log('[DI STATUS]\t\t: ' + resp.status); // console.log('[DI STATUS]\t\t: ' + resp.status);
self.stats.di = resp; self.stats.di = resp;
} }
if(resp.status == 'success'){ if(resp.status == 'success'){
...@@ -149,7 +149,7 @@ JobTask.prototype.run = function () ...@@ -149,7 +149,7 @@ JobTask.prototype.run = function ()
perform_dt({'cfg':cur_cfg,'name':dt_name,'context':context,'request':dt_request,'handle':self},function(err,dt_resp){ perform_dt({'cfg':cur_cfg,'name':dt_name,'context':context,'request':dt_request,'handle':self},function(err,dt_resp){
if(dt_resp){ if(dt_resp){
console.log('[DT:' + dt_name + ' STATUS]\t\t: ' + dt_resp.status); // console.log('[DT:' + dt_name + ' STATUS]\t\t: ' + dt_resp.status);
} }
idx++; idx++;
...@@ -188,7 +188,9 @@ JobTask.prototype.run = function () ...@@ -188,7 +188,9 @@ JobTask.prototype.run = function ()
dm_o.run(function() { dm_o.run(function() {
perform_do({'context':context,'request':do_request,'handle':self},function(err,do_resp){ perform_do({'context':context,'request':do_request,'handle':self},function(err,do_resp){
if(do_resp){console.log('[DO STATUS]\t\t: ' + do_resp.status);} if(do_resp){
//console.log('[DO STATUS]\t\t: ' + do_resp.status);
}
if(do_resp.status == 'success'){ if(do_resp.status == 'success'){
callback(null,do_resp); callback(null,do_resp);
}else { }else {
...@@ -205,18 +207,21 @@ JobTask.prototype.run = function () ...@@ -205,18 +207,21 @@ JobTask.prototype.run = function ()
//self.emit('error',new Error('job execution timeout')) //self.emit('error',new Error('job execution timeout'))
},self.job_timeout); },self.job_timeout);
console.log('***** JOB RUNNING *****'); // console.log('***** JOB RUNNING *****');
console.log('[JOB ID]\t\t: ' + job_id); // console.log('[JOB ID]\t\t: ' + job_id);
console.log('[TRANSACTION ID]\t: ' + transaction_id); // console.log('[TRANSACTION ID]\t: ' + transaction_id);
async.waterfall([task_di,task_dt,task_do],function (err,resp) { async.waterfall([task_di,task_dt,task_do],function (err,resp) {
clearTimeout(jtimeout); clearTimeout(jtimeout);
//console.log('[JOB DONE] id=' + job_id + ' ,tr=' + transaction_id + '\t' + resp.status);
if(!err){ if(!err){
self.stop(resp) self.stop(resp)
console.log('***** JOB SUCCESSFULLY DONE *****\n'); // console.log('***** JOB SUCCESSFULLY DONE *****\n');
console.log('[JOB DONE] id=' + job_id + ' ,tr=' + transaction_id + '\tsuccess');
}else{ }else{
self.stop(err) self.stop(err)
console.log('***** JOB UNSUCCESSFULLY DONE *****\n'); // console.log('***** JOB UNSUCCESSFULLY DONE *****\n');
console.log('[JOB DONE] id=' + job_id + ' ,tr=' + transaction_id + '\tunsuccess');
} }
}); });
......
...@@ -25,7 +25,7 @@ memstore.prototype.getItem = function(k,cb) ...@@ -25,7 +25,7 @@ memstore.prototype.getItem = function(k,cb)
if(!err && v){ if(!err && v){
if(typeof v == 'object' && v.type == 'Buffer') if(typeof v == 'object' && v.type == 'Buffer')
{ {
value = new Buffer(v.data); value = Buffer.from(v.data);
}else{ }else{
value = JSON.parse(v); value = JSON.parse(v);
} }
......
...@@ -6,6 +6,7 @@ var QueueReceiver = ctx.getLib('lib/amqp/queuereceiver'); ...@@ -6,6 +6,7 @@ var QueueReceiver = ctx.getLib('lib/amqp/queuereceiver');
var ConnCtx = ctx.getLib('lib/conn/connection-context'); var ConnCtx = ctx.getLib('lib/conn/connection-context');
var JobRegistry = ctx.getLib('lib/mems/job-registry'); var JobRegistry = ctx.getLib('lib/mems/job-registry');
var SSCaller = ctx.getLib('lib/axon/rpccaller'); var SSCaller = ctx.getLib('lib/axon/rpccaller');
var RPCCaller = ctx.getLib('lib/amqp/rpccaller');
var ACLValidator = ctx.getLib('lib/auth/acl-validator'); var ACLValidator = ctx.getLib('lib/auth/acl-validator');
var JobTransaction = require('./lib/jobtransaction') var JobTransaction = require('./lib/jobtransaction')
...@@ -34,7 +35,10 @@ var JW = function JobWorker (prm) ...@@ -34,7 +35,10 @@ var JW = function JobWorker (prm)
/* Disable RPC Feature */ /* Disable RPC Feature */
//this.storagecaller = new SSCaller({'url':SS_URL}); //this.storagecaller = new SSCaller({'url':SS_URL});
this.storagecaller = null; this.storagecaller = new RPCCaller({
url : this.conn.getAmqpUrl(),
name :'storage_request'
});
} }
JW.prototype.start = function () JW.prototype.start = function ()
......
...@@ -40,7 +40,7 @@ EventPub.prototype.send = function(topic,msg) ...@@ -40,7 +40,7 @@ EventPub.prototype.send = function(topic,msg)
{ {
var self=this; var self=this;
this.open(function(err){ this.open(function(err){
self.ch.publish(self.name, topic, new Buffer(JSON.stringify(msg))); self.ch.publish(self.name, topic, Buffer.from(JSON.stringify(msg)));
}); });
} }
......
...@@ -41,7 +41,7 @@ QueueCaller.prototype.send = function(msg) ...@@ -41,7 +41,7 @@ QueueCaller.prototype.send = function(msg)
if(err){ if(err){
console.log(err); console.log(err);
} }
self.ch.sendToQueue(self.name, new Buffer(JSON.stringify(msg)), {persistent: true}); self.ch.sendToQueue(self.name, Buffer.from(JSON.stringify(msg)), {persistent: true});
}); });
} }
......
var amqp = require('amqplib/callback_api'); var amqp = require('amqplib/callback_api');
var EventEmitter = require('events').EventEmitter;
var thunky = require('thunky');
const REPLY_QUEUE = 'amq.rabbitmq.reply-to';
function RPCCaller(config)
{
this.config = config;
this.url = config.url;
this.name = config.name || "rpc_queue";
this.conn = null;
this.ch = null;
var self = this;
this.opened = false;
this.open = thunky(open);
this.open();
function open (cb) {
amqp.connect(self.url, function(err, conn) {
if (err){return cb(err)}
conn.createChannel(function(err, ch) {
if (err){return cb(err)}
ch.responseEmitter = new EventEmitter();
ch.responseEmitter.setMaxListeners(0);
ch.prefetch(4);
ch.consume(REPLY_QUEUE ,
(msg) => { ch.responseEmitter.emit(msg.properties.correlationId, JSON.parse(msg.content.toString()))},
{noAck: true});
self.opened = true;
self.conn = conn;
self.ch = ch;
cb();
});
});
}
}
RPCCaller.prototype.call = function(req,cb){
var self = this;
var corr = generateUuid();
self.open(function(err){
if(err){
console.log(err);
}
self.ch.responseEmitter.once(corr, (resp)=>{
cb(null,resp);
});
self.ch.sendToQueue(self.name, Buffer.from(JSON.stringify(req)), { correlationId: corr, replyTo: REPLY_QUEUE,persistent: false })
});
function generateUuid() {
return Math.random().toString() +
Math.random().toString() +
Math.random().toString();
}
}
/*
function RPCCaller(config) function RPCCaller(config)
{ {
this.config = config; this.config = config;
...@@ -35,5 +98,6 @@ RPCCaller.prototype.call = function(req,cb){ ...@@ -35,5 +98,6 @@ RPCCaller.prototype.call = function(req,cb){
Math.random().toString(); Math.random().toString();
} }
} }
*/
module.exports = RPCCaller; module.exports = RPCCaller;
...@@ -23,18 +23,18 @@ RPCServer.prototype.start = function(cb) ...@@ -23,18 +23,18 @@ RPCServer.prototype.start = function(cb)
var q = self.name; var q = self.name;
ch.assertQueue(q, {durable: false}); ch.assertQueue(q, {durable: false});
ch.prefetch(1); ch.prefetch(4);
//console.log(' [x] Awaiting RPC requests'); //console.log(' [x] Awaiting RPC requests');
ch.consume(q, function reply(msg) { ch.consume(q, function reply(msg) {
var req = JSON.parse(msg.content.toString()); var req = JSON.parse(msg.content.toString());
self.remote_function(req,function(err,resp){ self.remote_function(req,function(err,resp){
ch.sendToQueue(msg.properties.replyTo,new Buffer(JSON.stringify(resp)),{correlationId: msg.properties.correlationId}); ch.sendToQueue(msg.properties.replyTo,Buffer.from(JSON.stringify(resp)),{correlationId: msg.properties.correlationId,persistent: false});
}); });
ch.ack(msg); //ch.ack(msg);
}); }, {noAck: true});
cb(null); cb(null);
}); });
......
{ {
"name": "node-bigstream", "name": "node-bigstream",
"description": "", "description": "",
"version": "0.0.1", "version": "1.2.3",
"main": "./bigstream.js", "main": "./bigstream.js",
"author": { "author": {
"name": "Kamron Aroonrua", "name": "Kamron Aroonrua",
...@@ -42,5 +42,10 @@ ...@@ -42,5 +42,10 @@
"request": "^2.79.0", "request": "^2.79.0",
"thunky": "^1.0.2", "thunky": "^1.0.2",
"tiny-worker": "^2.1.1" "tiny-worker": "^2.1.1"
}, "scripts": {
"test": "echo \"Error: no test specified\" && exit 1",
"install": "node ./script/install_plugins.js",
"start": "pm2 start pm2-default.json",
"stop": "pm2 stop pm2-default.json"
} }
} }
...@@ -7,14 +7,26 @@ function perform_function(context,response){ ...@@ -7,14 +7,26 @@ function perform_function(context,response){
var param = context.jobconfig.data_in.param || {}; var param = context.jobconfig.data_in.param || {};
var memstore = context.task.memstore; var memstore = context.task.memstore;
var input_data = context.input.data; var input_data = context.input.data;
var input_meta = context.input.meta; var input_meta = context.input.meta || {};
var output_type = 'object' var output_type = 'object'
var data = input_data; var data = input_data;
if(param.object == 'httpdata') if(param.object == 'httpdata')
{ {
data = extract_httpdata(input_data); if(data.object_type && data.object_type == 'httpdata'){
var htdata = data;
if(param.http_meta){
input_meta.http_headers = htdata.headers;
input_meta.http_method = htdata.method;
}
if(typeof htdata.data == 'object' && htdata.data.type == 'Buffer'){
data = Buffer.from(htdata.data);
}else{
data = htdata.data;
}
}
} }
// memstore.setItem('lasttransaction',transaction_id,function(err){ // memstore.setItem('lasttransaction',transaction_id,function(err){
// response.success(data); // response.success(data);
...@@ -31,14 +43,14 @@ function perform_function(context,response){ ...@@ -31,14 +43,14 @@ function perform_function(context,response){
} }
function extract_httpdata(dat) // function extract_httpdata(dat)
{ // {
if(dat.object_type && dat.object_type == 'httpdata'){ // if(dat.object_type && dat.object_type == 'httpdata'){
return dat.data; // return dat.data;
}else{ // }else{
return dat; // return dat;
} // }
} // }
module.exports = perform_function; module.exports = perform_function;
...@@ -46,7 +46,6 @@ function perform_function(context,response){ ...@@ -46,7 +46,6 @@ function perform_function(context,response){
}).then(() => { }).then(() => {
return sftp.list(prm_dir + '/'); return sftp.list(prm_dir + '/');
}).then((fList) => { }).then((fList) => {
var f_target = null; var f_target = null;
var last_tts = 0; var last_tts = 0;
var sync_list = []; var sync_list = [];
...@@ -70,24 +69,24 @@ function perform_function(context,response){ ...@@ -70,24 +69,24 @@ function perform_function(context,response){
'filesize': f_target.size, 'filesize': f_target.size,
'modify_ts' : Math.round(f_target.modifyTime/1000) 'modify_ts' : Math.round(f_target.modifyTime/1000)
} }
return sftp.get(prm_dir + '/' + f_target.name,null,null); return sftp.get(prm_dir + '/' + f_target.name);
}else{ }else{
return null; return null;
} }
}).then((data) => { }).then((data) => {
if(data){ if(data){
data.on('data',(dat)=>{
var nb = Buffer.concat([buff_out,dat]); var nb = Buffer.concat([buff_out,data]);
buff_out = nb; buff_out = nb;
})
data.on('end',()=>{
sftp.end() sftp.end()
memstore.setItem('lastmodify',last_mod,function(err){ memstore.setItem('lastmodify',last_mod,function(err){
var result=(prm_encoding=='binary')?buff_out:buff_out.toString('utf8'); var result=(prm_encoding=='binary')?buff_out:buff_out.toString('utf8');
response.success(result, {"meta":meta,"continue": fs_continue}); response.success(result, {"meta":meta,"continue": fs_continue});
}); });
});
}else{ }else{
sftp.end(); sftp.end();
response.reject(); response.reject();
...@@ -95,7 +94,7 @@ function perform_function(context,response){ ...@@ -95,7 +94,7 @@ function perform_function(context,response){
}).catch((err) => { }).catch((err) => {
sftp.end(); sftp.end();
response.error(err); response.error(err);
console.log(err, 'catch error'); console.log(err);
}); });
} }
......
...@@ -24,10 +24,12 @@ function perform_function(context,request,response){ ...@@ -24,10 +24,12 @@ function perform_function(context,request,response){
var amqp_cfg = ctx.config.amqp; var amqp_cfg = ctx.config.amqp;
var storage_name = param.storage_name; var storage_name = param.storage_name;
var caller = new RPCCaller({ // var caller = new RPCCaller({
url : amqp_cfg.url, // url : amqp_cfg.url,
name :'storage_request' // name :'storage_request'
}); // });
var caller = storagecaller;
// if(param.channel!='ipc'){ // if(param.channel!='ipc'){
// caller = new RPCCaller({ // caller = new RPCCaller({
......
var util = require('util');
var DTPlugin = require('../dt-plugin');
function DTTask(context,request){
DTPlugin.call(this,context,request);
this.name = "http";
this.output_type = "";
}
util.inherits(DTTask,DTPlugin);
DTTask.prototype.perform = require('./perform');
module.exports = DTTask;
\ No newline at end of file
var ctx = require('../../../context');
var Utils = ctx.getLib('lib/util/plugin-utils');
var request = require("request").defaults({ encoding: null });
function perform_function(context,request,response){
var job_id = context.jobconfig.job_id;
var transaction_id = context.transaction.id;
var param = context.task.config.param || {};
var memstore = context.task.memstore
var output_type = request.input_type;
var data = request.data;
var meta = request.meta || {};
var req_url = param.url || "";
var req_method = param.method || "GET";
var req_headers = param.headers || {};
var req_body_type = param.body_type || "json";
var resp_encode = param.encoding || "text";
var env = {
'type' : output_type,
'data' : data,
'meta' : meta
}
req_url = Utils.vm_execute_text(env,req_url);
//parsing param from meta
if(typeof meta._param == 'object')
{
var _prm = meta._param;
req_url = (_prm.url)?_prm.url:req_url;
req_method = (_prm.method)?_prm.method:req_method;
req_headers = (_prm.headers)?_prm.headers:req_headers;
req_body_type = (_prm.body_type)?_prm.body_type:req_body_type;
resp_encode = (_prm.encoding)?_prm.encoding:resp_encode;
}
send_request({'url':req_url,
'method':req_method,
'headers':req_headers,
'body_type':req_body_type,
'body':data,
'resp_encode':resp_encode},function(err,resp,body){
var respmeta = meta;
Object.keys(respmeta).forEach((k)=>{
if(k.startsWith('_')){delete respmeta[k];}
});
respmeta['_status_code'] = (err)?0:resp.statusCode;
respmeta['_error'] = (err)?true:false;
response.meta = respmeta;
if(!err){
if(resp_encode=='json'){
try{
var j = JSON.parse(body);
response.success(j,output_type);
}catch(err){
response.success({},output_type);
}
}else{
response.success(body,output_type);
}
}else{
response.success(null,output_type);
}
});
//response.success();
//response.reject();
//response.error("error message")
}
function send_request(prm,cb)
{
var options = { method: 'GET',
url: prm.url,
headers:
{ 'cache-control': 'no-cache' }
};
if(prm.method.toLowerCase()=='post' || prm.method.toLowerCase()=='put')
{
options.method = prm.method.toUpperCase();
if(prm.body_type=='json' && typeof prm.body == 'object'){
options.headers['content-type'] = 'application/json';
options.json = prm.body;
}else if(prm.body_type=='text' || typeof prm.body == 'string'){
options.headers['content-type'] = 'text/plain';
options.body = prm.body;
}else{
options.body = prm.body;
}
}
if(typeof prm.headers == 'object')
{
options.headers = Object.assign(options.headers,prm.headers)
}
options.encoding = (prm.resp_encode == 'binary')?null:'utf8';
request(options, function (err, resp, body) {
if (!err) {
cb(err, resp, body);
}else{
cb(new Error("request error"));
}
});
}
module.exports = perform_function;
var util = require('util');
var DTPlugin = require('../dt-plugin');
function DTTask(context,request){
DTPlugin.call(this,context,request);
this.name = "pgpcrypt";
this.output_type = "";
}
util.inherits(DTTask,DTPlugin);
DTTask.prototype.perform = require('./perform');
module.exports = DTTask;
\ No newline at end of file
{
"name": "dt-pgpcrypt",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"author": "",
"license": "ISC",
"dependencies": {
"openpgp": "^4.10.4"
}
}
var ctx = require('../../../context');
var Utils = ctx.getLib('lib/util/plugin-utils');
var pgplib = require('./pgp');
var path = require('path');
var fs = require('fs');
function perform_function(context,request,response){
var job_id = context.jobconfig.job_id;
var transaction_id = context.transaction.id;
var param = context.task.config.param || {};
var memstore = context.task.memstore
var output_type = request.input_type;
var data = request.data;
var meta = request.meta || {};
var req_function = param.function || "decrypt"
var req_publickey = param.publickey || ""
var req_privatekey = param.privatekey || ""
var req_passphrase = param.passphrase || ""
var req_output = param.output|| "binary"
var key_dir = ctx.getConfig('keystore.dir','./keys');
var fn_load_key = function (name) {
var k = "";
var fp = path.join(key_dir ,path.basename(name));
try{
k = fs.readFileSync(fp).toString('utf8');
}catch(e){
}
return k;
}
var env = {
'type' : output_type,
'data' : data,
'meta' : meta,
'_fn' : {
'load_key' : fn_load_key
}
}
req_publickey = Utils.vm_execute_text(env,req_publickey);
req_privatekey = Utils.vm_execute_text(env,req_privatekey);
req_passphrase = Utils.vm_execute_text(env,req_passphrase);
req_output = Utils.vm_execute_text(env,req_output);
//parsing param from meta
if(typeof meta._param == 'object')
{
var _prm = meta._param;
req_function = (_prm.function)?_prm.function:req_function;
req_publickey = (_prm.publickey)?_prm.publickey:req_publickey;
req_privatekey = (_prm.privatekey)?_prm.privatekey:req_privatekey;
req_passphrase = (_prm.passphrase)?_prm.passphrase:req_passphrase;
req_output = (_prm.output)?_prm.output:req_output;
}
if (['decrypt','dec'].indexOf(req_function) >= 0){
pgplib.decrypt({
private_key : req_privatekey,
passphrase : req_passphrase,
armor_in : (typeof data == 'string'),
data : data
}).then(d => {
var dout = d;
if(['utf8','text'].indexOf(req_output) >= 0){
dout = d.toString('utf8');
}else if(req_output == 'base64'){
dout = d.toString('base64');
}
ok_out(dout);
}).catch(e =>{
error_out('decrypt error');
})
} else if (['encrypt','enc'].indexOf(req_function) >= 0){
pgplib.encrypt({
public_key : req_publickey,
armor_out : (['armor','text'].indexOf(req_output) >= 0),
data : data
}).then(d => {
ok_out(d);
}).catch(e =>{
error_out('decrypt error');
})
} else {
error_out('invalid function')
}
function ok_out (out)
{
response.meta = meta;
response.success(out,output_type);
}
function error_out (msg)
{
response.error(msg);
}
//response.success();
//response.reject();
//response.error("error message")
}
module.exports = perform_function;
const openpgp = require('openpgp');
async function encrypt (opt) {
var publicKeyArmored = opt.public_key;
var datain = opt.data;
var armor = (opt.armor_out)?true:false;
var enc = await openpgp.encrypt({
message: openpgp.message.fromBinary(datain),
publicKeys: (await openpgp.key.readArmored(publicKeyArmored)).keys,
armor: armor
});
if(!armor){
enc = Buffer.from(enc.message.packets.write());
}
return enc;
}
async function decrypt (opt) {
var privateKeyArmored = opt.private_key;
var passphrase = opt.passphrase ;
var datain = opt.data;
var armor = (opt.armor_in)?true:false;
var { keys: [privateKey] } = await openpgp.key.readArmored(privateKeyArmored);
await privateKey.decrypt(passphrase);
var msg = null;
if(armor){
msg = await openpgp.message.readArmored(datain);
}else{
msg = await openpgp.message.read(datain);
}
const { data: decrypted } = await openpgp.decrypt({
message: msg,
privateKeys: [privateKey],
format:'binary'
});
return Buffer.from(decrypted);
}
module.exports.encrypt = encrypt;
module.exports.decrypt = decrypt;
\ No newline at end of file
var crypto = require("crypto");
var publicEncrypt = function(data, publicKey) {
var buffer = Buffer.from(data);
var encrypted = crypto.publicEncrypt(publicKey, buffer);
return encrypted.toString("base64");
};
var privateDecrypt = function(crypted_data, privateKey) {
var buffer = Buffer.from(crypted_data, "base64");
var decrypted = crypto.privateDecrypt(privateKey, buffer);
return decrypted.toString("utf8");
};
module.exports = {
publicEncrypt: publicEncrypt,
privateDecrypt: privateDecrypt
}
\ No newline at end of file
var DomParser = require('dom-parser');
var parser = new DomParser();
module.exports = function (html) {
return parser.parseFromString(html);
}
\ No newline at end of file
var crypto = require('crypto');
module.exports.sha256 = function (text) {
var dat = (typeof text == 'string')?text:String(text);
return crypto.createHash('sha256').update(dat).digest('hex');
}
\ No newline at end of file
{
"name": "dt-transform",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"author": "",
"license": "ISC",
"dependencies": {
"dom-parser": "^0.1.6"
}
}
...@@ -11,7 +11,7 @@ function perform_function(context,request,response){ ...@@ -11,7 +11,7 @@ function perform_function(context,request,response){
var in_type = request.input_type; var in_type = request.input_type;
var in_data = request.data; var in_data = request.data;
var in_meta = request.meta; var in_meta = request.meta || {};
var mapenv = { var mapenv = {
'src' : { 'src' : {
...@@ -20,11 +20,21 @@ function perform_function(context,request,response){ ...@@ -20,11 +20,21 @@ function perform_function(context,request,response){
'meta' : in_meta 'meta' : in_meta
}, },
'_env':{}, '_env':{},
'_fn':{},
'type' : in_type, 'type' : in_type,
'data' : in_data, 'data' : in_data,
'meta' : in_meta 'meta' : in_meta
} }
if(param.use_function){
var fns = (Array.isArray(param.use_function))?param.use_function:Array.of(param.use_function);
fns.forEach((fname)=>{
if(typeof fname == 'string' && fname.length>0){
mapenv._fn[fname] = _loadfunc(fname);
}
});
}
if(param.use_register){ if(param.use_register){
memstore.getItem('register',function(err,value){ memstore.getItem('register',function(err,value){
if(err){return response.error("memstore error");} if(err){return response.error("memstore error");}
...@@ -84,4 +94,15 @@ function _compile(mape,param) ...@@ -84,4 +94,15 @@ function _compile(mape,param)
return mapenv; return mapenv;
} }
function _loadfunc(name)
{
var f = null;
try {
f = require('./fn/' + name);
} catch (error) {
}
return f;
}
module.exports = perform_function; module.exports = perform_function;
...@@ -4,6 +4,7 @@ var ctx = require('../../context'); ...@@ -4,6 +4,7 @@ var ctx = require('../../context');
var BinStream = ctx.getLib('lib/bss/binarystream_v1_1'); var BinStream = ctx.getLib('lib/bss/binarystream_v1_1');
var ObjId = ctx.getLib('lib/bss/objid'); var ObjId = ctx.getLib('lib/bss/objid');
var bsdata = ctx.getLib('lib/model/bsdata'); var bsdata = ctx.getLib('lib/model/bsdata');
var thunky = require('thunky');
var importer = require('./importer'); var importer = require('./importer');
var dataevent = require('./dataevent'); var dataevent = require('./dataevent');
...@@ -21,22 +22,53 @@ module.exports.create = function(prm) ...@@ -21,22 +22,53 @@ module.exports.create = function(prm)
function BSSEngine(prm) function BSSEngine(prm)
{ {
var self = this;
if(typeof prm == 'string'){ if(typeof prm == 'string'){
prm = {'file':prm,'context':null}; prm = {'file':prm,'context':null};
} }
// this.repos_dir = prm.repos_dir;
// this.name = prm.name;
this.file = prm.file; this.file = prm.file;
this.name = prm.name; this.name = prm.name;
this.context = (prm.context)?prm.context:null; this.context = (prm.context)?prm.context:null;
this.concurrent = 0; this.concurrent = 0;
this.serial=prm.serial||''; this.serial=prm.serial||'';
this.outdate=false; this.outdate=false;
this.bss=null;
this.open = thunky(openbss);
this.open();
function openbss (cb) {
if(fs.existsSync(self.file))
{
open()
}else{
BinStream.format(self.file,function(err){
if(!err){
open()
}else{
cb("format error")
}
});
}
function open(){
BinStream.open(self.file,function(err,bss){
if(!err){
self.bss = bss;
}
cb(err,bss);
});
}
}
} }
BSSEngine.prototype.filepath = function() BSSEngine.prototype.filepath = function()
{ {
//return this.repos_dir + '/' + name2path(this.name) + '.bss';
return this.file; return this.file;
} }
...@@ -46,48 +78,21 @@ BSSEngine.prototype.exists = function() ...@@ -46,48 +78,21 @@ BSSEngine.prototype.exists = function()
return fs.existsSync(fp); return fs.existsSync(fp);
} }
BSSEngine.prototype.open = function(cb)
{
var self = this;
if(self.exists())
{
open()
}else{
BinStream.format(self.filepath(),function(err){
if(!err){
open()
}else{
cb("format error")
}
});
}
function open(){
BinStream.open(self.filepath(),function(err,bss){
if(!err){
self.bss = bss;
}
cb(err);
});
}
}
BSSEngine.prototype.close = function(cb) BSSEngine.prototype.close = function(cb)
{ {
this.bss.close(cb); var self = this;
self.open((err,bss)=>{
bss.close(cb);
});
} }
BSSEngine.prototype.cmd = function(cmd,cb) BSSEngine.prototype.cmd = function(cmd,cb)
{ {
var self = this
var command = cmd.command; var command = cmd.command;
var param = cmd.param; var param = cmd.param;
var self=this;
switch (command) { switch (command) {
case 'write': case 'write':
self.cmd_write(param,cb); self.cmd_write(param,cb);
...@@ -105,25 +110,29 @@ BSSEngine.prototype.cmd_write = function(prm,cb) ...@@ -105,25 +110,29 @@ BSSEngine.prototype.cmd_write = function(prm,cb)
if(!data){return cb("null data")} if(!data){return cb("null data")}
this.bss.write(data,{'meta':meta},function(err,obj){ self.open((err,bss)=>{
if(!err){ bss.write(data,{'meta':meta},function(err,obj){
var head = obj.getHeader(); if(!err){
var obj_id = new ObjId(head.ID); var head = obj.getHeader();
var resp = { var obj_id = new ObjId(head.ID);
'resource_id' : obj_id.toString(), var resp = {
'storage_name' : self.name 'resource_id' : obj_id.toString(),
} 'storage_name' : self.name
}
//dataevent.newdata({'resourceId':obj_id.toString(),'storageId':self.name});
if(self.context){ //dataevent.newdata({'resourceId':obj_id.toString(),'storageId':self.name});
newdata_event(self.context,{'resourceId':obj_id.toString(),'storageId':self.name}); if(self.context){
newdata_event(self.context,{'resourceId':obj_id.toString(),'storageId':self.name});
}
cb(null,resp);
}else {
cb("write error");
} }
});
cb(null,resp);
}else {
cb("write error");
}
}); });
} }
function newdata_event(ctx,prm) function newdata_event(ctx,prm)
......
...@@ -12,7 +12,7 @@ function BSSPool(prm) ...@@ -12,7 +12,7 @@ function BSSPool(prm)
BSSPool.prototype.get = function(name,opt,cb) BSSPool.prototype.get = function(name,opt,cb)
{ {
if(!cb){cb=opt;opt={};} if(!cb){cb=opt;opt={};}
var self=this; var self=this;
var filepath = this.repos_dir + '/' + name2path(name) + '.bss' var filepath = this.repos_dir + '/' + name2path(name) + '.bss'
var bssname = name; var bssname = name;
...@@ -32,18 +32,14 @@ BSSPool.prototype.get = function(name,opt,cb) ...@@ -32,18 +32,14 @@ BSSPool.prototype.get = function(name,opt,cb)
'name' : bssname, 'name' : bssname,
'newInstance':opt.newInstance 'newInstance':opt.newInstance
}); });
self.pool.push({
bss_engine.open(function(err){ 'name' : name,
if(!err){ 'engine':bss_engine
self.pool.push({
'name' : name,
'engine':bss_engine
});
}
self.clean(function(err){
cb(err,bss_engine);
});
}); });
self.clean(function(err){
cb(err,bss_engine);
});
} }
} }
......
...@@ -29,7 +29,7 @@ module.exports.newdata = function(prm,cb){ ...@@ -29,7 +29,7 @@ module.exports.newdata = function(prm,cb){
var msg = JSON.stringify(objMsg); var msg = JSON.stringify(objMsg);
ch.assertExchange(ex, 'topic', {durable: false}); ch.assertExchange(ex, 'topic', {durable: false});
ch.publish(ex, key, new Buffer(msg)); ch.publish(ex, key, Buffer.from(msg));
//console.log("[AMQP] Sent %s:'%s'", key, msg); //console.log("[AMQP] Sent %s:'%s'", key, msg);
} }
}); });
......
...@@ -36,11 +36,28 @@ router.get('/:id/data',function (req, res) { ...@@ -36,11 +36,28 @@ router.get('/:id/data',function (req, res) {
var opt = { var opt = {
'field' : 'data' 'field' : 'data'
} }
opt.filetype = (query.filetype)?query.filetype:null opt.filetype = (query.file_type || query.filetype)?query.file_type || query.filetype:null;
get_object(reqHelper,respHelper,{'oid':oid,'opt':opt}); get_object(reqHelper,respHelper,{'oid':oid,'opt':opt});
}); });
router.get('/:id/file',function (req, res) {
var reqHelper = request.create(req);
var respHelper = response.create(res);
var query = reqHelper.getQuery();
var oid = req.params.id;
var opt = {
'field' : 'file'
}
opt.filetype = (query.file_type || query.filetype)?query.file_type || query.filetype:null;
opt.filename = (query.file_name || query.filename)?query.file_name || query.filename:null;
opt.download = (query.download)?true:null;
get_object(reqHelper,respHelper,{'oid':oid,'opt':opt});
});
function oid_parse(oid,caller,cb) function oid_parse(oid,caller,cb)
{ {
var ret = {'valid':true} var ret = {'valid':true}
...@@ -177,6 +194,9 @@ function output(resp,obj,opt) ...@@ -177,6 +194,9 @@ function output(resp,obj,opt)
if(opt.field=='data') if(opt.field=='data')
{ {
data_out(resp,obj,opt); data_out(resp,obj,opt);
}else if(opt.field=='file')
{
file_out(resp,obj,opt);
}else{ }else{
obj_out(resp,obj,opt); obj_out(resp,obj,opt);
} }
...@@ -218,5 +238,39 @@ function data_out(resp,obj,opt) ...@@ -218,5 +238,39 @@ function data_out(resp,obj,opt)
} }
function file_out(resp,obj,opt)
{
var objType = obj.header.TY;
var objId = (new ObjId(obj.header.ID)).toString();
var meta = obj.meta || {};
var defName=null;
var defType=null;
if(objType == BinStream.BINARY_TYPE){
defType = "application/octet-stream";
defName = (opt.filetype)?objId + "." + opt.filetype:objId + ".out";
}else if(objType == BinStream.STRING_TYPE){
defType = "text";
defName = (opt.filetype)?objId + "." + opt.filetype:objId + ".out";
}else{
defType = "json";
defName = (opt.filetype)?objId + "." + opt.filetype:objId + ".json";
}
var file_name = opt.filename || meta.file_name || defName;
var file_type = opt.filetype || meta.file_type || defType;
resp.response.type(file_type);
if(opt.download){
resp.response.set('Content-Disposition', 'attachment; filename="' + file_name + '"');
}else{
resp.response.set('Content-Disposition', 'filename="' + file_name + '"');
}
resp.response.send(obj.data);
}
module.exports = router; module.exports = router;
var ctx = require('../context'); var ctx = require('../context');
var async = require('async');
var amqp_cfg = ctx.config.amqp; var amqp_cfg = ctx.config.amqp;
var RPCCaller = ctx.getLib('lib/amqp/rpccaller'); var RPCCaller = ctx.getLib('lib/amqp/rpccaller');
var caller = new RPCCaller({ var caller = new RPCCaller({
url : amqp_cfg.url, url : amqp_cfg.url,
name :'storage_request' name :'test_request'
}); });
var req = { var req = {
'object_type' : 'storage_request', 'object_type' : 'test_request',
'command' : 'write', 'command' : 'write',
'param' : { 'param' : {
'storage_name' : 'gcs.file.test', 'storage_name' : 'gcs.file.test',
...@@ -24,10 +25,29 @@ var req = { ...@@ -24,10 +25,29 @@ var req = {
} }
} }
caller.call(req,function(err,resp){
console.log(resp);
});
var idx = 0;
async.whilst(
function() { return idx < 2000; },
function(callback) {
req.id = idx;
caller.call(req,function(err,resp){
if(idx%100==0){
console.log(resp)
}
idx++;
callback(null);
});
},
function (err) {
if(!err){
console.log('finish')
}else{
console.log(err);
}
}
);
......
...@@ -5,26 +5,14 @@ var rpcserver = ctx.getLib('lib/amqp/rpcserver'); ...@@ -5,26 +5,14 @@ var rpcserver = ctx.getLib('lib/amqp/rpcserver');
var server = new rpcserver({ var server = new rpcserver({
url : amqp_cfg.url, url : amqp_cfg.url,
name : 'test_request'
}); });
server.set_remote_function(function(req,callback){ server.set_remote_function(function(req,callback){
var n = parseInt(req.t); callback(null,{'cmd':req.command,'id':req.id});
console.log('REQUEST ' + req);
setTimeout(function(){
callback(null,{'time':n,'data':req.d});
},n);
}) })
server.start(function(err){ server.start(function(err){
console.log('server start'); console.log('server start');
}) })
var http = require('http');
http.createServer(function (req, res) {
res.writeHead(200, {
'Content-Type': 'text/plain; charset=UTF-8'
});
console.log(req.body);
res.end("req");
}).listen(9080, "");
var thunky = require('thunky')
var _self = null;
function TestThunk()
{
_self = this;
this.rnumber = 0;
}
TestThunk.prototype.init = function()
{
console.log('waiting 1s and returning random number');
}
TestThunk.prototype.open = thunky(function (callback) { // the inner function should only accept a callback
_self.init();
setTimeout(function () {
var ran = Math.random();
_self.rnumber = ran;
callback(null,ran)
}, 1000)
})
TestThunk.prototype.test = function(x){
_self.open((err,num)=>{
console.log(_self.rnumber + ' ' + x)
});
}
var tt = new TestThunk();
tt.test(1);
tt.test(2);
tt.test(3);
tt.test(4);
tt.test(5);
\ No newline at end of file
{ {
"version":"1.2.3", "version":"1.2.3",
"build":"202003061200" "build":"202006011700"
} }
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment