Commit 7de3f23e authored by Kamron Aroonrua's avatar Kamron Aroonrua 💬

Merge branch 'ondev-gcs' into 'dev'

Ondev gcs

See merge request !26
parents 23641578 2ab40828
BIGSTREAM_TAG=dev
BS_SECRET=bigstream-server
REDIS_TAG=4
PREFIX_NO=19
VOLUME=../volume
\ No newline at end of file
......@@ -4,13 +4,17 @@
- BS :: AMQP RPC Singleton Connection ,auto ack
- STORAGE :: 50x speedup
### Added
- BS :: Configuration Context with ENV
- BS :: keystore env
- BS :: httplistener headers
- PLUGIN :: dt-pgpcrypt
- PLUGIN :: dt-transform fn extension
- PLUGIN :: dt-transform register
- BS :: Configuration Context with ENV
- PLUGIN :: do-http
- PLUGIN :: do-bsspeak
- PLUGIN :: do-mqtt
- PLUGIN :: do-ext-storage
- API :: BS Info
### Removed
- STORAGE :: remove ipc channel
......
FROM node:lts-alpine
RUN apk add --no-cache make gcc g++ python linux-headers udev
COPY . /app/node-bigstream
WORKDIR /app/node-bigstream
RUN npm install
FROM node:lts-alpine
COPY --from=0 /app/node-bigstream /app/node-bigstream
RUN npm install -y pm2@latest -g
RUN mkdir -p /var/bigstream/data
EXPOSE 19980 19080 19180
# start server
WORKDIR /app/node-bigstream
CMD ["pm2-runtime", "start", "pm2-default.json"]
\ No newline at end of file
......@@ -5,6 +5,7 @@ var cfg = {
'mqtt' : cfg_load('mqtt.json'),
'memstore' : cfg_load('memstore.json'),
'storage' : cfg_load('storage.json'),
'keystore': cfg_load('keystore.json'),
'auth' : {
'secret': cfg_load('secret.json'),
'acl' : cfg_load('acl.json')
......
{
"dir":"./keys"
}
\ No newline at end of file
......@@ -7,6 +7,15 @@ var cfg = require(CONFIG_PATH);
module.exports.config = cfg;
module.exports.getInfo = function (name)
{
var BSINFO = {
"version" : require('./version')
}
return BSINFO;
}
module.exports.getConfig = function(name,def,opt){
var option = {};
var def_val = def || '';
......@@ -36,18 +45,6 @@ var envcnf = function(init_obj){
var obj=init_obj || {};
var env = process.env;
var envmap = require(ENV_MAP);
// var name_pref = 'bs.config';
// if(name){name_pref=name_pref + '.' + name;}
// if(env[name_pref]){obj=env[name_pref];}
// var nfull = name_pref + '.';
// Object.keys(env).forEach((k)=>{
// if(k.startsWith(nfull)){
// var dotkey = k.substring(nfull.length);
// _dot.set(obj,dotkey,env[k]);
// }
// });
if(!Array.isArray(envmap)){return obj;}
envmap.forEach((em)=>{
......
......@@ -2,6 +2,7 @@ var express = require('express');
var router = express.Router();
router.use('/jobs',require('./ws-jobs'));
router.use('/info',require('./ws-info'));
module.exports = router;
var ctx = require('../../../context');
var express = require('express');
var router = express.Router();
var response = ctx.getLib('lib/ws/response');
var request = ctx.getLib('lib/ws/request');
var Tokenizer = ctx.getLib('lib/auth/tokenizer');
const ACL_SERVICE_NAME = "info";
router.get('/',function (req, res) {
var reqHelper = request.create(req);
var respHelper = response.create(res);
var result=ctx.getInfo();
respHelper.responseOK(result);
});
router.get('/version',function (req, res) {
var reqHelper = request.create(req);
var respHelper = response.create(res);
var info = ctx.getInfo();
var result=info.version;
respHelper.responseOK(result);
});
module.exports = router;
version: '3'
services:
bigstream:
image: "bigstream:${BIGSTREAM_TAG}"
build: ./
container_name: bs_bigstream
restart: always
networks:
- bsnet
ports:
- "19980:19980"
- "19080:19080"
- "19180:19180"
- "19150:19150/udp"
environment:
- "BS_SECRET=${BS_SECRET}"
volumes:
- bsdata:/var/bigstream/data
redis:
image: "redis:${REDIS_TAG}"
command: redis-server --appendonly yes
container_name: bs_redis_server
restart: always
networks:
bsnet:
aliases:
- redis-server
ports:
- "6379:6379"
volumes:
- bsredis:/data
rabbitmq:
image: "igridproject/rabbitmq"
command: rabbitmq-server --hostname rabbitmq-server
container_name: bs_rabbitmq_server
restart: always
networks:
bsnet:
aliases:
- rabbitmq-server
ports:
- "5672:5672"
volumes:
bsdata:
bsredis:
networks:
bsnet:
driver: bridge
\ No newline at end of file
......@@ -6,5 +6,6 @@
{"env":"BSCONFIG_MEMSTORE_URL","conf":"memstore.url"},
{"env":"BSCONFIG_STORAGE_REPOSITORY","conf":"storage.repository"},
{"env":"BSCONFIG_STORAGE_APIHOSTNAME","conf":"storage.api_hostname"},
{"env":"BSCONFIG_SECRET_TEXT","conf":"auth.secret.value"}
{"env":"BSCONFIG_SECRET_TEXT","conf":"auth.secret.value"},
{"env":"BSCONFIG_KEYSTORE_DIR","conf":"keystore.dir"}
]
\ No newline at end of file
......@@ -48,13 +48,13 @@ HTTPListener.prototype._http_start = function()
}
});
app.use(bodyParser.json({limit: '128mb'}));
app.use(bodyParser.json({limit: '128mb',type:"*/json"}));
app.use(bodyParser.urlencoded({
extended: true,
limit: '128mb'
}));
app.use(bodyParser.raw({limit: '128mb'}));
app.use(bodyParser.text({limit: '128mb',type:"text/*"}));
app.use(bodyParser.raw({limit: '128mb',type:"*/*"}));
var context = require('./lib/http-context');
app.use(context.middleware({
......
......@@ -28,6 +28,7 @@ var process_req = function(req, res ,method) {
j.forEach(function(item){
var httpdata = {
'object_type' : 'httpdata',
'headers': req.headers,
'method' : method,
'data' : {}
}
......
......@@ -25,7 +25,7 @@ memstore.prototype.getItem = function(k,cb)
if(!err && v){
if(typeof v == 'object' && v.type == 'Buffer')
{
value = new Buffer.from(v.data);
value = Buffer.from(v.data);
}else{
value = JSON.parse(v);
}
......
......@@ -40,7 +40,7 @@ EventPub.prototype.send = function(topic,msg)
{
var self=this;
this.open(function(err){
self.ch.publish(self.name, topic, new Buffer(JSON.stringify(msg)));
self.ch.publish(self.name, topic, Buffer.from(JSON.stringify(msg)));
});
}
......
......@@ -41,7 +41,7 @@ QueueCaller.prototype.send = function(msg)
if(err){
console.log(err);
}
self.ch.sendToQueue(self.name, new Buffer(JSON.stringify(msg)), {persistent: true});
self.ch.sendToQueue(self.name, Buffer.from(JSON.stringify(msg)), {persistent: true});
});
}
......
......@@ -51,7 +51,7 @@ RPCCaller.prototype.call = function(req,cb){
self.ch.responseEmitter.once(corr, (resp)=>{
cb(null,resp);
});
self.ch.sendToQueue(self.name, new Buffer(JSON.stringify(req)), { correlationId: corr, replyTo: REPLY_QUEUE,persistent: false })
self.ch.sendToQueue(self.name, Buffer.from(JSON.stringify(req)), { correlationId: corr, replyTo: REPLY_QUEUE,persistent: false })
});
function generateUuid() {
......
......@@ -30,7 +30,7 @@ RPCServer.prototype.start = function(cb)
var req = JSON.parse(msg.content.toString());
self.remote_function(req,function(err,resp){
ch.sendToQueue(msg.properties.replyTo,new Buffer(JSON.stringify(resp)),{correlationId: msg.properties.correlationId,persistent: false});
ch.sendToQueue(msg.properties.replyTo,Buffer.from(JSON.stringify(resp)),{correlationId: msg.properties.correlationId,persistent: false});
});
//ch.ack(msg);
......
var BSON = require('buffalo');
var bsonp = require('bson')
var BSONP = new bsonp.BSONPure.BSON()
// var bsonp = require('bson')
// var BSONP = new bsonp.BSONPure.BSON()
const OBJHEADERSIZE = 80;
......
{
"name": "node-bigstream",
"description": "",
"version": "1.2.2",
"version": "1.2.3",
"main": "./bigstream.js",
"author": {
"name": "Kamron Aroonrua",
"email": "kamron.aroonrua@nectec.or.th"
},
"keywords": [],
"licenses": {
"type": "none"
},
"license": "Apache-2.0",
"dependencies": {
"amqplib": "^0.4.2",
"async": "^2.0.1",
......@@ -22,7 +20,6 @@
"dot-prop": "^5.1.0",
"express": "^4.14.0",
"express-jwt": "^5.3.1",
"ioredis": "^2.5.0",
"jsonwebtoken": "^8.2.2",
"minimatch": "^3.0.4",
"minimist": "^1.2.0",
......@@ -31,7 +28,6 @@
"node-gyp": "^3.6.2",
"node-persist": "^2.0.7",
"node-schedule": "^1.2.0",
"node-uuid": "^1.4.7",
"object-hash": "^1.1.8",
"pm2": "^2.4.0",
"qs": "^6.8.0",
......@@ -42,5 +38,10 @@
"request": "^2.79.0",
"thunky": "^1.0.2",
"tiny-worker": "^2.1.1"
}, "scripts": {
"test": "echo \"Error: no test specified\" && exit 1",
"install": "node ./script/install_plugins.js",
"start": "pm2 start pm2-default.json",
"stop": "pm2 stop pm2-default.json"
}
}
......@@ -7,14 +7,26 @@ function perform_function(context,response){
var param = context.jobconfig.data_in.param || {};
var memstore = context.task.memstore;
var input_data = context.input.data;
var input_meta = context.input.meta;
var input_meta = context.input.meta || {};
var output_type = 'object'
var data = input_data;
if(param.object == 'httpdata')
{
data = extract_httpdata(input_data);
if(data.object_type && data.object_type == 'httpdata'){
var htdata = data;
if(param.http_meta){
input_meta.http_headers = htdata.headers;
input_meta.http_method = htdata.method;
}
if(typeof htdata.data == 'object' && htdata.data.type == 'Buffer'){
data = Buffer.from(htdata.data);
}else{
data = htdata.data;
}
}
}
// memstore.setItem('lasttransaction',transaction_id,function(err){
// response.success(data);
......@@ -31,14 +43,14 @@ function perform_function(context,response){
}
function extract_httpdata(dat)
{
if(dat.object_type && dat.object_type == 'httpdata'){
return dat.data;
}else{
return dat;
}
}
// function extract_httpdata(dat)
// {
// if(dat.object_type && dat.object_type == 'httpdata'){
// return dat.data;
// }else{
// return dat;
// }
// }
module.exports = perform_function;
......@@ -46,7 +46,6 @@ function perform_function(context,response){
}).then(() => {
return sftp.list(prm_dir + '/');
}).then((fList) => {
var f_target = null;
var last_tts = 0;
var sync_list = [];
......@@ -95,7 +94,7 @@ function perform_function(context,response){
}).catch((err) => {
sftp.end();
response.error(err);
console.log(err, 'catch error');
console.log(err);
});
}
......
var util = require('util');
var DTPlugin = require('../dt-plugin');
function DTTask(context,request){
DTPlugin.call(this,context,request);
this.name = "pgpcrypt";
this.output_type = "";
}
util.inherits(DTTask,DTPlugin);
DTTask.prototype.perform = require('./perform');
module.exports = DTTask;
\ No newline at end of file
{
"name": "dt-pgpcrypt",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"author": "",
"license": "ISC",
"dependencies": {
"openpgp": "^4.10.4"
}
}
var ctx = require('../../../context');
var Utils = ctx.getLib('lib/util/plugin-utils');
var pgplib = require('./pgp');
var path = require('path');
var fs = require('fs');
function perform_function(context,request,response){
var job_id = context.jobconfig.job_id;
var transaction_id = context.transaction.id;
var param = context.task.config.param || {};
var memstore = context.task.memstore
var output_type = request.input_type;
var data = request.data;
var meta = request.meta || {};
var req_function = param.function || "decrypt"
var req_publickey = param.publickey || ""
var req_privatekey = param.privatekey || ""
var req_passphrase = param.passphrase || ""
var req_output = param.output|| "binary"
var key_dir = ctx.getConfig('keystore.dir','./keys');
var fn_load_key = function (name) {
var k = "";
var fp = path.join(key_dir ,path.basename(name));
try{
k = fs.readFileSync(fp).toString('utf8');
}catch(e){
}
return k;
}
var env = {
'type' : output_type,
'data' : data,
'meta' : meta,
'_fn' : {
'load_key' : fn_load_key
}
}
req_publickey = Utils.vm_execute_text(env,req_publickey);
req_privatekey = Utils.vm_execute_text(env,req_privatekey);
req_passphrase = Utils.vm_execute_text(env,req_passphrase);
req_output = Utils.vm_execute_text(env,req_output);
//parsing param from meta
if(typeof meta._param == 'object')
{
var _prm = meta._param;
req_function = (_prm.function)?_prm.function:req_function;
req_publickey = (_prm.publickey)?_prm.publickey:req_publickey;
req_privatekey = (_prm.privatekey)?_prm.privatekey:req_privatekey;
req_passphrase = (_prm.passphrase)?_prm.passphrase:req_passphrase;
req_output = (_prm.output)?_prm.output:req_output;
}
if (['decrypt','dec'].indexOf(req_function) >= 0){
pgplib.decrypt({
private_key : req_privatekey,
passphrase : req_passphrase,
armor_in : (typeof data == 'string'),
data : data
}).then(d => {
var dout = d;
if(['utf8','text'].indexOf(req_output) >= 0){
dout = d.toString('utf8');
}else if(req_output == 'base64'){
dout = d.toString('base64');
}
ok_out(dout);
}).catch(e =>{
error_out('decrypt error');
})
} else if (['encrypt','enc'].indexOf(req_function) >= 0){
pgplib.encrypt({
public_key : req_publickey,
armor_out : (['armor','text'].indexOf(req_output) >= 0),
data : data
}).then(d => {
ok_out(d);
}).catch(e =>{
error_out('decrypt error');
})
} else {
error_out('invalid function')
}
function ok_out (out)
{
response.meta = meta;
response.success(out,output_type);
}
function error_out (msg)
{
response.error(msg);
}
//response.success();
//response.reject();
//response.error("error message")
}
module.exports = perform_function;
const openpgp = require('openpgp');
async function encrypt (opt) {
var publicKeyArmored = opt.public_key;
var datain = opt.data;
var armor = (opt.armor_out)?true:false;
var enc = await openpgp.encrypt({
message: openpgp.message.fromBinary(datain),
publicKeys: (await openpgp.key.readArmored(publicKeyArmored)).keys,
armor: armor
});
if(!armor){
enc = Buffer.from(enc.message.packets.write());
}
return enc;
}
async function decrypt (opt) {
var privateKeyArmored = opt.private_key;
var passphrase = opt.passphrase ;
var datain = opt.data;
var armor = (opt.armor_in)?true:false;
var { keys: [privateKey] } = await openpgp.key.readArmored(privateKeyArmored);
await privateKey.decrypt(passphrase);
var msg = null;
if(armor){
msg = await openpgp.message.readArmored(datain);
}else{
msg = await openpgp.message.read(datain);
}
const { data: decrypted } = await openpgp.decrypt({
message: msg,
privateKeys: [privateKey],
format:'binary'
});
return Buffer.from(decrypted);
}
module.exports.encrypt = encrypt;
module.exports.decrypt = decrypt;
\ No newline at end of file
......@@ -29,7 +29,7 @@ module.exports.newdata = function(prm,cb){
var msg = JSON.stringify(objMsg);
ch.assertExchange(ex, 'topic', {durable: false});
ch.publish(ex, key, new Buffer(msg));
ch.publish(ex, key, Buffer.from(msg));
//console.log("[AMQP] Sent %s:'%s'", key, msg);
}
});
......
......@@ -11,7 +11,7 @@ var ctx = require('../context');
//
// console.log(sel.data);
var uuid = require('node-uuid');
// var uuid = require('node-uuid');
// console.log(uuid.v1());
//
......
{
"version":"1.2.3",
"build":"202005181700"
"build":"202007171500"
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment