Commit e62e0fae authored by Kamron Aroonrua's avatar Kamron Aroonrua 💬

Merge branch 'dev' into 'master'

Dev

See merge request !27
parents ec467260 7de3f23e
BIGSTREAM_TAG=dev
BS_SECRET=bigstream-server
REDIS_TAG=4
PREFIX_NO=19
VOLUME=../volume
\ No newline at end of file
package-lock.json
node_modules
\ No newline at end of file
#Changelog
## [1.2.3-UR]
### Fixed
- BS :: AMQP RPC Singleton Connection ,auto ack
- STORAGE :: 50x speedup
### Added
- BS :: Configuration Context with ENV
- BS :: keystore env
- BS :: httplistener headers
- PLUGIN :: dt-pgpcrypt
- PLUGIN :: dt-transform fn extension
- PLUGIN :: dt-transform register
- PLUGIN :: do-http
- PLUGIN :: do-bsspeak
- PLUGIN :: do-mqtt
- PLUGIN :: do-ext-storage
- API :: BS Info
### Removed
- STORAGE :: remove ipc channel
## [1.2.2] - 2019-03-01
### Added
- PLUGIN :: dt-pdfmaker
......
FROM node:lts-alpine
RUN apk add --no-cache make gcc g++ python linux-headers udev
COPY . /app/node-bigstream
WORKDIR /app/node-bigstream
RUN npm install
FROM node:lts-alpine
COPY --from=0 /app/node-bigstream /app/node-bigstream
RUN npm install -y pm2@latest -g
RUN mkdir -p /var/bigstream/data
EXPOSE 19980 19080 19180
# start server
WORKDIR /app/node-bigstream
CMD ["pm2-runtime", "start", "pm2-default.json"]
\ No newline at end of file
module.exports = {
'amqp' : require('./amqp.json'),
'memstore' : require('./memstore.json'),
'storage' : require('./storage.json'),
var fs = require('fs');
var cfg = {
'amqp' : cfg_load('amqp.json'),
'mqtt' : cfg_load('mqtt.json'),
'memstore' : cfg_load('memstore.json'),
'storage' : cfg_load('storage.json'),
'keystore': cfg_load('keystore.json'),
'auth' : {
'secret': require('./secret.json'),
'acl' : require('./acl.json')
'secret': cfg_load('secret.json'),
'acl' : cfg_load('acl.json')
}
}
function cfg_load(fd)
{
var ret = {};
if(fs.existsSync(__dirname + '/' + fd)){
ret = require(__dirname + '/' + fd);
}else if(fs.existsSync(__dirname + '/template/' + fd)){
ret = require(__dirname + '/template/' + fd);
}
return ret;
}
module.exports = cfg;
\ No newline at end of file
[
{"env":"BSCONFIG_AMQP_TYPE","conf":"amqp.type"},
{"env":"BSCONFIG_AMQP_URL","conf":"amqp.url"},
{"env":"BSCONFIG_MQTT_URL","conf":"mqtt.url"},
{"env":"BSCONFIG_MEMSTORE_TYPE","conf":"memstore.type"},
{"env":"BSCONFIG_MEMSTORE_URL","conf":"memstore.url"},
{"env":"BSCONFIG_STORAGE_REPOSITORY","conf":"storage.repository"},
{"env":"BSCONFIG_STORAGE_APIHOSTNAME","conf":"storage.api_hostname"},
{"env":"BSCONFIG_SECRET_TEXT","conf":"auth.secret.value"}
]
\ No newline at end of file
module.exports = require('./default.json');
\ No newline at end of file
{
"dir":"./keys"
}
\ No newline at end of file
{
"url" : "mqtt://rabbitmq-server"
}
\ No newline at end of file
var CONFIG_PATH = './conf/config';
var _dot = require('dot-prop');
module.exports.config = require(CONFIG_PATH);
var CONFIG_PATH = __dirname + '/conf/config';
var ENV_MAP = __dirname + '/env/index.js';
var cfg = require(CONFIG_PATH);
module.exports.config = cfg;
module.exports.getInfo = function (name)
{
var BSINFO = {
"version" : require('./version')
}
return BSINFO;
}
module.exports.getConfig = function(name,def,opt){
var option = {};
var def_val = def || '';
var ret=def_val;
if(typeof opt == 'object'){option=opt;}
if(typeof option.env == 'undefined'){option.env=true;}
if(typeof name != 'string' || name=='.'){name='';}
var bs_cfg={};
if(option.env){
bs_cfg = envcnf(cfg);
}else{
bs_cfg = cfg;
}
if(typeof name != 'string' || name=='.' || name =='' || name == '*'){
ret = bs_cfg;
}else{
ret = _dot.get(bs_cfg,name,def_val);
}
return ret;
}
var envcnf = function(init_obj){
var obj=init_obj || {};
var env = process.env;
var envmap = require(ENV_MAP);
if(!Array.isArray(envmap)){return obj;}
envmap.forEach((em)=>{
if(em.env && em.conf && env[em.env]){
_dot.set(obj,em.conf,env[em.env]);
}
});
return obj;
}
module.exports.getEnvConf = envcnf
module.exports.getLib = function(name){
if(name)
......@@ -35,5 +90,3 @@ module.exports.getUnixSocketUrl = function(name){
var sockname = name || 'test.sock';
return 'unix://' + __dirname + '/tmp/' + sockname;
}
//module.exports.socket_dir = __dirname + '/tmp';
//module.exports.tmp_dir = __dirname + '/tmp';
......@@ -2,6 +2,7 @@ var express = require('express');
var router = express.Router();
router.use('/jobs',require('./ws-jobs'));
router.use('/info',require('./ws-info'));
module.exports = router;
var ctx = require('../../../context');
var express = require('express');
var router = express.Router();
var response = ctx.getLib('lib/ws/response');
var request = ctx.getLib('lib/ws/request');
var Tokenizer = ctx.getLib('lib/auth/tokenizer');
const ACL_SERVICE_NAME = "info";
router.get('/',function (req, res) {
var reqHelper = request.create(req);
var respHelper = response.create(res);
var result=ctx.getInfo();
respHelper.responseOK(result);
});
router.get('/version',function (req, res) {
var reqHelper = request.create(req);
var respHelper = response.create(res);
var info = ctx.getInfo();
var result=info.version;
respHelper.responseOK(result);
});
module.exports = router;
version: '3'
services:
bigstream:
image: "bigstream:${BIGSTREAM_TAG}"
build: ./
container_name: bs_bigstream
restart: always
networks:
- bsnet
ports:
- "19980:19980"
- "19080:19080"
- "19180:19180"
- "19150:19150/udp"
environment:
- "BS_SECRET=${BS_SECRET}"
volumes:
- bsdata:/var/bigstream/data
redis:
image: "redis:${REDIS_TAG}"
command: redis-server --appendonly yes
container_name: bs_redis_server
restart: always
networks:
bsnet:
aliases:
- redis-server
ports:
- "6379:6379"
volumes:
- bsredis:/data
rabbitmq:
image: "igridproject/rabbitmq"
command: rabbitmq-server --hostname rabbitmq-server
container_name: bs_rabbitmq_server
restart: always
networks:
bsnet:
aliases:
- rabbitmq-server
ports:
- "5672:5672"
volumes:
bsdata:
bsredis:
networks:
bsnet:
driver: bridge
\ No newline at end of file
[
{"env":"BSCONFIG_AMQP_TYPE","conf":"amqp.type"},
{"env":"BSCONFIG_AMQP_URL","conf":"amqp.url"},
{"env":"BSCONFIG_MQTT_URL","conf":"mqtt.url"},
{"env":"BSCONFIG_MEMSTORE_TYPE","conf":"memstore.type"},
{"env":"BSCONFIG_MEMSTORE_URL","conf":"memstore.url"},
{"env":"BSCONFIG_STORAGE_REPOSITORY","conf":"storage.repository"},
{"env":"BSCONFIG_STORAGE_APIHOSTNAME","conf":"storage.api_hostname"},
{"env":"BSCONFIG_SECRET_TEXT","conf":"auth.secret.value"},
{"env":"BSCONFIG_KEYSTORE_DIR","conf":"keystore.dir"}
]
\ No newline at end of file
module.exports = require('./default.json');
\ No newline at end of file
......@@ -48,13 +48,13 @@ HTTPListener.prototype._http_start = function()
}
});
app.use(bodyParser.json({limit: '128mb'}));
app.use(bodyParser.json({limit: '128mb',type:"*/json"}));
app.use(bodyParser.urlencoded({
extended: true,
limit: '128mb'
}));
app.use(bodyParser.raw({limit: '128mb'}));
app.use(bodyParser.text({limit: '128mb',type:"text/*"}));
app.use(bodyParser.raw({limit: '128mb',type:"*/*"}));
var context = require('./lib/http-context');
app.use(context.middleware({
......
......@@ -28,6 +28,7 @@ var process_req = function(req, res ,method) {
j.forEach(function(item){
var httpdata = {
'object_type' : 'httpdata',
'headers': req.headers,
'method' : method,
'data' : {}
}
......
......@@ -108,7 +108,7 @@ JobTask.prototype.run = function ()
perform_di({'context':context,'handle':self} ,function(err,resp){
if(resp){
console.log('[DI STATUS]\t\t: ' + resp.status);
// console.log('[DI STATUS]\t\t: ' + resp.status);
self.stats.di = resp;
}
if(resp.status == 'success'){
......@@ -149,7 +149,7 @@ JobTask.prototype.run = function ()
perform_dt({'cfg':cur_cfg,'name':dt_name,'context':context,'request':dt_request,'handle':self},function(err,dt_resp){
if(dt_resp){
console.log('[DT:' + dt_name + ' STATUS]\t\t: ' + dt_resp.status);
// console.log('[DT:' + dt_name + ' STATUS]\t\t: ' + dt_resp.status);
}
idx++;
......@@ -188,7 +188,9 @@ JobTask.prototype.run = function ()
dm_o.run(function() {
perform_do({'context':context,'request':do_request,'handle':self},function(err,do_resp){
if(do_resp){console.log('[DO STATUS]\t\t: ' + do_resp.status);}
if(do_resp){
//console.log('[DO STATUS]\t\t: ' + do_resp.status);
}
if(do_resp.status == 'success'){
callback(null,do_resp);
}else {
......@@ -205,18 +207,21 @@ JobTask.prototype.run = function ()
//self.emit('error',new Error('job execution timeout'))
},self.job_timeout);
console.log('***** JOB RUNNING *****');
console.log('[JOB ID]\t\t: ' + job_id);
console.log('[TRANSACTION ID]\t: ' + transaction_id);
// console.log('***** JOB RUNNING *****');
// console.log('[JOB ID]\t\t: ' + job_id);
// console.log('[TRANSACTION ID]\t: ' + transaction_id);
async.waterfall([task_di,task_dt,task_do],function (err,resp) {
clearTimeout(jtimeout);
//console.log('[JOB DONE] id=' + job_id + ' ,tr=' + transaction_id + '\t' + resp.status);
if(!err){
self.stop(resp)
console.log('***** JOB SUCCESSFULLY DONE *****\n');
// console.log('***** JOB SUCCESSFULLY DONE *****\n');
console.log('[JOB DONE] id=' + job_id + ' ,tr=' + transaction_id + '\tsuccess');
}else{
self.stop(err)
console.log('***** JOB UNSUCCESSFULLY DONE *****\n');
// console.log('***** JOB UNSUCCESSFULLY DONE *****\n');
console.log('[JOB DONE] id=' + job_id + ' ,tr=' + transaction_id + '\tunsuccess');
}
});
......
module.exports.create = function (regis)
{
return new MemsRegister(regis);
}
function MemsRegister(regis)
{
this._register = {};
if(typeof regis == 'object' && regis != null){
this._register = regis;
}
}
MemsRegister.prototype.init = function(name,val){
var value = (val)?val:'';
if(typeof this._register[name]=='undefined'){
this._register[name]=value;
}
}
MemsRegister.prototype.get = function(name){
if(name){
return this._register[name];
}
return this._register;
}
MemsRegister.prototype.set= function(name,val){
if(name){
this._register[name]=val;
}
}
MemsRegister.prototype.reset= function(name){
if(name){
delete this._register[name];
}else{
this._register={};
}
}
MemsRegister.prototype.init_counter = function(name,val){
if(!name){return;}
if(typeof this._register[name]!='number'){
this._register[name]=(typeof val == 'number')?val:0;
}
}
MemsRegister.prototype.inc = function(name,val){
if(!name){return;}
if(typeof val != 'number'){val=1}
if(typeof this._register[name]!='number'){
this.init_counter(name);
}
this._register[name]+=val;
}
\ No newline at end of file
......@@ -25,7 +25,7 @@ memstore.prototype.getItem = function(k,cb)
if(!err && v){
if(typeof v == 'object' && v.type == 'Buffer')
{
value = new Buffer(v.data);
value = Buffer.from(v.data);
}else{
value = JSON.parse(v);
}
......
......@@ -6,6 +6,7 @@ var QueueReceiver = ctx.getLib('lib/amqp/queuereceiver');
var ConnCtx = ctx.getLib('lib/conn/connection-context');
var JobRegistry = ctx.getLib('lib/mems/job-registry');
var SSCaller = ctx.getLib('lib/axon/rpccaller');
var RPCCaller = ctx.getLib('lib/amqp/rpccaller');
var ACLValidator = ctx.getLib('lib/auth/acl-validator');
var JobTransaction = require('./lib/jobtransaction')
......@@ -32,7 +33,12 @@ var JW = function JobWorker (prm)
this.job_registry = JobRegistry.create({'redis':this.mem});
this.acl_validator = ACLValidator.create(this.auth_cfg);
this.storagecaller = new SSCaller({'url':SS_URL});
/* Disable RPC Feature */
//this.storagecaller = new SSCaller({'url':SS_URL});
this.storagecaller = new RPCCaller({
url : this.conn.getAmqpUrl(),
name :'storage_request'
});
}
JW.prototype.start = function ()
......
......@@ -40,7 +40,7 @@ EventPub.prototype.send = function(topic,msg)
{
var self=this;
this.open(function(err){
self.ch.publish(self.name, topic, new Buffer(JSON.stringify(msg)));
self.ch.publish(self.name, topic, Buffer.from(JSON.stringify(msg)));
});
}
......
......@@ -41,7 +41,7 @@ QueueCaller.prototype.send = function(msg)
if(err){
console.log(err);
}
self.ch.sendToQueue(self.name, new Buffer(JSON.stringify(msg)), {persistent: true});
self.ch.sendToQueue(self.name, Buffer.from(JSON.stringify(msg)), {persistent: true});
});
}
......
var amqp = require('amqplib/callback_api');
var EventEmitter = require('events').EventEmitter;
var thunky = require('thunky');
const REPLY_QUEUE = 'amq.rabbitmq.reply-to';
function RPCCaller(config)
{
this.config = config;
this.url = config.url;
this.name = config.name || "rpc_queue";
this.conn = null;
this.ch = null;
var self = this;
this.opened = false;
this.open = thunky(open);
this.open();
function open (cb) {
amqp.connect(self.url, function(err, conn) {
if (err){return cb(err)}
conn.createChannel(function(err, ch) {
if (err){return cb(err)}
ch.responseEmitter = new EventEmitter();
ch.responseEmitter.setMaxListeners(0);
ch.prefetch(4);
ch.consume(REPLY_QUEUE ,
(msg) => { ch.responseEmitter.emit(msg.properties.correlationId, JSON.parse(msg.content.toString()))},
{noAck: true});
self.opened = true;
self.conn = conn;
self.ch = ch;
cb();
});
});
}
}
RPCCaller.prototype.call = function(req,cb){
var self = this;
var corr = generateUuid();
self.open(function(err){
if(err){
console.log(err);
}
self.ch.responseEmitter.once(corr, (resp)=>{
cb(null,resp);
});
self.ch.sendToQueue(self.name, Buffer.from(JSON.stringify(req)), { correlationId: corr, replyTo: REPLY_QUEUE,persistent: false })
});
function generateUuid() {
return Math.random().toString() +
Math.random().toString() +
Math.random().toString();
}
}
/*
function RPCCaller(config)
{
this.config = config;
......@@ -35,5 +98,6 @@ RPCCaller.prototype.call = function(req,cb){
Math.random().toString();
}
}
*/
module.exports = RPCCaller;
......@@ -23,18 +23,18 @@ RPCServer.prototype.start = function(cb)
var q = self.name;
ch.assertQueue(q, {durable: false});
ch.prefetch(1);
ch.prefetch(4);
//console.log(' [x] Awaiting RPC requests');
ch.consume(q, function reply(msg) {
var req = JSON.parse(msg.content.toString());
self.remote_function(req,function(err,resp){
ch.sendToQueue(msg.properties.replyTo,new Buffer(JSON.stringify(resp)),{correlationId: msg.properties.correlationId});
ch.sendToQueue(msg.properties.replyTo,Buffer.from(JSON.stringify(resp)),{correlationId: msg.properties.correlationId,persistent: false});
});
ch.ack(msg);
});
//ch.ack(msg);
}, {noAck: true});
cb(null);
});
......
var BSON = require('buffalo');
var bsonp = require('bson')
var BSONP = new bsonp.BSONPure.BSON()
// var bsonp = require('bson')
// var BSONP = new bsonp.BSONPure.BSON()
const OBJHEADERSIZE = 80;
......
{
"name": "node-bigstream",
"description": "",
"version": "0.0.1",
"version": "1.2.3",
"main": "./bigstream.js",
"author": {
"name": "Kamron Aroonrua",
"email": "kamron.aroonrua@nectec.or.th"
},
"keywords": [],
"licenses": {
"type": "none"
},
"license": "Apache-2.0",
"dependencies": {
"amqplib": "^0.4.2",
"async": "^2.0.1",
......@@ -19,20 +17,20 @@
"bson": "^0.5.6",
"buffalo": "^0.1.3",
"dateformat": "^1.0.12",
"dot-prop": "^5.1.0",
"express": "^4.14.0",
"express-jwt": "^5.3.1",
"ioredis": "^2.5.0",
"jsonwebtoken": "^8.2.2",
"microgear": "^0.8.1",
"minimatch": "^3.0.4",
"minimist": "^1.2.0",
"moment": "^2.17.1",
"mqtt": "^3.0.0",
"node-gyp": "^3.6.2",
"node-persist": "^2.0.7",
"node-schedule": "^1.2.0",
"node-uuid": "^1.4.7",
"object-hash": "^1.1.8",
"pm2": "^2.4.0",
"qs": "^6.8.0",
"query-string": "^4.2.3",
"quickq": "^0.8.1",
"random-access-file": "^1.3.0",
......@@ -40,5 +38,10 @@
"request": "^2.79.0",
"thunky": "^1.0.2",
"tiny-worker": "^2.1.1"
}, "scripts": {
"test": "echo \"Error: no test specified\" && exit 1",
"install": "node ./script/install_plugins.js",
"start": "pm2 start pm2-default.json",
"stop": "pm2 stop pm2-default.json"
}
}
......@@ -7,14 +7,26 @@ function perform_function(context,response){
var param = context.jobconfig.data_in.param || {};
var memstore = context.task.memstore;
var input_data = context.input.data;
var input_meta = context.input.meta;
var input_meta = context.input.meta || {};
var output_type = 'object'
var data = input_data;
if(param.object == 'httpdata')
{
data = extract_httpdata(input_data);
if(data.object_type && data.object_type == 'httpdata'){
var htdata = data;
if(param.http_meta){
input_meta.http_headers = htdata.headers;
input_meta.http_method = htdata.method;
}
if(typeof htdata.data == 'object' && htdata.data.type == 'Buffer'){
data = Buffer.from(htdata.data);
}else{
data = htdata.data;
}
}
}
// memstore.setItem('lasttransaction',transaction_id,function(err){
// response.success(data);
......@@ -31,14 +43,14 @@ function perform_function(context,response){
}
function extract_httpdata(dat)
{
if(dat.object_type && dat.object_type == 'httpdata'){
return dat.data;
}else{
return dat;
}
}
// function extract_httpdata(dat)
// {
// if(dat.object_type && dat.object_type == 'httpdata'){
// return dat.data;
// }else{
// return dat;
// }
// }
module.exports = perform_function;
......@@ -46,7 +46,6 @@ function perform_function(context,response){
}).then(() => {
return sftp.list(prm_dir + '/');
}).then((fList) => {
var f_target = null;
var last_tts = 0;
var sync_list = [];
......@@ -70,24 +69,24 @@ function perform_function(context,response){
'filesize': f_target.size,
'modify_ts' : Math.round(f_target.modifyTime/1000)
}
return sftp.get(prm_dir + '/' + f_target.name,null,null);
return sftp.get(prm_dir + '/' + f_target.name);
}else{
return null;
}
}).then((data) => {
if(data){
data.on('data',(dat)=>{
var nb = Buffer.concat([buff_out,dat]);
var nb = Buffer.concat([buff_out,data]);
buff_out = nb;
})
data.on('end',()=>{
sftp.end()
memstore.setItem('lastmodify',last_mod,function(err){
var result=(prm_encoding=='binary')?buff_out:buff_out.toString('utf8');
response.success(result, {"meta":meta,"continue": fs_continue});
});
});
}else{
sftp.end();
response.reject();
......@@ -95,7 +94,7 @@ function perform_function(context,response){
}).catch((err) => {
sftp.end();
response.error(err);
console.log(err, 'catch error');
console.log(err);
});
}
......
var util = require('util');
var DOPlugin = require('../do-plugin');
function DOTask(context,request){
DOPlugin.call(this,context,request);
this.name = "bsspeak";
}
util.inherits(DOTask,DOPlugin);
DOTask.prototype.perform = require('./perform');
module.exports = DOTask;
var ctx = require('../../../context');
var Utils = ctx.getLib('lib/util/plugin-utils');
var mqtt = require('mqtt')
var async = require('async');
function perform_function(context,request,response){
var job_id = context.jobconfig.job_id;
var transaction_id = context.transaction.id;
var param = context.jobconfig.data_out.param || {};
var memstore = context.task.memstore;
var in_type = request.type;
var data = (Array.isArray(request.data))?request.data:[request.data];
var meta = request.meta;
var prm_url = param.url || "mqtt://127.0.0.1";
var prm_topic = param.topic;
var client = mqtt.connect(prm_url);
if(!prm_topic){
prm_topic = "/bsspeak/job/" + job_id;
}
client.on('connect', function () {
var idx = 0;
async.whilst(
function() { return idx < data.length; },
function(callback) {
var ev = {
'type' : in_type,
'meta' : meta,
'data' : data[idx]
}
var topic=Utils.vm_execute_text(ev,prm_topic);
client.publish(topic,JSON.stringify(data[idx]),function(err){
idx++;
callback(err);
});
},
function (err) {
if(!err){
response.success();
}else{
console.log(err);
response.error("publish error");
}
client.end();
}
);
});
//response.success();
//response.reject();
//response.error("error message")
}
module.exports = perform_function;
var util = require('util');
var DOPlugin = require('../do-plugin');
function DOTask(context,request){
DOPlugin.call(this,context,request);
this.name = "ext-storage";
}
util.inherits(DOTask,DOPlugin);
DOTask.prototype.perform = require('./perform');
module.exports = DOTask;
var ctx = require('../../../context');
var Utils = ctx.getLib('lib/util/plugin-utils');
var bsdata = ctx.getLib('lib/model/bsdata');
var request = require("request");
function perform_function(context,request,response){
var job_id = context.jobconfig.job_id;
var transaction_id = context.transaction.id;
var param = context.jobconfig.data_out.param;
var memstore = context.task.memstore
var output_type = request.input_type;
var data = request.data;
var meta = request.meta;
var token = param.token;
var api_url = param.api;
var storage_name = param.storage_name;
var env = {
'type' : output_type,
'data' : data,
'meta' : meta
}
var sname = Utils.vm_execute_text(env,storage_name);
var en_data = bsdata.create(data).serialize('object-encoded');
var storage_url = api_url + '/storage/' + sname
var msgbody = {
'meta':meta,
'data':en_data
}
send_to_storage({'api':storage_url,'token':token,'body':msgbody},function(err){
if(!err){
response.success();
}else{
response.error(err);
}
})
//response.success();
//response.reject();
//response.error("error message")
}
function send_to_storage(prm,cb)
{
var options = { method: 'PUT',
url: prm.api,
headers:
{ 'cache-control': 'no-cache',
'content-type': 'application/json' },
json: prm.body
};
if(prm.token){options.headers.authorization='Bearer ' + prm.token;}
request(options, function (err, resp, body) {
if (!err && resp.statusCode==200) {
cb();
}else{
cb(new Error("api error"));
}
});
}
module.exports = perform_function;
var util = require('util');
var DOPlugin = require('../do-plugin');
function DOTask(context,request){
DOPlugin.call(this,context,request);
this.name = "http";
}
util.inherits(DOTask,DOPlugin);
DOTask.prototype.perform = require('./perform');
module.exports = DOTask;
var ctx = require('../../../context');
var Utils = ctx.getLib('lib/util/plugin-utils');
var request = require("request");
function perform_function(context,request,response){
var job_id = context.jobconfig.job_id;
var transaction_id = context.transaction.id;
var param = context.jobconfig.data_out.param;
var memstore = context.task.memstore
var output_type = request.input_type;
var data = request.data;
var meta = request.meta;
var req_url = param.url || "";
var req_method = param.method || "GET";
var req_body_type = param.body_type || "json";
var env = {
'type' : output_type,
'data' : data,
'meta' : meta
}
var req_url = Utils.vm_execute_text(env,req_url);
send_request({'url':req_url,'method':req_method,'headers':param.headers,'body_type':req_body_type,'body':data},function(err){
if(!err){
response.success();
}else{
response.error(err);
}
})
//response.success();
//response.reject();
//response.error("error message")
}
function send_request(prm,cb)
{
var options = { method: 'GET',
url: prm.url,
headers:
{ 'cache-control': 'no-cache' }
};
if(prm.method.toLowerCase()=='post' || prm.method.toLowerCase()=='put')
{
options.method = prm.method.toUpperCase();
if(prm.body_type=='json' && typeof prm.body == 'object'){
options.headers['content-type'] = 'application/json';
options.json = prm.body;
}else if(prm.body_type=='text' || typeof prm.body == 'string'){
options.headers['content-type'] = 'text/plain';
options.body = prm.body;
}else{
options.body = prm.body;
}
}
if(typeof prm.headers == 'object')
{
options.headers = Object.assign(options.headers,prm.headers)
}
request(options, function (err, resp, body) {
if (!err) {
cb();
}else{
cb(new Error("request error"));
}
});
}
module.exports = perform_function;
var util = require('util');
var DOPlugin = require('../do-plugin');
function DOTask(context,request){
DOPlugin.call(this,context,request);
this.name = "mqtt";
}
util.inherits(DOTask,DOPlugin);
DOTask.prototype.perform = require('./perform');
module.exports = DOTask;
var ctx = require('../../../context');
var Utils = ctx.getLib('lib/util/plugin-utils');
var mqtt = require('mqtt')
var async = require('async');
function perform_function(context,request,response){
var job_id = context.jobconfig.job_id;
var transaction_id = context.transaction.id;
var param = context.jobconfig.data_out.param;
var memstore = context.task.memstore;
var in_type = request.type;
var data = (Array.isArray(request.data))?request.data:[request.data];
var meta = request.meta;
var prm_url = param.url || ctx.getConfig('mqtt.url','mqtt://127.0.0.1');
var prm_topic = param.topic;
var client = mqtt.connect(prm_url);
client.on('connect', function () {
var idx = 0;
async.whilst(
function() { return idx < data.length; },
function(callback) {
var ev = {
'type' : in_type,
'meta' : meta,
'data' : data[idx]
}
var topic=Utils.vm_execute_text(ev,prm_topic);
client.publish(topic,data[idx],function(err){
idx++;
callback(err);
});
},
function (err) {
if(!err){
response.success();
}else{
console.log(err);
response.error("publish error");
}
client.end();
}
);
});
//response.success();
//response.reject();
//response.error("error message")
}
module.exports = perform_function;
......@@ -24,14 +24,19 @@ function perform_function(context,request,response){
var amqp_cfg = ctx.config.amqp;
var storage_name = param.storage_name;
// var caller = new RPCCaller({
// url : amqp_cfg.url,
// name :'storage_request'
// });
var caller = storagecaller;
if(param.channel!='ipc'){
caller = new RPCCaller({
url : amqp_cfg.url,
name :'storage_request'
});
}
// if(param.channel!='ipc'){
// caller = new RPCCaller({
// url : amqp_cfg.url,
// name :'storage_request'
// });
// }
var dc_meta = {
......
var util = require('util');
var DTPlugin = require('../dt-plugin');
function DTTask(context,request){
DTPlugin.call(this,context,request);
this.name = "http";
this.output_type = "";
}
util.inherits(DTTask,DTPlugin);
DTTask.prototype.perform = require('./perform');
module.exports = DTTask;
\ No newline at end of file
var ctx = require('../../../context');
var Utils = ctx.getLib('lib/util/plugin-utils');
var request = require("request").defaults({ encoding: null });
function perform_function(context,request,response){
var job_id = context.jobconfig.job_id;
var transaction_id = context.transaction.id;
var param = context.task.config.param || {};
var memstore = context.task.memstore
var output_type = request.input_type;
var data = request.data;
var meta = request.meta || {};
var req_url = param.url || "";
var req_method = param.method || "GET";
var req_headers = param.headers || {};
var req_body_type = param.body_type || "json";
var resp_encode = param.encoding || "text";
var env = {
'type' : output_type,
'data' : data,
'meta' : meta
}
req_url = Utils.vm_execute_text(env,req_url);
//parsing param from meta
if(typeof meta._param == 'object')
{
var _prm = meta._param;
req_url = (_prm.url)?_prm.url:req_url;
req_method = (_prm.method)?_prm.method:req_method;
req_headers = (_prm.headers)?_prm.headers:req_headers;
req_body_type = (_prm.body_type)?_prm.body_type:req_body_type;
resp_encode = (_prm.encoding)?_prm.encoding:resp_encode;
}
send_request({'url':req_url,
'method':req_method,
'headers':req_headers,
'body_type':req_body_type,
'body':data,
'resp_encode':resp_encode},function(err,resp,body){
var respmeta = meta;
Object.keys(respmeta).forEach((k)=>{
if(k.startsWith('_')){delete respmeta[k];}
});
respmeta['_status_code'] = (err)?0:resp.statusCode;
respmeta['_error'] = (err)?true:false;
response.meta = respmeta;
if(!err){
if(resp_encode=='json'){
try{
var j = JSON.parse(body);
response.success(j,output_type);
}catch(err){
response.success({},output_type);
}
}else{
response.success(body,output_type);
}
}else{
response.success(null,output_type);
}
});
//response.success();
//response.reject();
//response.error("error message")
}
function send_request(prm,cb)
{
var options = { method: 'GET',
url: prm.url,
headers:
{ 'cache-control': 'no-cache' }
};
if(prm.method.toLowerCase()=='post' || prm.method.toLowerCase()=='put')
{
options.method = prm.method.toUpperCase();
if(prm.body_type=='json' && typeof prm.body == 'object'){
options.headers['content-type'] = 'application/json';
options.json = prm.body;
}else if(prm.body_type=='text' || typeof prm.body == 'string'){
options.headers['content-type'] = 'text/plain';
options.body = prm.body;
}else{
options.body = prm.body;
}
}
if(typeof prm.headers == 'object')
{
options.headers = Object.assign(options.headers,prm.headers)
}
options.encoding = (prm.resp_encode == 'binary')?null:'utf8';
request(options, function (err, resp, body) {
if (!err) {
cb(err, resp, body);
}else{
cb(new Error("request error"));
}
});
}
module.exports = perform_function;
var util = require('util');
var DTPlugin = require('../dt-plugin');
function DTTask(context,request){
DTPlugin.call(this,context,request);
this.name = "pgpcrypt";
this.output_type = "";
}
util.inherits(DTTask,DTPlugin);
DTTask.prototype.perform = require('./perform');
module.exports = DTTask;
\ No newline at end of file
{
"name": "dt-pgpcrypt",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"author": "",
"license": "ISC",
"dependencies": {
"openpgp": "^4.10.4"
}
}
var ctx = require('../../../context');
var Utils = ctx.getLib('lib/util/plugin-utils');
var pgplib = require('./pgp');
var path = require('path');
var fs = require('fs');
function perform_function(context,request,response){
var job_id = context.jobconfig.job_id;
var transaction_id = context.transaction.id;
var param = context.task.config.param || {};
var memstore = context.task.memstore
var output_type = request.input_type;
var data = request.data;
var meta = request.meta || {};
var req_function = param.function || "decrypt"
var req_publickey = param.publickey || ""
var req_privatekey = param.privatekey || ""
var req_passphrase = param.passphrase || ""
var req_output = param.output|| "binary"
var key_dir = ctx.getConfig('keystore.dir','./keys');
var fn_load_key = function (name) {
var k = "";
var fp = path.join(key_dir ,path.basename(name));
try{
k = fs.readFileSync(fp).toString('utf8');
}catch(e){
}
return k;
}
var env = {
'type' : output_type,
'data' : data,
'meta' : meta,
'_fn' : {
'load_key' : fn_load_key
}
}
req_publickey = Utils.vm_execute_text(env,req_publickey);
req_privatekey = Utils.vm_execute_text(env,req_privatekey);
req_passphrase = Utils.vm_execute_text(env,req_passphrase);
req_output = Utils.vm_execute_text(env,req_output);
//parsing param from meta
if(typeof meta._param == 'object')
{
var _prm = meta._param;
req_function = (_prm.function)?_prm.function:req_function;
req_publickey = (_prm.publickey)?_prm.publickey:req_publickey;
req_privatekey = (_prm.privatekey)?_prm.privatekey:req_privatekey;
req_passphrase = (_prm.passphrase)?_prm.passphrase:req_passphrase;
req_output = (_prm.output)?_prm.output:req_output;
}
if (['decrypt','dec'].indexOf(req_function) >= 0){
pgplib.decrypt({
private_key : req_privatekey,
passphrase : req_passphrase,
armor_in : (typeof data == 'string'),
data : data
}).then(d => {
var dout = d;
if(['utf8','text'].indexOf(req_output) >= 0){
dout = d.toString('utf8');
}else if(req_output == 'base64'){
dout = d.toString('base64');
}
ok_out(dout);
}).catch(e =>{
error_out('decrypt error');
})
} else if (['encrypt','enc'].indexOf(req_function) >= 0){
pgplib.encrypt({
public_key : req_publickey,
armor_out : (['armor','text'].indexOf(req_output) >= 0),
data : data
}).then(d => {
ok_out(d);
}).catch(e =>{
error_out('decrypt error');
})
} else {
error_out('invalid function')
}
function ok_out (out)
{
response.meta = meta;
response.success(out,output_type);
}
function error_out (msg)
{
response.error(msg);
}
//response.success();
//response.reject();
//response.error("error message")
}
module.exports = perform_function;
const openpgp = require('openpgp');
async function encrypt (opt) {
var publicKeyArmored = opt.public_key;
var datain = opt.data;
var armor = (opt.armor_out)?true:false;
var enc = await openpgp.encrypt({
message: openpgp.message.fromBinary(datain),
publicKeys: (await openpgp.key.readArmored(publicKeyArmored)).keys,
armor: armor
});
if(!armor){
enc = Buffer.from(enc.message.packets.write());
}
return enc;
}
async function decrypt (opt) {
var privateKeyArmored = opt.private_key;
var passphrase = opt.passphrase ;
var datain = opt.data;
var armor = (opt.armor_in)?true:false;
var { keys: [privateKey] } = await openpgp.key.readArmored(privateKeyArmored);
await privateKey.decrypt(passphrase);
var msg = null;
if(armor){
msg = await openpgp.message.readArmored(datain);
}else{
msg = await openpgp.message.read(datain);
}
const { data: decrypted } = await openpgp.decrypt({
message: msg,
privateKeys: [privateKey],
format:'binary'
});
return Buffer.from(decrypted);
}
module.exports.encrypt = encrypt;
module.exports.decrypt = decrypt;
\ No newline at end of file
var crypto = require("crypto");
var publicEncrypt = function(data, publicKey) {
var buffer = Buffer.from(data);
var encrypted = crypto.publicEncrypt(publicKey, buffer);
return encrypted.toString("base64");
};
var privateDecrypt = function(crypted_data, privateKey) {
var buffer = Buffer.from(crypted_data, "base64");
var decrypted = crypto.privateDecrypt(privateKey, buffer);
return decrypted.toString("utf8");
};
module.exports = {
publicEncrypt: publicEncrypt,
privateDecrypt: privateDecrypt
}
\ No newline at end of file
var DomParser = require('dom-parser');
var parser = new DomParser();
module.exports = function (html) {
return parser.parseFromString(html);
}
\ No newline at end of file
var crypto = require('crypto');
module.exports.sha256 = function (text) {
var dat = (typeof text == 'string')?text:String(text);
return crypto.createHash('sha256').update(dat).digest('hex');
}
\ No newline at end of file
{
"name": "dt-transform",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"author": "",
"license": "ISC",
"dependencies": {
"dom-parser": "^0.1.6"
}
}
var vm = require('vm');
var ctx = require('../context');
var Utils = ctx.getLib('lib/util/plugin-utils');
var Register = ctx.getLib('jobworker/lib/mems-register');
function perform_function(context,request,response){
var job_id = context.jobconfig.job_id;
var transaction_id = context.transaction.id;
var param = context.task.config.param;
var param = context.task.config.param || {};
var memstore = context.task.memstore
var in_type = request.input_type;
var in_data = request.data;
var in_meta = request.meta;
var mapscr = Utils.parse_script_param(param.script);
var datascr = param.text ;
var ba64script = param.ba64script;
if(datascr){
mapscr = mapscr + "; data=`" + datascr + "`";
}
var in_meta = request.meta || {};
var mapenv = {
'src' : {
'type' : in_type,
......@@ -27,10 +20,65 @@ function perform_function(context,request,response){
'meta' : in_meta
},
'_env':{},
'_fn':{},
'type' : in_type,
'data' : in_data,
'meta' : in_meta
}
if(param.use_function){
var fns = (Array.isArray(param.use_function))?param.use_function:Array.of(param.use_function);
fns.forEach((fname)=>{
if(typeof fname == 'string' && fname.length>0){
mapenv._fn[fname] = _loadfunc(fname);
}
});
}
if(param.use_register){
memstore.getItem('register',function(err,value){
if(err){return response.error("memstore error");}
mapenv.register = Register.create(value);
mapenv = _compile(mapenv,param);
memstore.setItem('register',mapenv.register.get(),function(err){
_response();
});
});
}else{
mapenv = _compile(mapenv,param);
_response();
}
function _response()
{
var data = mapenv.data;
var meta = mapenv.meta;
var output_type = mapenv.type;
if(param.to_binary && typeof data == 'string'){
data = Buffer.from(data, 'base64');
output_type = 'binary';
}
response.success(data,{'meta':meta,'output_type':output_type});
}
}
function _compile(mape,param)
{
var mapenv = mape;
var mapscr = Utils.parse_script_param(param.script);
var datascr = param.text ;
var ba64script = param.ba64script;
if(datascr){
mapscr = mapscr + "; data=`" + datascr + "`";
}
var context = new vm.createContext(mapenv);
......@@ -43,18 +91,18 @@ function perform_function(context,request,response){
var script = new vm.Script(mapscr);
script.runInContext(context);
var data = mapenv.data;
var meta = mapenv.meta;
var output_type = mapenv.type;
return mapenv;
}
if(param.to_binary && typeof data == 'string'){
data = Buffer.from(data, 'base64');
output_type = 'binary';
function _loadfunc(name)
{
var f = null;
try {
f = require('./fn/' + name);
} catch (error) {
}
response.success(data,{'meta':meta,'output_type':output_type});
return f;
}
module.exports = perform_function;
......@@ -6,9 +6,9 @@ function avg_point(prm,cb)
var bg = prm.bg;
var fg = prm.fg;
var dpoint = prm.point;
var radius = prm.radius || 10;
var bg_threshold = prm.bg_threshold || 20;
var mapping_threshold = prm.mapping_threshold || 128;
var radius = Number(prm.radius) || 10;
var bg_threshold = Number(prm.bg_threshold) || 20;
var mapping_threshold = Number(prm.mapping_threshold) || 128;
var table = prm.table || [];
async.waterfall([
......@@ -128,7 +128,7 @@ function mapping(map_table,color)
if(itm.color && itm.value){
var d = distance(itm.color,color)
if(d<out.distance){
out.value = itm.value;
out.value = Number(itm.value);
out.distance = d;
}
}
......@@ -162,7 +162,7 @@ function weight(x, y, cx, cy,radius) {
function distance(a,b)
{
sum = 0;
sum = Math.pow(a[0] - b[0],2) + Math.pow(a[1] - b[1],2) + Math.pow(a[2] - b[2],2)
sum = Math.pow(Number(a[0]) - Number(b[0]),2) + Math.pow(Number(a[1]) - Number(b[1]),2) + Math.pow(Number(a[2]) - Number(b[2]),2)
return Math.sqrt(sum)
}
......
var ctx = require('./context');
var BSCONFIG = ctx.getConfig();
var ControllerAPI = ctx.getLib('coreservice/controller-api');
var api = ControllerAPI.create(ctx.config);
var api = ControllerAPI.create(BSCONFIG);
api.start();
var ctx = require('./context');
var BSCONFIG = ctx.getConfig();
var SchedulerService = ctx.getLib('coreservice/scheduler');
var StorageEventService = ctx.getLib('coreservice/storage-trigger');
var ss = SchedulerService.create(ctx.config);
var ss = SchedulerService.create(BSCONFIG);
ss.start();
var ses = StorageEventService.create(ctx.config);
var ses = StorageEventService.create(BSCONFIG);
ses.start();
var ctx = require('./context');
var HTTPListener = ctx.getLib('http-listener/main');
var BSCONFIG = ctx.getConfig();
var hs = HTTPListener.create(ctx.config);
var hs = HTTPListener.create(BSCONFIG);
hs.start();
var ctx = require('./context');
var NBUdpTrigger = ctx.getLib('triggers/trg-nbudp');
var BSCONFIG = ctx.getConfig();
var trg = NBUdpTrigger.create(ctx.config);
var trg = NBUdpTrigger.create(BSCONFIG);
trg.start();
var ctx = require('./context');
var SchedulerService = ctx.getLib('coreservice/scheduler');
var BSCONFIG = ctx.getConfig();
var ss = SchedulerService.create(ctx.config);
var ss = SchedulerService.create(BSCONFIG);
ss.start();
var ctx = require('./context');
var StorageService = ctx.getLib('storage-service/main');
var BSCONFIG = ctx.getConfig();
var argv = require('minimist')(process.argv.slice(2));
......@@ -8,10 +8,10 @@ var argv = require('minimist')(process.argv.slice(2));
var m = {'read':"false",'write':"false"}
if(argv['process-read']){m.read = "true";}
if(argv['process-write']){m.write = "true";}
if(argv['api-port']){ctx.config.storage.api_port = Number(argv['api-port']);}
if(argv['api-port']){BSCONFIG.storage.api_port = Number(argv['api-port']);}
//console.log(argv);
var ss = StorageService.create(ctx.config);
var ss = StorageService.create(BSCONFIG);
if(m.read=="false" && m.write=="false")
{
......@@ -20,6 +20,7 @@ if(m.read=="false" && m.write=="false")
if(m.read=="true"){ss.http_start();}
if(m.write=="true"){
ss.amqp_start();
ss.ipc_start();
//Disable IPC Feature
//ss.ipc_start();
}
}
......@@ -4,6 +4,7 @@ var ctx = require('../../context');
var BinStream = ctx.getLib('lib/bss/binarystream_v1_1');
var ObjId = ctx.getLib('lib/bss/objid');
var bsdata = ctx.getLib('lib/model/bsdata');
var thunky = require('thunky');
var importer = require('./importer');
var dataevent = require('./dataevent');
......@@ -21,22 +22,53 @@ module.exports.create = function(prm)
function BSSEngine(prm)
{
var self = this;
if(typeof prm == 'string'){
prm = {'file':prm,'context':null};
}
// this.repos_dir = prm.repos_dir;
// this.name = prm.name;
this.file = prm.file;
this.name = prm.name;
this.context = (prm.context)?prm.context:null;
this.concurrent = 0;
this.serial=prm.serial||'';
this.outdate=false;
this.bss=null;
this.open = thunky(openbss);
this.open();
function openbss (cb) {
if(fs.existsSync(self.file))
{
open()
}else{
BinStream.format(self.file,function(err){
if(!err){
open()
}else{
cb("format error")
}
});
}
function open(){
BinStream.open(self.file,function(err,bss){
if(!err){
self.bss = bss;
}
cb(err,bss);
});
}
}
}
BSSEngine.prototype.filepath = function()
{
//return this.repos_dir + '/' + name2path(this.name) + '.bss';
return this.file;
}
......@@ -46,48 +78,21 @@ BSSEngine.prototype.exists = function()
return fs.existsSync(fp);
}
BSSEngine.prototype.open = function(cb)
{
var self = this;
if(self.exists())
{
open()
}else{
BinStream.format(self.filepath(),function(err){
if(!err){
open()
}else{
cb("format error")
}
});
}
function open(){
BinStream.open(self.filepath(),function(err,bss){
if(!err){
self.bss = bss;
}
cb(err);
});
}
}
BSSEngine.prototype.close = function(cb)
{
this.bss.close(cb);
var self = this;
self.open((err,bss)=>{
bss.close(cb);
});
}
BSSEngine.prototype.cmd = function(cmd,cb)
{
var self = this
var command = cmd.command;
var param = cmd.param;
var self=this;
switch (command) {
case 'write':
self.cmd_write(param,cb);
......@@ -105,25 +110,29 @@ BSSEngine.prototype.cmd_write = function(prm,cb)
if(!data){return cb("null data")}
this.bss.write(data,{'meta':meta},function(err,obj){
if(!err){
var head = obj.getHeader();
var obj_id = new ObjId(head.ID);
var resp = {
'resource_id' : obj_id.toString(),
'storage_name' : self.name
}
//dataevent.newdata({'resourceId':obj_id.toString(),'storageId':self.name});
if(self.context){
newdata_event(self.context,{'resourceId':obj_id.toString(),'storageId':self.name});
self.open((err,bss)=>{
bss.write(data,{'meta':meta},function(err,obj){
if(!err){
var head = obj.getHeader();
var obj_id = new ObjId(head.ID);
var resp = {
'resource_id' : obj_id.toString(),
'storage_name' : self.name
}
//dataevent.newdata({'resourceId':obj_id.toString(),'storageId':self.name});
if(self.context){
newdata_event(self.context,{'resourceId':obj_id.toString(),'storageId':self.name});
}
cb(null,resp);
}else {
cb("write error");
}
});
cb(null,resp);
}else {
cb("write error");
}
});
}
function newdata_event(ctx,prm)
......
......@@ -12,7 +12,7 @@ function BSSPool(prm)
BSSPool.prototype.get = function(name,opt,cb)
{
if(!cb){cb=opt;opt={};}
var self=this;
var filepath = this.repos_dir + '/' + name2path(name) + '.bss'
var bssname = name;
......@@ -32,18 +32,14 @@ BSSPool.prototype.get = function(name,opt,cb)
'name' : bssname,
'newInstance':opt.newInstance
});
bss_engine.open(function(err){
if(!err){
self.pool.push({
'name' : name,
'engine':bss_engine
});
}
self.clean(function(err){
cb(err,bss_engine);
});
self.pool.push({
'name' : name,
'engine':bss_engine
});
self.clean(function(err){
cb(err,bss_engine);
});
}
}
......
......@@ -29,7 +29,7 @@ module.exports.newdata = function(prm,cb){
var msg = JSON.stringify(objMsg);
ch.assertExchange(ex, 'topic', {durable: false});
ch.publish(ex, key, new Buffer(msg));
ch.publish(ex, key, Buffer.from(msg));
//console.log("[AMQP] Sent %s:'%s'", key, msg);
}
});
......
var ctx = require('../context');
var ConnCtx = ctx.getLib('lib/conn/connection-context');
var rpcserver = ctx.getLib('lib/amqp/rpcserver');
var RPCCaller = ctx.getLib('lib/amqp/rpccaller');
var SSServer = ctx.getLib('lib/axon/rpcserver');
var Db = ctx.getLib('storage-service/lib/db');
var WorkerPool = ctx.getLib('storage-service/lib/worker_pool');
......@@ -16,7 +17,7 @@ var app = express();
var bodyParser = require('body-parser');
var EventPub = ctx.getLib('lib/amqp/event-pub');
var cfg = ctx.config;
//var cfg = ctx.config;
var SS_LISTEN = ctx.getUnixSocketUrl('ss.sock');
var SS_URL = ctx.getUnixSocketUrl('ss.sock');
......@@ -52,7 +53,10 @@ SS.prototype.start = function()
{
console.log('Starting Storage Service ...\n');
this.amqp_start();
this.ipc_start();
/* Disable RPC Feature */
//this.ipc_start();
setTimeout(function(){
this.http_start();
},5000);
......@@ -142,7 +146,11 @@ SS.prototype.http_start = function()
}));
var context = ctx.getLib('lib/ws/http-context');
this.storagecaller = new SSCaller({'url':SS_URL});
//this.storagecaller = new SSCaller({'url':SS_URL});
this.storagecaller = new RPCCaller({
url : amqp_cfg.url,
name :'storage_request'
});
this.acl_validator = ACLValidator.create(auth_cfg);
this.worker_pool.initWorker();
app.use(context.middleware({
......
......@@ -36,11 +36,28 @@ router.get('/:id/data',function (req, res) {
var opt = {
'field' : 'data'
}
opt.filetype = (query.filetype)?query.filetype:null
opt.filetype = (query.file_type || query.filetype)?query.file_type || query.filetype:null;
get_object(reqHelper,respHelper,{'oid':oid,'opt':opt});
});
router.get('/:id/file',function (req, res) {
var reqHelper = request.create(req);
var respHelper = response.create(res);
var query = reqHelper.getQuery();
var oid = req.params.id;
var opt = {
'field' : 'file'
}
opt.filetype = (query.file_type || query.filetype)?query.file_type || query.filetype:null;
opt.filename = (query.file_name || query.filename)?query.file_name || query.filename:null;
opt.download = (query.download)?true:null;
get_object(reqHelper,respHelper,{'oid':oid,'opt':opt});
});
function oid_parse(oid,caller,cb)
{
var ret = {'valid':true}
......@@ -177,6 +194,9 @@ function output(resp,obj,opt)
if(opt.field=='data')
{
data_out(resp,obj,opt);
}else if(opt.field=='file')
{
file_out(resp,obj,opt);
}else{
obj_out(resp,obj,opt);
}
......@@ -218,5 +238,39 @@ function data_out(resp,obj,opt)
}
function file_out(resp,obj,opt)
{
var objType = obj.header.TY;
var objId = (new ObjId(obj.header.ID)).toString();
var meta = obj.meta || {};
var defName=null;
var defType=null;
if(objType == BinStream.BINARY_TYPE){
defType = "application/octet-stream";
defName = (opt.filetype)?objId + "." + opt.filetype:objId + ".out";
}else if(objType == BinStream.STRING_TYPE){
defType = "text";
defName = (opt.filetype)?objId + "." + opt.filetype:objId + ".out";
}else{
defType = "json";
defName = (opt.filetype)?objId + "." + opt.filetype:objId + ".json";
}
var file_name = opt.filename || meta.file_name || defName;
var file_type = opt.filetype || meta.file_type || defType;
resp.response.type(file_type);
if(opt.download){
resp.response.set('Content-Disposition', 'attachment; filename="' + file_name + '"');
}else{
resp.response.set('Content-Disposition', 'filename="' + file_name + '"');
}
resp.response.send(obj.data);
}
module.exports = router;
......@@ -11,7 +11,7 @@ var ctx = require('../context');
//
// console.log(sel.data);
var uuid = require('node-uuid');
// var uuid = require('node-uuid');
// console.log(uuid.v1());
//
......
var ctx = require('../context');
var async = require('async');
var amqp_cfg = ctx.config.amqp;
var RPCCaller = ctx.getLib('lib/amqp/rpccaller');
var caller = new RPCCaller({
url : amqp_cfg.url,
name :'storage_request'
name :'test_request'
});
var req = {
'object_type' : 'storage_request',
'object_type' : 'test_request',
'command' : 'write',
'param' : {
'storage_name' : 'gcs.file.test',
......@@ -24,10 +25,29 @@ var req = {
}
}
caller.call(req,function(err,resp){
console.log(resp);
});
var idx = 0;
async.whilst(
function() { return idx < 2000; },
function(callback) {
req.id = idx;
caller.call(req,function(err,resp){
if(idx%100==0){
console.log(resp)
}
idx++;
callback(null);
});
},
function (err) {
if(!err){
console.log('finish')
}else{
console.log(err);
}
}
);
......
......@@ -5,26 +5,14 @@ var rpcserver = ctx.getLib('lib/amqp/rpcserver');
var server = new rpcserver({
url : amqp_cfg.url,
name : 'test_request'
});
server.set_remote_function(function(req,callback){
var n = parseInt(req.t);
console.log('REQUEST ' + req);
setTimeout(function(){
callback(null,{'time':n,'data':req.d});
},n);
callback(null,{'cmd':req.command,'id':req.id});
})
server.start(function(err){
console.log('server start');
})
var http = require('http');
http.createServer(function (req, res) {
res.writeHead(200, {
'Content-Type': 'text/plain; charset=UTF-8'
});
console.log(req.body);
res.end("req");
}).listen(9080, "");
var thunky = require('thunky')
var _self = null;
function TestThunk()
{
_self = this;
this.rnumber = 0;
}
TestThunk.prototype.init = function()
{
console.log('waiting 1s and returning random number');
}
TestThunk.prototype.open = thunky(function (callback) { // the inner function should only accept a callback
_self.init();
setTimeout(function () {
var ran = Math.random();
_self.rnumber = ran;
callback(null,ran)
}, 1000)
})
TestThunk.prototype.test = function(x){
_self.open((err,num)=>{
console.log(_self.rnumber + ' ' + x)
});
}
var tt = new TestThunk();
tt.test(1);
tt.test(2);
tt.test(3);
tt.test(4);
tt.test(5);
\ No newline at end of file
{
"version":"1.2.2",
"build":"201902121700"
"version":"1.2.3",
"build":"202007171500"
}
\ No newline at end of file
var ctx = require('./context');
var JobWorker = ctx.getLib('jobworker/worker');
var BSCONFIG = ctx.getConfig();
var worker = JobWorker.create({'config':ctx.config,'name':'worker'});
var worker = JobWorker.create({'config':BSCONFIG,'name':'worker'});
worker.start();
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment