Commit c2bcc0e6 authored by Kamron Aroonrua's avatar Kamron Aroonrua 💬

bs1.2

parent aded08c8
#Changelog
## [UR] - 2018-04-xx
## [UR] - 2018-06-xx
### Added
- BS :: tokenization & access control
- SS :: REST API (v1.2ur) new object api
- SS :: object indexing
### Fixed
### Changed
- SS :: object address api 1.2 [ <storage_name>$obj_id ]
### Removed
## [UR] - 2018-05-xx
### Added
- SS :: Ipc Channel
- SS :: REST API (v1.1ur) method PUT
- SS :: REST API (v1.1ur) method PUT DELETE
- PLUGIN :: do-storage with ipc channel supported
### Fixed
- WORKER :: active/unactive job bugs
......
module.exports = {
'amqp' : require('./amqp.json'),
'memstore' : require('./memstore.json'),
'storage' : require('./storage.json')
'storage' : require('./storage.json'),
'auth' : {
'secret': require('./secret.json'),
'acl' : require('./acl.json')
}
}
[
{"permit":true}
]
\ No newline at end of file
{
"type" : "rabbitmq",
"url" : "amqp://rabbitmq-server"
}
}
\ No newline at end of file
{
"type" : "redis",
"url" : "redis://redis-server:6379/1"
}
}
\ No newline at end of file
{
"type":"text",
"value":"bigstream-server"
}
\ No newline at end of file
{
"api_hostname" : "http://bigstream-server:19080",
"repository" : "/var/bigstream/data"
}
}
\ No newline at end of file
......@@ -5,6 +5,11 @@ var app = express();
var bodyParser = require('body-parser');
var ConnCtx = ctx.getLib('lib/conn/connection-context');
var Tokenizer = ctx.getLib('lib/auth/tokenizer');
var ACLValidator = ctx.getLib('lib/auth/acl-validator');
var jwt = require('express-jwt');
var JobManager = require('./lib/job-manager');
var TriggerManager = require('./lib/trigger-manager')
......@@ -32,19 +37,30 @@ ControllerAPI.prototype.start = function()
ControllerAPI.prototype._http_start = function()
{
var self = this;
var auth_cfg = this.config.auth;
app.use(bodyParser.json({limit: '128mb'}));
app.use(bodyParser.json({limit: '64mb'}));
app.use(bodyParser.urlencoded({
extended: true
}));
var context = ctx.getLib('lib/ws/http-context');
this.acl_validator = ACLValidator.create(auth_cfg);
app.use(context.middleware({
'conn' : self.conn,
'acl_validator':self.acl_validator,
'jobManager' : JobManager.create({'conn' : self.conn}),
'triggerManager' : TriggerManager.create({'conn' : self.conn})
}));
var tokenizer = Tokenizer.create(auth_cfg);
app.use(tokenizer.middleware());
app.use(function (err, req, res, next) {
if (err.name === 'UnauthorizedError') {
res.status(401).send('invalid token');
}
});
app.use(require('./ws'));
app.listen(API_PORT, function () {
......
......@@ -2,5 +2,6 @@ var express = require('express')
, router = express.Router()
router.use('/v1',require('./v1'));
router.use('/v1.2',require('./v1.2'));
module.exports = router;
var express = require('express');
var router = express.Router();
router.use('/jobs',require('./ws-jobs'));
module.exports = router;
var ctx = require('../../../context');
var express = require('express');
var router = express.Router();
var cfg = ctx.config;
var response = ctx.getLib('lib/ws/response');
var request = ctx.getLib('lib/ws/request');
var Tokenizer = ctx.getLib('lib/auth/tokenizer');
const ACL_SERVICE_NAME = "job";
router.get('/',function (req, res) {
var reqHelper = request.create(req);
var respHelper = response.create(res);
var jm = req.context.jobManager;
var acl_validator = req.context.acl_validator;
var tInfo = Tokenizer.info(req.auth);
var result=[];
jm.listJob({},function (err,jobs){
jobs.forEach(job => {
var acp = acl_validator.isAccept(tInfo.acl,{
"vo":tInfo,"service":ACL_SERVICE_NAME,"resource":job,"mode":"l"
});
if(acp){
result.push(job);
}
});
respHelper.responseOK(result);
});
});
router.get('/:jid',function (req, res) {
var reqHelper = request.create(req);
var respHelper = response.create(res);
var jid = req.params.jid;
var jm = req.context.jobManager;
var acl_validator = req.context.acl_validator;
var tInfo = Tokenizer.info(req.auth);
var acp = acl_validator.isAccept(tInfo.acl,{
"vo":tInfo.vo,"service":ACL_SERVICE_NAME,"resource":jid,"mode":"r"
});
if(!acp){
return respHelper.response401();
}
jm.getJob({'job_id':jid},function (err,jobs){
if(jobs)
{
respHelper.responseOK(jobs);
}else{
respHelper.response404('Not found');
}
})
});
router.delete('/:jid',function (req, res) {
var reqHelper = request.create(req);
var respHelper = response.create(res);
var q = reqHelper.getQuery();
var jid = req.params.jid;
var jm = req.context.jobManager;
var tm = req.context.triggerManager;
var acl_validator = req.context.acl_validator;
var tInfo = Tokenizer.info(req.auth);
var acp = acl_validator.isAccept(tInfo.acl,{
"vo":tInfo.vo,"service":ACL_SERVICE_NAME,"resource":jid,"mode":"w"
});
if(!acp){
return respHelper.response401();
}
jm.deleteJob({'job_id':jid},function(err){
if(q.reload){
tm.reload();
}
respHelper.response200();
});
});
router.post('/',function (req, res) {
var reqHelper = request.create(req);
var respHelper = response.create(res);
var q = reqHelper.getQuery();
var jm = req.context.jobManager;
var tm = req.context.triggerManager;
var acl_validator = req.context.acl_validator;
var tInfo = Tokenizer.info(req.auth);
var json_job = req.body || {};
var jid = json_job.job_id || "";
var acp = acl_validator.isAccept(tInfo.acl,{
"vo":tInfo.vo,"service":ACL_SERVICE_NAME,"resource":jid,"mode":"w"
});
if(!acp){
return respHelper.response401();
}
jm.setJob({'job':json_job},function(err,res){
if(err)
{
respHelper.response400(err);
}else{
if(q.reload){
tm.reload();
}
respHelper.response201();
}
});
});
router.post('/action',function (req, res) {
var reqHelper = request.create(req);
var respHelper = response.create(res);
var q = reqHelper.getQuery();
var jm = req.context.jobManager;
var tm = req.context.triggerManager;
var acl_validator = req.context.acl_validator;
var tInfo = Tokenizer.info(req.auth);
var acp = acl_validator.isAccept(tInfo.acl,{
"vo":tInfo.vo,"service":ACL_SERVICE_NAME + '.action',"resource":"","mode":"x"
});
if(!acp){
return respHelper.response401();
}
var action = req.body;
jm.action({'action':action},function(err){
if(err)
{
respHelper.response400(err.message);
}else{
if(q.reload){
tm.reload();
}
respHelper.response201();
}
});
});
module.exports = router;
var mmatch = require("minimatch");
module.exports.create = function(prm)
{
return new ACLValidator(prm);
}
function ACLValidator(prm)
{
this.acl = prm.acl;
if(!Array.isArray(this.acl)){
this.acl=[];
}
}
ACLValidator.prototype.appendACL = function(aacl)
{
if(Array.isArray(aacl))
{
this.acl.concat(aacl);
}
}
ACLValidator.prototype.isAccept = function(tkacl,prm)
{
var ret=true;
var chk = {
"vo":prm.vo||"",
"service":prm.service||"",
"resource":prm.resource||"",
"mode":prm.mode||""
}
var the_acl = this.acl;
if(Array.isArray(tkacl))
{
the_acl = this.acl.concat(tkacl);
}
the_acl.forEach((rule) => {
if(rulematch(chk,rule) && typeof rule.accept == 'boolean'){
ret = rule.accept;
}
});
function rulematch(chk,rule)
{
var m_vo = (!rule.vo)?true:mmatch(chk.vo,rule.vo);
var m_service = (!rule.service)?true:mmatch(chk.service,rule.service);
var m_resource = (!rule.resource)?true:mmatch(chk.resource,rule.resource);
var m_mode = (!rule.mode)?true:mmatch(chk.mode,rule.mode);
return m_vo && m_service && m_resource && m_mode;
}
return ret;
}
var jwt = require('express-jwt');
module.exports.create = function(cfg)
{
return new Tokenizer(cfg);
}
module.exports.info = function(tok)
{
var ret = {}
if(!tok){
ret = {"vo":"",acl:""}
}else{
ret = {
"vo":tok.vo || "",
"acl":tok.acl || []
}
}
return ret;
}
function Tokenizer(cfg)
{
//this.config = cfg;
this.cfg_auth = cfg
}
Tokenizer.prototype.getJWTSecret = function()
{
if(process.env.BS_SECRET)
{
return {'type':'env','value':process.env.BS_SECRET};
}else{
return this.cfg_auth.secret;
}
}
Tokenizer.prototype.middleware = function()
{
var self = this;
var jt = jwt({
secret: self.getJWTSecret().value,
requestProperty: 'auth',
credentialsRequired: false,
getToken: function fromHeaderOrQuerystring (req) {
if (req.headers.authorization && req.headers.authorization.split(' ')[0] === 'Bearer') {
return req.headers.authorization.split(' ')[1];
} else if (req.query && req.query.token) {
return req.query.token;
}
return null;
}
});
return jt;
}
\ No newline at end of file
......@@ -76,6 +76,14 @@ responseHelper.prototype.response400 = function(msg){
}
}
responseHelper.prototype.response401 = function(msg){
if(msg){
this.response.status(401).json({response:'ERROR',message:msg});
}else{
this.response.status(401).send('unauthorized');
}
}
responseHelper.prototype.response403 = function(msg){
if(msg){
this.response.status(403).json({response:'ERROR',message:msg});
......
......@@ -20,7 +20,10 @@
"buffalo": "^0.1.3",
"dateformat": "^1.0.12",
"express": "^4.14.0",
"express-jwt": "^5.3.1",
"ioredis": "^2.5.0",
"jsonwebtoken": "^8.2.2",
"minimatch": "^3.0.4",
"minimist": "^1.2.0",
"moment": "^2.17.1",
"node-gyp": "^3.6.2",
......
......@@ -39,7 +39,7 @@ function perform_function(context,request,response){
if(meta && typeof meta == 'object')
{
Object.keys(meta).forEach((item)=>{
if(!item.startsWith('_')){
if(!item.startsWith('_') || item=='_key'){
dc_meta[item] = meta[item];
}
});
......
var fs = require('fs');
var BSSPool = require('./bsspool');
var StorageIndexstore = require('./storage-indexstore')
module.exports.create = function(cfg){
return new Db(cfg);
......@@ -9,8 +10,11 @@ function Db(cfg)
{
this.repos_dir = cfg.repos_dir;
this.context = cfg.context;
this.mem = cfg.redis;
this.idxstore = new StorageIndexstore({'mem':this.mem });
this.bsspool = new BSSPool({'repos_dir':this.repos_dir,'context':this.context});
}
Db.prototype.request = function(req,cb)
......@@ -29,6 +33,10 @@ Db.prototype.request = function(req,cb)
var prm = req.param
this.bsscmd_del(prm,cb)
break;
case 'idxget':
var prm = req.param
this.cmd_idxget(prm,cb)
break;
default:
cb(null,result_error('invalid command'));
}
......@@ -36,11 +44,27 @@ Db.prototype.request = function(req,cb)
}
Db.prototype.cmd_idxget = function(prm,cb)
{
var self = this;
var storage_name = prm.storage_name;
var key = prm.key;
self.idxstore.getIndex(storage_name,key,(err,val)=>{
if(!err && val){
cb(null,result_ok({'found':true,'object_id':val}));
}else{
cb(null,result_ok({}))
}
});
}
Db.prototype.bsscmd_w = function(prm,cb)
{
var self = this;
var filepath = this.repos_dir + '/' + name2path(prm.storage_name) + '.bss';
var bssname = prm.storage_name;
var metadata = prm.meta;
var w_cmd = {
'command' : 'write',
'param' : {
......@@ -56,7 +80,9 @@ Db.prototype.bsscmd_w = function(prm,cb)
if(!err){
bss.cmd(w_cmd,function(err,resp){
if(!err){
cb(null,result_ok(resp));
indexing(metadata,resp.resource_id,(err)=>{
cb(null,result_ok(resp));
})
}else{
cb(null,result_error('write error'));
}
......@@ -65,6 +91,15 @@ Db.prototype.bsscmd_w = function(prm,cb)
cb(null,result_error('bss error'));
}
});
function indexing(meta,objid,cb)
{
if(meta && meta._key && typeof meta._key == 'string'){
self.idxstore.setIndex(bssname,meta._key,objid,cb);
}else{
cb(null);
}
}
}
Db.prototype.bsscmd_del = function(prm,cb)
......@@ -73,10 +108,11 @@ Db.prototype.bsscmd_del = function(prm,cb)
var filepath = this.repos_dir + '/' + name2path(prm.storage_name) + '.bss';
var bssname = prm.storage_name;
var engine = self.bsspool.detach(bssname);
if(engine){
engine.close((err)=>{
var bssI = self.bsspool.detach(bssname);
if(bssI){
bssI.engine.close((err)=>{
if(!err){
self.idxstore.flush(bssname);
unlink(filepath,cb);
}else{
cb(null,result_error('delete error'));
......@@ -101,7 +137,7 @@ Db.prototype.bsscmd_del = function(prm,cb)
});
}else{
callback(null,result_error('file not found'));
callback(null,result_error('storage not found'));
}
});
......@@ -112,9 +148,9 @@ function name2path(name){
return name.split('.').join('/');
}
function result_error(msg)
function result_error(msg,code)
{
return {'status':'ERR','msg':msg}
return {'status':'ERR','msg':msg,'code':code}
}
function result_ok(resp)
......
......@@ -128,10 +128,9 @@ function bss_end(code)
function obj_out(obj,opt){
var ret = {
"_id" : (new ObjId(obj.header.ID)).toString()
}
var ret = {}
if(opt.id){ret._id = (new ObjId(obj.header.ID)).toString()}
if(opt.meta){ret.meta = obj.meta;}
if(opt.data){
if(obj.header.TY==BinStream.BINARY_TYPE)
......@@ -143,5 +142,13 @@ function obj_out(obj,opt){
}
}
if(opt.field=='_id'){
ret = ret._id;
}else if(opt.field=='_meta'){
ret = ret.meta;
}else if(opt.field=='_data'){
ret = ret.data;
}
return ret
}
var redis = require('redis');
const PREFIX = 'bs:storage:index';
function indexstore(conf){
this.prefix = PREFIX ;
if(conf.mem){
this.mem = conf.mem;
}else if(conf.conn){
this.mem = redis.createClient(conf.conn);
}
}
indexstore.prototype.setIndex = function(storage,keyname,value,cb){
var key = this.prefix + ":" + storage;
this.mem.hset(key,keyname,value,cb);
}
indexstore.prototype.getIndex = function(storage,keyname,cb)
{
var key = this.prefix + ":" + storage;
this.mem.hget(key,keyname,function(err,v){
var value = null;
if(!err && v){
//value = JSON.parse(v);
value = v;
}
cb(err,value);
});
}
indexstore.prototype.flush = function(storage,cb)
{
var key = this.prefix + ":" + storage;
this.mem.del(key);
if(typeof cb == 'function'){
cb();
}
}
module.exports = indexstore;
var ctx = require('../context');
var ConnCtx = ctx.getLib('lib/conn/connection-context');
var rpcserver = ctx.getLib('lib/amqp/rpcserver');
var SSServer = ctx.getLib('lib/axon/rpcserver');
var Db = ctx.getLib('storage-service/lib/db');
var WorkerPool = ctx.getLib('storage-service/lib/worker_pool');
var SSCaller = ctx.getLib('lib/axon/rpccaller');
var Tokenizer = ctx.getLib('lib/auth/tokenizer');
var ACLValidator = ctx.getLib('lib/auth/acl-validator');
var jwt = require('express-jwt');
var express = require('express');
var app = express();
var bodyParser = require('body-parser');
......@@ -34,7 +40,10 @@ var SS = function StorageService(p_cfg)
'evp':new EventPub({'url':amqp_cfg.url,'name':'bs_storage'})
}
this.db = Db.create({'repos_dir':storage_cfg.repository,'context':this.context});
this.conn = ConnCtx.create(this.config);
this.mem = this.conn.getMemstore();
this.db = Db.create({'redis':this.mem,'repos_dir':storage_cfg.repository,'context':this.context});
this.worker_pool = WorkerPool.create({'size':2});
//this.storagecaller = new SSCaller({'url':SS_URL});
}
......@@ -123,6 +132,7 @@ SS.prototype.http_start = function()
{
var self = this;
var amqp_cfg = this.config.amqp;
var auth_cfg = this.config.auth;
var API_PORT = (this.config.storage.api_port)?this.config.storage.api_port:19080;
......@@ -133,14 +143,23 @@ SS.prototype.http_start = function()
var context = ctx.getLib('lib/ws/http-context');
this.storagecaller = new SSCaller({'url':SS_URL});
this.acl_validator = ACLValidator.create(auth_cfg);
this.worker_pool.initWorker();
app.use(context.middleware({
'acl_validator':self.acl_validator,
'worker_pool' : self.worker_pool,
'storagecaller':self.storagecaller
}));
app.use(require('./ws'));
var tokenizer = Tokenizer.create(auth_cfg);
app.use(tokenizer.middleware());
app.use(function (err, req, res, next) {
if (err.name === 'UnauthorizedError') {
res.status(401).send('invalid token');
}
});
app.use(require('./ws'));
app.listen(API_PORT, function () {
console.log('SS:DATA_API START\t\t[OK]');
......
......@@ -4,5 +4,6 @@ var express = require('express')
router.use('/v0.1',require('./v0.1'));
router.use('/v1',require('./v0.1'));
router.use('/v1.1',require('./v1.1'));
router.use('/v1.2',require('./v1.2'));
module.exports = router;
......@@ -53,9 +53,9 @@ router.put('/:id',function (req, res) {
if(meta && typeof meta == 'object')
{
Object.keys(meta).forEach((item)=>{
if(!item.startsWith('_')){
//if(!item.startsWith('_') || item=='_key'){
dc_meta[item] = meta[item];
}
//}
});
}
......
var express = require('express');
var router = express.Router();
router.use('/object',require('./service-object'));
router.use('/storage',require('./service-storage'));
module.exports = router;
var ctx = require('../../../context');
var express = require('express');
var router = express.Router();
var path = require('path');
var fs = require('fs');
var cfg = ctx.config;
var storage_cfg = cfg.storage
var response = ctx.getLib('lib/ws/response');
var request = ctx.getLib('lib/ws/request');
var BinStream = ctx.getLib('lib/bss/binarystream_v1_1');
var ObjId = ctx.getLib('lib/bss/objid');
var BSData = ctx.getLib('lib/model/bsdata');
var Tokenizer = ctx.getLib('lib/auth/tokenizer');
const ACL_SERVICE_NAME = "storage";
router.get('/:id',function (req, res) {
var reqHelper = request.create(req);
var respHelper = response.create(res);
var oid = req.params.id;
var opt = {}
get_object(reqHelper,respHelper,{'oid':oid,'opt':opt});
});
router.get('/:id/data',function (req, res) {
var reqHelper = request.create(req);
var respHelper = response.create(res);
var query = reqHelper.getQuery();
var oid = req.params.id;
var opt = {
'field' : 'data'
}
opt.filetype = (query.filetype)?query.filetype:null
get_object(reqHelper,respHelper,{'oid':oid,'opt':opt});
});
function oid_parse(oid,caller,cb)
{
var ret = {'valid':true}
if(!oid)
{
return cb(null,{'valid':false});
}
var storage_and_addr = oid.split('$');
ret.storage_name = storage_and_addr[0];
if(storage_and_addr.length == 1)
{
ret.by = "seq"
ret.seq = -1;
return cb(null,ret);
}else if(storage_and_addr.length==2){
var str_addr = storage_and_addr[1];
if(str_addr.startsWith('{') && str_addr.endsWith('}')){
ret.by = "key";
ret.key = str_addr.substr(1,str_addr.length-2);
if(ret.key.length<=0){
return cb(null,{'valid':false});
}
var callreq = {
'object_type' : 'storage_request',
'command' : 'idxget',
'param' : {
'storage_name' : ret.storage_name,
'key' : ret.key
}
}
caller.call(callreq,function(err,resp){
if(!err && resp.status=='OK' && resp.resp.found){
ret.by = "obj";
ret.obj_id = resp.resp.object_id;
ret.seq = (new ObjId(ret.obj_id)).extract().seq;
cb(null,ret);
}else{
cb(null,{'valid':false});
}
});
}else if(str_addr.startsWith('[') && str_addr.endsWith(']')){
ret.by = "seq";
var str_num = str_addr.substr(1,str_addr.length-2);
if(isNaN(str_num) || !Number.isInteger(Number(str_num))){
ret.valid = false;
}else{
ret.seq = Number(str_num);
}
return cb(null,ret);
}else{
ret.by = "obj";
ret.obj_id = storage_and_addr[1];
try{
var obj_id = new ObjId(ret.obj_id);
ret.seq = obj_id.extract().seq;
}catch(err){
ret.valid=false;
}
return cb(null,ret);
}
}else{
ret.valid=false;
return cb(null,ret);
}
}
function get_object(reqHelper,respHelper,prm)
{
prm=prm||{};
var oid = prm.oid;
var opt = prm.opt || {};
var storagecaller = reqHelper.request.context.storagecaller;
oid_parse(oid,storagecaller,(err,oid_result)=>{
if(!oid_result.valid){
respHelper.response404();
}else{
var acl_validator = reqHelper.request.context.acl_validator;
var tInfo = Tokenizer.info(reqHelper.request.auth);
var acp = acl_validator.isAccept(tInfo.acl,{
"vo":tInfo.vo,"service":ACL_SERVICE_NAME,"resource":oid_result.storage_name,"mode":"r"
});
if(!acp){
respHelper.response403();
}else{
var bss_full_path = storage_cfg.repository + "/" + oid_result.storage_name.split('.').join('/') + ".bss";
fs.exists(bss_full_path,function(exists){
if(exists){
BinStream.open(bss_full_path,function(err,bss){
var rd = bss.reader();
var rec_count = rd.count();
var seq = (oid_result.seq>=0)?oid_result.seq:rec_count + oid_result.seq + 1;
rd.objectAt(seq,function(err,obj){
bss.close(function(err){
if(obj && (oid_result.by == 'seq' || oid_result.obj_id == (new ObjId(obj.header.ID)).toString()) ){
output(respHelper,obj,opt);
}else{respHelper.response404();}
});
});
});
}else{
respHelper.response404();
}
});
}
}
});
}
function output(resp,obj,opt)
{
if(opt.field=='data')
{
data_out(resp,obj,opt);
}else{
obj_out(resp,obj,opt);
}
}
function obj_out(resp,obj,opt)
{
var ret = {"_id" : (new ObjId(obj.header.ID)).toString(),
"meta" : obj.meta
}
if(obj.header.TY==BinStream.BINARY_TYPE)
{
var bs = BSData.create(obj.data);
ret.data = bs.serialize('object-encoded');
}else{
ret.data = obj.data;
}
resp.responseOK(ret);
}
function data_out(resp,obj,opt)
{
var objType = obj.header.TY;
if(objType == BinStream.BINARY_TYPE){
if(opt.filetype){
resp.response.type(opt.filetype);
}else{
}
resp.response.send(obj.data);
}else if(objType == BinStream.STRING_TYPE){
resp.response.send(obj.data);
}else{
resp.responseOK(obj.data);
}
}
module.exports = router;
This diff is collapsed.
{
"version":"1.2",
"build":"201806131400"
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment