Commit bd55f70d authored by Kamron Aroonrua's avatar Kamron Aroonrua 💬

new storage api,new plugins

parent e0e97157
var util = require('util'); var util = require('util');
var domain = require('domain');
var async = require('async'); var async = require('async');
var domain = require('domain'); var domain = require('domain');
var crypto = require("crypto"); var crypto = require("crypto");
...@@ -18,8 +17,10 @@ function JobTask (prm) ...@@ -18,8 +17,10 @@ function JobTask (prm)
this.handle = prm.handle; this.handle = prm.handle;
this.mem = prm.handle.mem; this.mem = prm.handle.mem;
this.jobcaller = prm.handle.jobcaller;
this.jobcfg = prm.job_config; this.jobcfg = prm.job_config;
this.input_meta = prm.input_meta;
this.input_data = prm.input_data; this.input_data = prm.input_data;
this.transaction_id = prm.transaction_id; this.transaction_id = prm.transaction_id;
this.job_timeout = prm.opt.job_timeout || 60000; this.job_timeout = prm.opt.job_timeout || 60000;
...@@ -43,6 +44,7 @@ JobTask.prototype.run = function () ...@@ -43,6 +44,7 @@ JobTask.prototype.run = function ()
{ {
var self=this; var self=this;
var transaction_id = this.transaction_id || genTransactionId(); var transaction_id = this.transaction_id || genTransactionId();
var input_meta = this.input_meta;
var obj_input_data = getInputData(this.input_data); var obj_input_data = getInputData(this.input_data);
var job_tr_config = this.jobcfg; var job_tr_config = this.jobcfg;
var job_id = job_tr_config.job_id; var job_id = job_tr_config.job_id;
...@@ -61,7 +63,7 @@ JobTask.prototype.run = function () ...@@ -61,7 +63,7 @@ JobTask.prototype.run = function ()
var context = { var context = {
"jobconfig" : job_tr_config, "jobconfig" : job_tr_config,
"transaction" : ctx_transaction, "transaction" : ctx_transaction,
"input_data" : obj_input_data, "input" : {'data':obj_input_data,'meta':input_meta} ,
"job" : ctx_job "job" : ctx_job
} }
...@@ -87,7 +89,7 @@ JobTask.prototype.run = function () ...@@ -87,7 +89,7 @@ JobTask.prototype.run = function ()
} }
var task_dt = function (request,callback) { var task_dt = function (request,callback) {
var dt_request = {'input_type':request.type,'data':request.data} var dt_request = {'input_type':request.type,'meta':request.meta,'data':request.data}
var dm_t = domain.create(); var dm_t = domain.create();
dm_t.on('error', function(err) { dm_t.on('error', function(err) {
...@@ -110,7 +112,7 @@ JobTask.prototype.run = function () ...@@ -110,7 +112,7 @@ JobTask.prototype.run = function ()
} }
var task_do = function (request,callback) { var task_do = function (request,callback) {
var do_request = {'input_type':request.type,'data':request.data} var do_request = {'input_type':request.type,'meta':request.meta,'data':request.data}
var dm_o = domain.create(); var dm_o = domain.create();
dm_o.on('error', function(err) { dm_o.on('error', function(err) {
...@@ -204,9 +206,11 @@ function perform_do(prm,cb) ...@@ -204,9 +206,11 @@ function perform_do(prm,cb)
var do_cfg = do_context.jobconfig.data_out; var do_cfg = do_context.jobconfig.data_out;
var DOTask = getPlugins('do',do_cfg.type); var DOTask = getPlugins('do',do_cfg.type);
var doMem = new memstore({'job_id':job_id,'cat':'do','mem':prm.handle.mem}) var doMem = new memstore({'job_id':job_id,'cat':'do','mem':prm.handle.mem});
var jobcaller = prm.handle.jobcaller;
do_context.task = { do_context.task = {
"memstore" : doMem "memstore" : doMem,
"jobcaller" : jobcaller
} }
var dout = new DOTask(do_context,prm.request); var dout = new DOTask(do_context,prm.request);
...@@ -219,7 +223,17 @@ function perform_do(prm,cb) ...@@ -219,7 +223,17 @@ function perform_do(prm,cb)
function getPlugins(type,name) function getPlugins(type,name)
{ {
var path = '../../plugins/' + type + '/' + type + '-' +name; var path = '../../plugins/';
var path_token= name.split('.');
if(path_token.length >=2)
{
var pName = path_token.pop();
var pPath = path_token.join('/');
path = path + type + '/' + pPath + '/' + type + '-' + pName;
}else{
path += type + '/' + type + '-' + name;
}
return require(path); return require(path);
} }
......
...@@ -46,6 +46,7 @@ JT.prototype.run = function (done) ...@@ -46,6 +46,7 @@ JT.prototype.run = function (done)
var task_prm = { var task_prm = {
'handle' : self.handle, 'handle' : self.handle,
'job_config' : jobCfg, 'job_config' : jobCfg,
'input_meta' : command.input_meta,
'input_data' : command.input_data, 'input_data' : command.input_data,
'opt' : {'job_timeout' :60000} 'opt' : {'job_timeout' :60000}
} }
...@@ -84,5 +85,5 @@ function validate_execute_cmd(cmd) ...@@ -84,5 +85,5 @@ function validate_execute_cmd(cmd)
function genTransactionId() function genTransactionId()
{ {
var id = crypto.randomBytes(3).toString("hex"); var id = crypto.randomBytes(3).toString("hex");
return "TR" + (new Date).getTime() + id; return "TR" + (new Date).getTime() + id.toUpperCase();
} }
...@@ -20,7 +20,9 @@ var JW = function JobWorker (prm) ...@@ -20,7 +20,9 @@ var JW = function JobWorker (prm)
this.instance_name = param.name; this.instance_name = param.name;
this.conn = ConnCtx.create(this.config); this.conn = ConnCtx.create(this.config);
this.mem = this.conn.getMemstore() this.mem = this.conn.getMemstore();
this.jobcaller = new QueueCaller({'url':this.conn.getAmqpUrl(),'name':'bs_jobs_cmd'});
this.job_registry = JobRegistry.create({'redis':this.mem}); this.job_registry = JobRegistry.create({'redis':this.mem});
} }
......
...@@ -72,10 +72,18 @@ Storage.prototype.open = function(cb) ...@@ -72,10 +72,18 @@ Storage.prototype.open = function(cb)
var fileOpt = null; var fileOpt = null;
this.file = fileAccessBuffer.create(this.filename,fileOpt); this.file = fileAccessBuffer.create(this.filename,fileOpt);
this.root = new Root(this.file); self.root = new Root(self.file);
this.root.load(function(err,obj){ self.root.load(function(err,obj){
cb(err,self); cb(err,self);
}) });
// this.file.open(function(err){
//
// self.root = new Root(self.file);
// self.root.load(function(err,obj){
// cb(err,self);
// });
//
// });
} }
Storage.prototype.write = function(data,opt,cb) Storage.prototype.write = function(data,opt,cb)
......
var thunky = require('thunky');
var randomAccessFile = require('random-access-file'); var randomAccessFile = require('random-access-file');
var fs = require("fs"); var fs = require("fs");
var async = require('async'); var async = require('async');
...@@ -7,11 +8,38 @@ var BUFFERSIZE = 1*1024*1024; ...@@ -7,11 +8,38 @@ var BUFFERSIZE = 1*1024*1024;
function FileAccessBuffer(filename,opt){ function FileAccessBuffer(filename,opt){
if (!opt) opt = {}; if (!opt) opt = {};
var self = this;
this.filename = filename; this.filename = filename;
this.size = opt.size || BUFFERSIZE; this.size = opt.size || BUFFERSIZE;
this.buffer = null; this.buffer = null;
this.file = randomAccessFile(this.filename,opt); this.file = new randomAccessFile(this.filename,opt);
this.opened = false
}
FileAccessBuffer.prototype.open = function open (cb) {
var self = this;
fs.open(self.filename, 'r', onopen)
function onopen (err, fd) {
if (err) {
return cb(err);
}
self.opened = true;
self.fd = fd;
fs.fstat(fd, function (err, st) {
if (err) {
return cb(err)
}
self.filesize = st.size;
cb();
});
}
} }
FileAccessBuffer.prototype.bufferedRead = function (offset, length, cb) { FileAccessBuffer.prototype.bufferedRead = function (offset, length, cb) {
...@@ -20,17 +48,23 @@ FileAccessBuffer.prototype.bufferedRead = function (offset, length, cb) { ...@@ -20,17 +48,23 @@ FileAccessBuffer.prototype.bufferedRead = function (offset, length, cb) {
var readEnd = (offset + length) - 1; var readEnd = (offset + length) - 1;
var ret_buffer = new Buffer(length); var ret_buffer = new Buffer(length);
if(!this.filesize){ if(!self.filesize){
var stats = fs.statSync(this.filename); fs.stat(self.filename,function(err,stats){
this.filesize = stats["size"]; self.filesize = stats["size"];
do_read();
});
}else{
do_read();
} }
if(this.buffer){ function do_read(){
var buffStart = this.buffer.offset;
var buffEnd = (this.buffer.offset + this.buffer.data.length) - 1; if(self.buffer){
var buffStart = self.buffer.offset;
var buffEnd = (self.buffer.offset + self.buffer.data.length) - 1;
if(readStart >= buffStart && readEnd <= buffEnd){ if(readStart >= buffStart && readEnd <= buffEnd){
this.buffer.data.copy(ret_buffer,0,readStart-buffStart,(readStart-buffStart)+length); self.buffer.data.copy(ret_buffer,0,readStart-buffStart,(readStart-buffStart)+length);
setImmediate(function (){ setImmediate(function (){
cb(null,ret_buffer); cb(null,ret_buffer);
//cb(null,self.buffer.data.slice(readStart-buffStart,(readStart-buffStart)+length)); //cb(null,self.buffer.data.slice(readStart-buffStart,(readStart-buffStart)+length));
...@@ -39,7 +73,7 @@ FileAccessBuffer.prototype.bufferedRead = function (offset, length, cb) { ...@@ -39,7 +73,7 @@ FileAccessBuffer.prototype.bufferedRead = function (offset, length, cb) {
//intersec back //intersec back
var bytesDiff = (buffEnd-readStart)+1; var bytesDiff = (buffEnd-readStart)+1;
this.buffer.data.copy(ret_buffer,0,readStart-buffStart,(readStart-buffStart)+bytesDiff); self.buffer.data.copy(ret_buffer,0,readStart-buffStart,(readStart-buffStart)+bytesDiff);
var ioffset = buffEnd+1; var ioffset = buffEnd+1;
var bytesRequire = length-bytesDiff; var bytesRequire = length-bytesDiff;
...@@ -49,7 +83,7 @@ FileAccessBuffer.prototype.bufferedRead = function (offset, length, cb) { ...@@ -49,7 +83,7 @@ FileAccessBuffer.prototype.bufferedRead = function (offset, length, cb) {
ilength = self.filesize - ioffset; ilength = self.filesize - ioffset;
} }
self.file.read(ioffset,ilength,function(err,buff){ self.read(ioffset,ilength,function(err,buff){
//console.log('intersec read'); //console.log('intersec read');
if(!err){ if(!err){
self.buffer = { self.buffer = {
...@@ -68,6 +102,8 @@ FileAccessBuffer.prototype.bufferedRead = function (offset, length, cb) { ...@@ -68,6 +102,8 @@ FileAccessBuffer.prototype.bufferedRead = function (offset, length, cb) {
newbuffer(cb); newbuffer(cb);
} }
}
function newbuffer(callback){ function newbuffer(callback){
var loffset = offset; var loffset = offset;
var llength = (self.size>length)?self.size:length; var llength = (self.size>length)?self.size:length;
...@@ -76,14 +112,15 @@ FileAccessBuffer.prototype.bufferedRead = function (offset, length, cb) { ...@@ -76,14 +112,15 @@ FileAccessBuffer.prototype.bufferedRead = function (offset, length, cb) {
llength = self.filesize - loffset; llength = self.filesize - loffset;
} }
self.file.read(loffset,llength,function(err,buff){ self.read(loffset,llength,function(err,buff){
//console.log('new read'); //console.log('new read');
if(!err){ if(!err){
self.buffer = { self.buffer = {
"offset":loffset, "offset":loffset,
"data":buff "data":buff
}; };
buff.copy(ret_buffer,0,0,length); //try{buff.copy(ret_buffer,0,0,llength);}catch(e){process.exit(1)}
buff.copy(ret_buffer,0,0,llength);
} }
callback(err,ret_buffer); callback(err,ret_buffer);
}); });
...@@ -92,8 +129,32 @@ FileAccessBuffer.prototype.bufferedRead = function (offset, length, cb) { ...@@ -92,8 +129,32 @@ FileAccessBuffer.prototype.bufferedRead = function (offset, length, cb) {
} }
FileAccessBuffer.prototype.read = function (offset, length, cb) { FileAccessBuffer.prototype.read = function (offset, length, cb) {
//console.log('bypss read'); //console.log('bypss read ');
this.file.read(offset, length, cb); this.file.read(offset, length, cb);
// var self=this;
// if(!self.fd){return cb(new Error('File not opened'));}
//
// var buf = new Buffer(length)
// var len = 0;
// var bufOfs = 0;
// var readlen = length;
// var fileOfs = offset;
// async.whilst(
// function() { return len < length },
// function(callback) {
// //console.log(self.filename + ' ' + String(len) + ' ' + String(readlen) + ' ' + String(fileOfs));
// fs.read(self.fd, buf, len, readlen, fileOfs, function(err,bytes){
// len+=bytes;
// fileOfs+=bytes;
// readlen-=bytes;
// callback(err);
// });
// },function(err){
// if(err){console.log(err);}
// cb(err,buf);
// }
// );
} }
FileAccessBuffer.prototype.write = function (offset, buf, cb) { FileAccessBuffer.prototype.write = function (offset, buf, cb) {
...@@ -106,5 +167,6 @@ FileAccessBuffer.prototype.close = function (cb) { ...@@ -106,5 +167,6 @@ FileAccessBuffer.prototype.close = function (cb) {
module.exports.create = function(filename,opt) module.exports.create = function(filename,opt)
{ {
return new FileAccessBuffer(filename,opt); var inst = new FileAccessBuffer(filename,opt);
return inst;
} }
...@@ -16,7 +16,12 @@ var oatmeta_struct = function(){ ...@@ -16,7 +16,12 @@ var oatmeta_struct = function(){
} }
var create = module.exports.create = function(fd,prm,opt){ var create = module.exports.create = function(fd,prm,opt){
var oatmeta = oatmeta_struct(); var oatmeta = {
"SEQ" : 0,
"SZ" : 10000,
"PREV" : null,
"NEXT" : null,
};
oatmeta.SEQ = (prm.SEQ)?prm.SEQ:1; oatmeta.SEQ = (prm.SEQ)?prm.SEQ:1;
oatmeta.SZ = (prm.SZ)?prm.SZ:10000; oatmeta.SZ = (prm.SZ)?prm.SZ:10000;
oatmeta.PREV = (prm.PREV)?prm.PREV:null; oatmeta.PREV = (prm.PREV)?prm.PREV:null;
...@@ -35,15 +40,10 @@ module.exports.load = function(fd,address,opt,cb){ ...@@ -35,15 +40,10 @@ module.exports.load = function(fd,address,opt,cb){
options = opt || {}; options = opt || {};
} }
fd.read(address, OATMETASIZE, function(err, buffer) { var inst = new Oat(fd,address,null,options);
var oatmeta = null; inst.loadOat(function(err){
if(!err){ cb(err,inst)
oatmeta = BSON.parse(buffer);
var inst = new Oat(fd,address,oatmeta,options);
}
cb(err,inst);
}); });
} }
function Oat(fd,addr,meta,opt){ function Oat(fd,addr,meta,opt){
...@@ -59,6 +59,20 @@ Oat.prototype.setNextAddr = function(addr){ ...@@ -59,6 +59,20 @@ Oat.prototype.setNextAddr = function(addr){
this.oatmeta.NEXT = addr; this.oatmeta.NEXT = addr;
} }
Oat.prototype.loadOat = function(cb){
var self=this;
self.file.read(self.address, OATMETASIZE, function(err, buffer) {
var oatmeta = null;
if(!err){
oatmeta = BSON.parse(buffer);
self.oatmeta = oatmeta
}
cb(err);
});
}
Oat.prototype.oatAt = function(seq,cb){ Oat.prototype.oatAt = function(seq,cb){
var self=this; var self=this;
var curSeq = this.oatmeta.SEQ; var curSeq = this.oatmeta.SEQ;
...@@ -173,30 +187,41 @@ Oat.prototype.readSlot = function(index,opt,cb){ ...@@ -173,30 +187,41 @@ Oat.prototype.readSlot = function(index,opt,cb){
}else{ }else{
options = opt || {}; options = opt || {};
} }
//options.nobuffer = true;
if(index>=this.oatmeta.SZ){ if(index>=self.oatmeta.SZ){
return cb(new Error("Index out of bound")); return cb(new Error("Index out of bound"));
} }
var slotStart = this.address + OATMETASIZE; var slotStart = self.address + OATMETASIZE;
var slotOffset = index*OBJHEADERSIZE; var slotOffset = index*OBJHEADERSIZE;
var slotAddr = slotStart + slotOffset; var slotAddr = slotStart + slotOffset;
if(this.slotbuffer){ if(self.slotbuffer){
var buff = this.slotbuffer.slice(slotOffset,slotOffset+OBJHEADERSIZE); var buff = self.slotbuffer.slice(slotOffset,slotOffset+OBJHEADERSIZE);
var objbuff = BSON.parse(buff);
setImmediate(function (){ setImmediate(function (){
cb(null,BSON.parse(buff)); cb(null,objbuff);
// if(!objbuff.AD){console.log('ERRR');console.log(index); console.log(slotOffset);console.log(self.slotbuffer.slice(550560,550570));console.log('ERRRRR');}
// cb(null,objbuff);
//cb(null,BSON.parse(buff));
}); });
}else{ }else{
var readstart = (options.nobuffer)?slotAddr:slotStart; var readstart = (options.nobuffer)?slotAddr:slotStart;
var readlen = (options.nobuffer)?OBJHEADERSIZE:(OBJHEADERSIZE * this.oatmeta.SZ); var readlen = (options.nobuffer)?OBJHEADERSIZE:(OBJHEADERSIZE * self.oatmeta.SZ);
//console.log(String(readstart) + "," + String(self.address));
//console.log('Read ' + self.file.fd + ' at ' + String(readstart) + ' index ' + index );
self.file.read(readstart, readlen, function(err, rdbuffer) { self.file.read(readstart, readlen, function(err, rdbuffer) {
//console.log(self.file.filename);
//console.log('Read ' + self.file.fd + ' at ' + String(readstart) + ' index ' + index );
if(options.nobuffer){ if(options.nobuffer){
cb(err,BSON.parse(rdbuffer)); cb(err,BSON.parse(rdbuffer));
}else{ }else{
self.slotbuffer = rdbuffer; self.slotbuffer = rdbuffer;
//console.log(self.slotbuffer.slice(550560,550570));
var slicebuff = self.slotbuffer.slice(slotOffset,slotOffset+OBJHEADERSIZE); var slicebuff = self.slotbuffer.slice(slotOffset,slotOffset+OBJHEADERSIZE);
cb(err,BSON.parse(slicebuff)); var objret = BSON.parse(slicebuff);
cb(err,objret);
} }
}); });
......
...@@ -9,12 +9,13 @@ module.exports = Reader; ...@@ -9,12 +9,13 @@ module.exports = Reader;
function Reader(fd,root){ function Reader(fd,root){
this.file = fd; this.file = fd;
this.root = root; this.root = root;
this.rootData = root.getRoot();
this.cursorIdx = 0; this.cursorIdx = 0;
this.oat = null; this.oat = null;
} }
Reader.prototype.moveTo = function(idx){ Reader.prototype.moveTo = function(idx){
var rootData = this.root.getRoot(); var rootData = this.rootData;
if(idx>0 && idx <= rootData.SEQ){ if(idx>0 && idx <= rootData.SEQ){
this.cursorIdx = idx-1; this.cursorIdx = idx-1;
return true; return true;
...@@ -74,7 +75,7 @@ Reader.prototype.readAt = function(seq,opt,cb){ ...@@ -74,7 +75,7 @@ Reader.prototype.readAt = function(seq,opt,cb){
options = opt || {}; options = opt || {};
} }
var rootData = this.root.getRoot(); var rootData = this.rootData;
if(seq<=0 || seq > rootData.SEQ){ if(seq<=0 || seq > rootData.SEQ){
return cb(new Error('unavailable')); return cb(new Error('unavailable'));
...@@ -83,20 +84,6 @@ Reader.prototype.readAt = function(seq,opt,cb){ ...@@ -83,20 +84,6 @@ Reader.prototype.readAt = function(seq,opt,cb){
var oatNo = Math.ceil(seq / rootData.OATZ); var oatNo = Math.ceil(seq / rootData.OATZ);
var slotNo = (seq-1)%rootData.OATZ; var slotNo = (seq-1)%rootData.OATZ;
// var hobj3 = {
// "ID":new Buffer(12),
// "TY":3,
// "FG":null,
// "MZ":94,
// "DZ":97,
// "AD":17060704
// }
// var obj = ObjectData.createByHeader(self.file,hobj3);
// process.nextTick(function(){
// cb(null,obj);
// })
async.waterfall([ async.waterfall([
function(callback) { function(callback) {
......
...@@ -21,10 +21,7 @@ var storage_write_request = { ...@@ -21,10 +21,7 @@ var storage_write_request = {
'meta' : {'name':'gcs'}, 'meta' : {'name':'gcs'},
'data' : { 'data' : {
'type' : 'bsdata', 'type' : 'bsdata',
'value' : { 'value' : bsdata
'data_type' : 'string',
'data' : 'AA00FFCC'
}
} }
} }
} }
......
...@@ -46,6 +46,11 @@ responseHelper.prototype.type = function (tpy) ...@@ -46,6 +46,11 @@ responseHelper.prototype.type = function (tpy)
this.response.type(tpy); this.response.type(tpy);
} }
responseHelper.prototype.status = function (code)
{
return this.response.status(code);
}
responseHelper.prototype.write = function (data) responseHelper.prototype.write = function (data)
{ {
this.response.write(data); this.response.write(data);
......
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
"dateformat": "^1.0.12", "dateformat": "^1.0.12",
"express": "^4.14.0", "express": "^4.14.0",
"ioredis": "^2.5.0", "ioredis": "^2.5.0",
"minimist": "^1.2.0",
"moment": "^2.17.1", "moment": "^2.17.1",
"node-persist": "^2.0.7", "node-persist": "^2.0.7",
"node-schedule": "^1.2.0", "node-schedule": "^1.2.0",
...@@ -31,6 +32,7 @@ ...@@ -31,6 +32,7 @@
"random-access-file": "^1.3.0", "random-access-file": "^1.3.0",
"redis": "^2.6.5", "redis": "^2.6.5",
"request": "^2.79.0", "request": "^2.79.0",
"thunky": "^1.0.2" "thunky": "^1.0.2",
"tiny-worker": "^2.1.1"
} }
} }
module.exports = require('../../context');
...@@ -6,6 +6,7 @@ function perform_function(context,response){ ...@@ -6,6 +6,7 @@ function perform_function(context,response){
var output_type = 'text' var output_type = 'text'
var data = 'hello world ' + transaction_id; var data = 'hello world ' + transaction_id;
var meta = {'module':'example','tid':transaction_id}
// memstore.setItem('lasttransaction',transaction_id,function(err){ // memstore.setItem('lasttransaction',transaction_id,function(err){
// response.success(data); // response.success(data);
...@@ -17,15 +18,10 @@ function perform_function(context,response){ ...@@ -17,15 +18,10 @@ function perform_function(context,response){
// response.success(value); // response.success(value);
// }); // });
var ts = (new Date).getTime();
var delay = 0; var delay = 0;
if(ts%9==0){
//delay = 10000;
}
setTimeout(function(){ setTimeout(function(){
response.success(data,output_type); response.success(data,{'meta':meta,'output_type':output_type});
},delay) },delay)
//response.success(data,output_type); //response.success(data,output_type);
//response.reject(); //response.reject();
......
var ctx = require('../../../context');
var bsdata = ctx.getLib('lib/model/bsdata');
function perform_function(context,response){ function perform_function(context,response){
var job_id = context.jobconfig.job_id; var job_id = context.jobconfig.job_id;
var transaction_id = context.transaction.id; var transaction_id = context.transaction.id;
var param = context.jobconfig.data_in.param || {}; var param = context.jobconfig.data_in.param || {};
var memstore = context.task.memstore; var memstore = context.task.memstore;
var input_data = context.input_data; var input_data = context.input.data;
var input_meta = context.input.meta;
var output_type = 'object' var output_type = 'object'
var data = input_data; var data = input_data;
...@@ -21,7 +25,7 @@ function perform_function(context,response){ ...@@ -21,7 +25,7 @@ function perform_function(context,response){
// }); // });
response.success(data,output_type); response.success(data,{'meta':input_meta,'output_type':output_type});
//response.reject(); //response.reject();
//response.error("error message") //response.error("error message")
......
...@@ -34,17 +34,26 @@ function DIResponse(handle){ ...@@ -34,17 +34,26 @@ function DIResponse(handle){
this.handle = handle; this.handle = handle;
this.status = null; this.status = null;
this.data = null; this.data = null;
this.meta = null;
this.output_type = ''; this.output_type = '';
} }
DIResponse.prototype.success = function(data,type){ DIResponse.prototype.success = function(data,type){
var self=this; var self=this;
var handle = this.handle; var handle = this.handle;
this.data = data;
if(typeof type == 'string'){
this.output_type=type;
}else if(typeof type == 'object' && type){
this.output_type=(type.output_type)?type.output_type:this.output_type;
this.meta=(type.meta)?type.meta:this.meta;
}
this.data = (data)?data:this.data ;
this.status = 'success'; this.status = 'success';
if(type){this.output_type=type;}
process.nextTick(function(){ process.nextTick(function(){
handle.emit('done',{'status':'success','data':data,'type':self.output_type}); handle.emit('done',{'status':'success','meta':self.meta,'data':self.data,'type':self.output_type});
}); });
} }
......
var util = require('util');
var DIPlugin = require('../../di-plugin');
function DITask(context){
DIPlugin.call(this,context);
this.name = "igrid.example";
this.output_type = "text";
}
util.inherits(DITask,DIPlugin);
DITask.prototype.perform = require('./perform');
module.exports = DITask;
function perform_function(context,response){
var job_id = context.jobconfig.job_id;
var transaction_id = context.transaction.id;
var param = context.jobconfig.data_in.param;
var memstore = context.task.memstore
var output_type = 'text'
var data = 'hello world ' + transaction_id;
var meta = {'module':'igrid.example','tid':transaction_id}
// memstore.setItem('lasttransaction',transaction_id,function(err){
// response.success(data);
// });
// memstore.getItem('lasttransaction2',function(err,value){
// console.log('key');
// console.log(value);
// response.success(value);
// });
var delay = 0;
setTimeout(function(){
response.success(data,{'meta':meta,'output_type':output_type});
},delay)
//response.success(data,output_type);
//response.reject();
//response.error("error message")
}
module.exports = perform_function;
module.exports = require('../../context');
var util = require('util');
var DOPlugin = require('../do-plugin');
function DOTask(context,request){
DOPlugin.call(this,context,request);
this.name = "call";
}
util.inherits(DOTask,DOPlugin);
DOTask.prototype.perform = require('./perform');
module.exports = DOTask;
var ctx = require('../../../context');
var bsdata = ctx.getLib('lib/model/bsdata');
function perform_function(context,request,response){
var job_id = context.jobconfig.job_id;
var transaction_id = context.transaction.id;
var param = context.jobconfig.data_out.param;
var memstore = context.task.memstore;
var jobcaller = context.task.jobcaller;
var data = request.data;
var meta = request.meta;
var prm_to = param.to;
if(Array.isArray(prm_to))
{
prm_to.forEach(function(job){
call_to(job);
});
}else{
call_to(prm_to);
}
function call_to(target)
{
var job_id = null;
if(typeof target == 'string')
{
job_id = target;
}else{return;}
var cmd = {
'object_type':'job_execute',
'source' : 'scheduler',
'jobId' : job_id,
'option' : {'exe_level':'secondary'},
'input_meta' : meta,
'input_data' : {
'type' : 'bsdata',
'value' : bsdata.create(data).serialize('object-encoded')
}
}
jobcaller.send(cmd);
}
response.success();
//response.reject();
//response.error("error message")
}
module.exports = perform_function;
...@@ -6,8 +6,11 @@ function perform_function(context,request,response){ ...@@ -6,8 +6,11 @@ function perform_function(context,request,response){
var output_type = request.input_type; var output_type = request.input_type;
var data = request.data; var data = request.data;
var meta = request.meta;
if(meta){console.log('meta => ' + JSON.stringify(meta));}
console.log('--- data ---');
console.log(data); console.log(data);
response.success(); response.success();
......
module.exports = require('../../context');
var util = require('util');
var DTPlugin = require('../dt-plugin');
function DTTask(context,request){
DTPlugin.call(this,context,request);
this.name = "jsmap";
this.output_type = "";
}
util.inherits(DTTask,DTPlugin);
DTTask.prototype.perform = require('./perform');
module.exports = DTTask;
var vm = require('vm');
function perform_function(context,request,response){
var job_id = context.jobconfig.job_id;
var transaction_id = context.transaction.id;
var param = context.jobconfig.data_transform.param;
var memstore = context.task.memstore
var in_type = request.input_type;
var in_data = request.data;
var in_meta = request.meta;
var mapscr = param.script || "";
var datascr = param.text ;
if(param.text){
mapscr = mapscr + "; data=`" + param.text + "`";
}
var mapenv = {
'src' : {
'type' : in_type,
'data' : in_data,
'meta' : in_meta
},
'type' : in_type,
'data' : in_data,
'meta' : in_meta
}
var script = new vm.Script(mapscr);
var context = new vm.createContext(mapenv);
script.runInContext(context);
var data = mapenv.data;
var meta = mapenv.meta;
var output_type = mapenv.type;
// memstore.setItem('lasttransaction',transaction_id,function(err){
// response.success(data);
// });
// memstore.getItem('lasttransaction',function(err,value){
// response.success(value);
// });
response.success(data,{'meta':meta,'output_type':output_type});
//response.reject();
//response.error("error message")
}
module.exports = perform_function;
...@@ -6,6 +6,7 @@ function perform_function(context,request,response){ ...@@ -6,6 +6,7 @@ function perform_function(context,request,response){
var output_type = request.input_type; var output_type = request.input_type;
var data = request.data; var data = request.data;
var meta = request.meta;
// memstore.setItem('lasttransaction',transaction_id,function(err){ // memstore.setItem('lasttransaction',transaction_id,function(err){
...@@ -17,7 +18,7 @@ function perform_function(context,request,response){ ...@@ -17,7 +18,7 @@ function perform_function(context,request,response){
// }); // });
//data = data.a.b + "--DT--"; //data = data.a.b + "--DT--";
response.success(data,output_type); response.success(data,{'meta':meta,'output_type':output_type});
//response.reject(); //response.reject();
//response.error("error message") //response.error("error message")
......
...@@ -35,17 +35,26 @@ function DTResponse(handle){ ...@@ -35,17 +35,26 @@ function DTResponse(handle){
this.handle = handle; this.handle = handle;
this.status = null; this.status = null;
this.data = null; this.data = null;
this.meta = null;
this.output_type = ''; this.output_type = '';
} }
DTResponse.prototype.success = function(data,type){ DTResponse.prototype.success = function(data,type){
var self=this; var self=this;
var handle = this.handle; var handle = this.handle;
this.data = data;
if(typeof type == 'string'){
this.output_type=type;
}else if(typeof type == 'object' && type){
this.output_type=(type.output_type)?type.output_type:this.output_type;
this.meta=(type.meta)?type.meta:this.meta;
}
this.data = (data)?data:this.data ;
this.status = 'success'; this.status = 'success';
if(type){this.output_type=type;}
process.nextTick(function(){ process.nextTick(function(){
handle.emit('done',{'status':'success','data':data,'type':self.output_type}); handle.emit('done',{'status':'success','meta':self.meta,'data':self.data,'type':self.output_type});
}); });
} }
......
...@@ -4,4 +4,4 @@ How to build xml2json ...@@ -4,4 +4,4 @@ How to build xml2json
- npm install - npm install
(for centos : http://hiltmon.com/blog/2015/08/09/c-plus-plus-11-on-centos-6-dot-6/) (for centos : http://hiltmon.com/blog/2015/08/09/c-plus-plus-11-on-centos-6-dot-6/)
Git clone Git Clone
\ No newline at end of file \ No newline at end of file
var ctx = require('./context'); var ctx = require('./context');
var StorageService = ctx.getLib('storage-service/main'); var StorageService = ctx.getLib('storage-service/main');
var argv = require('minimist')(process.argv.slice(2));
//ss.start();
var m = {'read':false,'write':false}
if(argv['process-read']){m.read = true;}
if(argv['process-write']){m.write = true;}
if(argv['api-port']){ctx.config.storage.api_port = Number(argv['api-port']);}
//console.log(argv);
var ss = StorageService.create(ctx.config); var ss = StorageService.create(ctx.config);
ss.start();
if(!m.read && !m.write)
{
ss.start();
}else{
if(m.read){ss.http_start();}
if(m.write){ss.amqp_start();}
}
...@@ -92,6 +92,8 @@ BSSEngine.prototype.cmd_write = function(prm,cb) ...@@ -92,6 +92,8 @@ BSSEngine.prototype.cmd_write = function(prm,cb)
var data = parseData(prm.data); var data = parseData(prm.data);
var meta = prm.meta; var meta = prm.meta;
if(!data){return cb("null data")}
this.bss.write(data,{'meta':meta},function(err,obj){ this.bss.write(data,{'meta':meta},function(err,obj){
if(!err){ if(!err){
var head = obj.getHeader(); var head = obj.getHeader();
...@@ -116,7 +118,9 @@ function parseData(dat) ...@@ -116,7 +118,9 @@ function parseData(dat)
if(dat.type && dat.type == 'bsdata') if(dat.type && dat.type == 'bsdata')
{ {
var bs = bsdata.parse(dat.value); var bs = bsdata.parse(dat.value);
if(bs){
ret = bs.data; ret = bs.data;
}
}else{ }else{
ret = dat.value; ret = dat.value;
} }
......
//console.log(__dirname);
var ctx = require(__dirname + '/context');
var path = require('path');
var fs = require('fs');
var async = require('async');
var BinStream = ctx.getLib('lib/bss/binarystream_v1_1');
var ObjId = ctx.getLib('lib/bss/objid');
var BSData = ctx.getLib('lib/model/bsdata');
onmessage = function (ev) {
var msg = ev.data;
if(msg.cmd == 'read')
{
bss_read_objects(msg.prm)
}
};
function bss_read_objects (prm)
{
var bss_full_path = prm.bss_full_path;
var tail_no = prm.tail_no;
var from_seq = prm.from_seq;
var limit = prm.limit;
var output_type = prm.output_type;
var objOpt = prm.objOpt;
var sizelimit = prm.sizelimit;
fs.exists(bss_full_path,function(exists){
if(exists){
BinStream.open(bss_full_path,function(err,bss){
var rd = bss.reader();
var rec_count = rd.count();
if(tail_no){
var last_count=Number(tail_no);
from_seq = (rec_count - last_count) + 1;
}
if(from_seq<1){from_seq=1;}
var idx = from_seq;
var obj_return = [];
var cont = true;
if(idx > rec_count){cont=false;}
rd.moveTo(idx);
//start stream response
var resultIdx=0;
var counter=0;
bss_start();
async.whilst(
function() { return cont; },
function(callback){
rd.nextObject(function(err,obj){
if(idx > rec_count || !obj){
cont=false;
}else{
idx++;
var dataout = JSON.stringify(obj_out(obj,objOpt));
//if(resultIdx>0){stream_newrec(respHelper,output_type);}
bss_output(dataout);
counter += dataout.length;
if(sizelimit>0 && counter>=sizelimit){
cont=false;
}
if(limit>0 && idx>=from_seq+limit){
cont=false;
}
resultIdx++;
}
callback();
});
},function(err){
//stream_end(respHelper,output_type);
bss_end(200);
bss.close(function(err){
//res.status(200).end();
//bss_end(200);
});
});
});
}else{
bss_end(404);
}
});
}
function bss_start(data)
{
//console.log('start');
var msg = {
'on' : 'start',
'data' : data
}
postMessage(msg);
}
function bss_output(data)
{
//console.log('data');
var msg = {
'on' : 'data',
'data' : data
}
postMessage(msg);
}
function bss_end(code)
{
//console.log('end');
var msg = {
'on' : 'end',
'code' : code
}
postMessage(msg);
}
function obj_out(obj,opt){
var ret = {
"_id" : (new ObjId(obj.header.ID)).toString()
}
if(opt.meta){ret.meta = obj.meta;}
if(opt.data){
if(obj.header.TY==BinStream.BINARY_TYPE)
{
var bs = BSData.create(obj.data);
ret.data = bs.serialize('object-encoded');
}else{
ret.data = obj.data;
}
}
return ret
}
var util = require('util');
var EventEmitter = require('events').EventEmitter;
var Worker = require("tiny-worker");
module.exports = ReaderWorker;
function ReaderWorker ()
{
EventEmitter.call(this);
var self=this;
this.resp = null;
this.output_type = '';
this.firstline=true;
this.w = new Worker("storage-service/lib/ps_bssread.js");
this.w.onmessage = function (ev) {
var msg = ev.data;
var code = msg.code;
if(msg.on == 'end'){
// if(code == '404')
// {
// end(404);
// }else if(code == '200'){
// stream_end();
// end(200);
// }
self.emit('end',msg.code);
}
if(msg.on == 'start'){
// stream_start();
self.emit('start',msg.data);
}
if(msg.on == 'data'){
// if(!self.firstline){
// stream_newrec();
// }else{self.firstline = false;}
// stream_data(msg.data);
self.emit('data',msg.data);
}
}
// function stream_start()
// {
// var resp = self.resp;
// var type = self.output_type;
// if(type=='stream')
// {
// resp.type('text');
// }else{
// resp.type('application/json');
// resp.write('[');
// }
// }
//
// function stream_newrec()
// {
// var resp = self.resp;
// var type = self.output_type;
// if(type=='stream')
// {
// resp.write('\n');
// }else{
// resp.write(',');
// }
// }
//
// function stream_data(data)
// {
// var resp = self.resp;
// resp.write(data);
// }
//
// function stream_end()
// {
// var resp = self.resp;
// var type = self.output_type;
// if(type=='stream')
// {
// resp.write('');
// }else{
// resp.write(']');
// }
// }
//
// function end(code)
// {
// var resp = self.resp;
// if(code==404){
// resp.response404()
// }else{
// resp.status(code).end();
// }
// }
}
util.inherits(ReaderWorker, EventEmitter);
ReaderWorker.prototype.execute = function (msg)
{
this.working = true;
this.w.postMessage(msg);
}
ReaderWorker.prototype.shutdown = function ()
{
this.w.terminate();
}
var ReaderWorker = require('./reader_worker');
module.exports.create = function(opt)
{
return new WorkerPool(opt);
}
function WorkerPool (opt)
{
opt = opt || {};
this.pool = [];
this.size = opt.size || 1;
}
WorkerPool.prototype.initWorker = function ()
{
var pz = this.pool.length;
for(var i=0;i<this.size-pz;i++)
{
var w = new ReaderWorker();
this.pool.push(w);
}
}
WorkerPool.prototype.get = function ()
{
var w;
if(this.pool.length > 0)
{
//console.log('worker get');
w = this.pool.shift();
}else{
//console.log('worker new');
w = new ReaderWorker();
}
this.initWorker();
return w;
}
WorkerPool.prototype.push = function (worker)
{
worker.resp = null;
worker.output_type = null;
if(this.pool.length >= this.size)
{
//console.log('shutdown');
worker.shutdown();
}else{
//console.log('worker back');
this.pool.push(worker);
}
}
var ctx = require('../context'); var ctx = require('../context');
var rpcserver = ctx.getLib('lib/amqp/rpcserver'); var rpcserver = ctx.getLib('lib/amqp/rpcserver');
var Db = ctx.getLib('storage-service/lib/db'); var Db = ctx.getLib('storage-service/lib/db');
var WorkerPool = ctx.getLib('storage-service/lib/worker_pool');
var express = require('express'); var express = require('express');
var app = express(); var app = express();
...@@ -18,8 +19,9 @@ module.exports.create = function(cfg) ...@@ -18,8 +19,9 @@ module.exports.create = function(cfg)
var SS = function StorageService(cfg) var SS = function StorageService(cfg)
{ {
this.config = cfg; this.config = cfg;
storage_cfg = cfg.storage;
this.db = Db.create({'repos_dir':storage_cfg.repository}); this.db = Db.create({'repos_dir':storage_cfg.repository});
this.worker_pool = WorkerPool.create({'size':2});
} }
SS.prototype.start = function() SS.prototype.start = function()
...@@ -66,14 +68,20 @@ SS.prototype.amqp_start = function() ...@@ -66,14 +68,20 @@ SS.prototype.amqp_start = function()
SS.prototype.http_start = function() SS.prototype.http_start = function()
{ {
var self = this; var self = this;
var amqp_cfg = this.config.amqp;
var API_PORT = 19080; var API_PORT = (this.config.storage.api_port)?this.config.storage.api_port:19080;
app.use(bodyParser.json({limit: '5mb'})); app.use(bodyParser.json({limit: '5mb'}));
app.use(bodyParser.urlencoded({ app.use(bodyParser.urlencoded({
extended: true extended: true
})); }));
var context = ctx.getLib('lib/ws/http-context');
this.worker_pool.initWorker();
app.use(context.middleware({
'worker_pool' : self.worker_pool
}));
app.use(require('./ws')); app.use(require('./ws'));
......
...@@ -2,5 +2,6 @@ var express = require('express') ...@@ -2,5 +2,6 @@ var express = require('express')
, router = express.Router() , router = express.Router()
router.use('/v0.1',require('./v0.1')); router.use('/v0.1',require('./v0.1'));
router.use('/v1',require('./v0.1'));
module.exports = router; module.exports = router;
...@@ -2,7 +2,7 @@ var express = require('express'); ...@@ -2,7 +2,7 @@ var express = require('express');
var router = express.Router(); var router = express.Router();
router.use('/object',require('./service-object')); router.use('/object',require('./service-object'));
router.use('/storage',require('./service-storage')); router.use('/storage',require('./service-storage2'));
router.use('/storage2',require('./service-storage2'));
module.exports = router; module.exports = router;
...@@ -11,6 +11,7 @@ var response = ctx.getLib('lib/ws/response'); ...@@ -11,6 +11,7 @@ var response = ctx.getLib('lib/ws/response');
var request = ctx.getLib('lib/ws/request'); var request = ctx.getLib('lib/ws/request');
var BinStream = ctx.getLib('lib/bss/binarystream_v1_1'); var BinStream = ctx.getLib('lib/bss/binarystream_v1_1');
var ObjId = ctx.getLib('lib/bss/objid'); var ObjId = ctx.getLib('lib/bss/objid');
var BSData = ctx.getLib('lib/model/bsdata');
router.get('/:id',function (req, res) { router.get('/:id',function (req, res) {
var reqHelper = request.create(req); var reqHelper = request.create(req);
...@@ -111,8 +112,15 @@ function output(resp,obj,opt) ...@@ -111,8 +112,15 @@ function output(resp,obj,opt)
function obj_out(resp,obj,opt) function obj_out(resp,obj,opt)
{ {
var ret = {"_id" : (new ObjId(obj.header.ID)).toString(), var ret = {"_id" : (new ObjId(obj.header.ID)).toString(),
"meta" : obj.meta, "meta" : obj.meta
"data" : obj.data }
if(obj.header.TY==BinStream.BINARY_TYPE)
{
var bs = BSData.create(obj.data);
ret.data = bs.serialize('object-encoded');
}else{
ret.data = obj.data;
} }
resp.responseOK(ret); resp.responseOK(ret);
} }
...@@ -129,7 +137,6 @@ function data_out(resp,obj,opt) ...@@ -129,7 +137,6 @@ function data_out(resp,obj,opt)
} }
resp.response.send(obj.data); resp.response.send(obj.data);
//resp.endOK();
}else if(objType == BinStream.STRING_TYPE){ }else if(objType == BinStream.STRING_TYPE){
resp.response.send(obj.data); resp.response.send(obj.data);
}else{ }else{
......
...@@ -12,6 +12,7 @@ var response = ctx.getLib('lib/ws/response'); ...@@ -12,6 +12,7 @@ var response = ctx.getLib('lib/ws/response');
var request = ctx.getLib('lib/ws/request'); var request = ctx.getLib('lib/ws/request');
var BinStream = ctx.getLib('lib/bss/binarystream_v1_1'); var BinStream = ctx.getLib('lib/bss/binarystream_v1_1');
var ObjId = ctx.getLib('lib/bss/objid'); var ObjId = ctx.getLib('lib/bss/objid');
var BSData = ctx.getLib('lib/model/bsdata');
router.get('/:id/stats',function (req, res) { router.get('/:id/stats',function (req, res) {
var reqHelper = request.create(req); var reqHelper = request.create(req);
...@@ -66,8 +67,12 @@ router.get('/:id/objects',function (req, res) { ...@@ -66,8 +67,12 @@ router.get('/:id/objects',function (req, res) {
var from_seq = 1; var from_seq = 1;
var limit = 0; var limit = 0;
var sizelimit = 20 * 1000 * 1000; var sizelimit = 64 * 1000 * 1000;
//compat with v1
// param => offset
// param => obj_after
if(query.offset){query.obj_after=query.offset;}
if(query.obj_after){ if(query.obj_after){
var o_seq; var o_seq;
try{ try{
...@@ -79,18 +84,28 @@ router.get('/:id/objects',function (req, res) { ...@@ -79,18 +84,28 @@ router.get('/:id/objects',function (req, res) {
from_seq = o_seq+1; from_seq = o_seq+1;
} }
// param => limit
if(query.limit){ if(query.limit){
limit = Number(query.limit); limit = Number(query.limit);
} }
// param => sizelimit
if(query.sizelimit){ if(query.sizelimit){
sizelimit = Number(query.sizelimit) * 1000 * 1000; sizelimit = Number(query.sizelimit) * 1000 * 1000;
} }
// param => output = [object],stream
var output_type = (query.output)?query.output:'object';
//compat with v1
// param => from
// param => seq_from
if(query.from){query.seq_from = query.from;}
if(query.seq_from){ if(query.seq_from){
from_seq = Number(query.seq_from); from_seq = Number(query.seq_from);
} }
// param => field = id,meta,[data]
var objOpt = {'meta':true,'data':true} var objOpt = {'meta':true,'data':true}
if(query.field == 'id'){ if(query.field == 'id'){
objOpt.meta = false; objOpt.meta = false;
...@@ -99,6 +114,9 @@ router.get('/:id/objects',function (req, res) { ...@@ -99,6 +114,9 @@ router.get('/:id/objects',function (req, res) {
objOpt.data = false; objOpt.data = false;
} }
// param => last
var tail_no = query.last;
fs.exists(bss_full_path,function(exists){ fs.exists(bss_full_path,function(exists){
if(exists){ if(exists){
...@@ -106,10 +124,12 @@ router.get('/:id/objects',function (req, res) { ...@@ -106,10 +124,12 @@ router.get('/:id/objects',function (req, res) {
BinStream.open(bss_full_path,function(err,bss){ BinStream.open(bss_full_path,function(err,bss){
var rd = bss.reader(); var rd = bss.reader();
var rec_count = rd.count(); var rec_count = rd.count();
if(query.last){
var last_count=Number(query.last); if(tail_no){
var last_count=Number(tail_no);
from_seq = (rec_count - last_count) + 1; from_seq = (rec_count - last_count) + 1;
} }
if(from_seq<1){from_seq=1;} if(from_seq<1){from_seq=1;}
var idx = from_seq; var idx = from_seq;
...@@ -120,22 +140,21 @@ router.get('/:id/objects',function (req, res) { ...@@ -120,22 +140,21 @@ router.get('/:id/objects',function (req, res) {
rd.moveTo(idx); rd.moveTo(idx);
//start stream response //start stream response
res.type('application/json'); stream_start(respHelper,output_type);
res.write('[');
var resultIdx=0; var resultIdx=0;
var counter=0; var counter=0;
async.whilst( async.whilst(
function() { return cont; }, function() { return cont; },
function(callback){ function(callback){
rd.nextObject(function(err,obj){ rd.nextObject(function(err,obj){
idx++; if(idx > rec_count || !obj){
if(!obj){
cont=false; cont=false;
}else{ }else{
idx++;
var dataout = JSON.stringify(obj_out(obj,objOpt)); var dataout = JSON.stringify(obj_out(obj,objOpt));
if(resultIdx>0){res.write(',');} if(resultIdx>0){stream_newrec(respHelper,output_type);}
res.write(dataout); res.write(dataout);
counter+=dataout.length; counter += dataout.length;
if(sizelimit>0 && counter>=sizelimit){ if(sizelimit>0 && counter>=sizelimit){
cont=false; cont=false;
} }
...@@ -147,10 +166,9 @@ router.get('/:id/objects',function (req, res) { ...@@ -147,10 +166,9 @@ router.get('/:id/objects',function (req, res) {
callback(); callback();
}); });
},function(err){ },function(err){
res.write(']'); stream_end(respHelper,output_type);
bss.close(function(err){ bss.close(function(err){
res.status(200).end(); res.status(200).end();
//respHelper.responseOK(obj_return);
}); });
}); });
...@@ -164,13 +182,53 @@ router.get('/:id/objects',function (req, res) { ...@@ -164,13 +182,53 @@ router.get('/:id/objects',function (req, res) {
}); });
function stream_start(resp,type)
{
if(type=='stream')
{
resp.type('text');
}else{
resp.type('application/json');
resp.write('[');
}
}
function stream_newrec(resp,type)
{
if(type=='stream')
{
resp.write('\n');
}else{
resp.write(',');
}
}
function stream_end(resp,type)
{
if(type=='stream')
{
resp.write('');
}else{
resp.write(']');
}
}
function obj_out(obj,opt){ function obj_out(obj,opt){
var ret = { var ret = {
"_id" : (new ObjId(obj.header.ID)).toString() "_id" : (new ObjId(obj.header.ID)).toString()
} }
if(opt.meta){ret.meta = obj.meta;} if(opt.meta){ret.meta = obj.meta;}
if(opt.data){ret.data = obj.data;} if(opt.data){
if(obj.header.TY==BinStream.BINARY_TYPE)
{
var bs = BSData.create(obj.data);
ret.data = bs.serialize('object-encoded');
}else{
ret.data = obj.data;
}
}
return ret return ret
} }
......
var ctx = require('../../../context');
var express = require('express');
var router = express.Router();
var path = require('path');
var fs = require('fs');
var async = require('async');
var Worker = require("tiny-worker");
var cfg = ctx.config;
var storage_cfg = cfg.storage;
var response = ctx.getLib('lib/ws/response');
var request = ctx.getLib('lib/ws/request');
var BinStream = ctx.getLib('lib/bss/binarystream_v1_1');
var ObjId = ctx.getLib('lib/bss/objid');
var BSData = ctx.getLib('lib/model/bsdata');
router.get('/:id/stats',function (req, res) {
var reqHelper = request.create(req);
var respHelper = response.create(res);
var sid = req.params.id;
if(!sid){
return respHelper.response404();
}
var storage_path = sid.split('.').join('/');
var bss_full_path = storage_cfg.repository + "/" + storage_path + ".bss";
fs.exists(bss_full_path,function(exists){
if(exists){
var fstat = stats = fs.statSync(bss_full_path);
BinStream.open(bss_full_path,function(err,bss){
var rd = bss.reader();
var obj_stat = {
"storagename" : sid,
"count" : rd.count(),
"filename" : storage_path + ".bss",
"filesize" : fstat.size
}
bss.close(function(err){
respHelper.responseOK(obj_stat);
});
});
}else{
respHelper.response404();
}
});
});
router.get('/:id/objects',function (req, res) {
var reqHelper = request.create(req);
var respHelper = response.create(res);
var sid = req.params.id;
var query = reqHelper.getQuery();
if(!query){query={};}
if(!sid){
return respHelper.response404();
}
var storage_path = sid.split('.').join('/');
var bss_full_path = storage_cfg.repository + "/" + storage_path + ".bss";
var from_seq = 1;
var limit = 0;
var sizelimit = 64 * 1000 * 1000;
//compat with v1
// param => offset
// param => obj_after
if(query.offset){query.obj_after=query.offset;}
if(query.obj_after){
var o_seq;
try{
var obj_id = new ObjId(query.obj_after);
o_seq = obj_id.extract().seq;
}catch(err){
return respHelper.response404();
}
from_seq = o_seq+1;
}
// param => limit
if(query.limit){
limit = Number(query.limit);
}
// param => sizelimit
if(query.sizelimit){
sizelimit = Number(query.sizelimit) * 1000 * 1000;
}
// param => output = [object],stream
var output_type = (query.output)?query.output:'object';
//compat with v1
// param => from
// param => seq_from
if(query.from){query.seq_from = query.from;}
if(query.seq_from){
from_seq = Number(query.seq_from);
}
// param => field = id,meta,[data]
var objOpt = {'meta':true,'data':true}
if(query.field == 'id'){
objOpt.meta = false;
objOpt.data = false;
}else if(query.field == 'meta'){
objOpt.data = false;
}
// param => last
var tail_no = query.last;
var rd_prm = {
'bss_full_path' : bss_full_path,
'tail_no' : tail_no,
'from_seq' : from_seq,
'limit' : limit,
'output_type' : output_type,
'objOpt' : objOpt,
'sizelimit' : sizelimit
}
var worker_pool = req.context.worker_pool;
var worker = worker_pool.get();
worker.resp = respHelper;
worker.output_type = output_type;
worker.execute({'cmd':'read','prm':rd_prm});
worker.on('start',function(data){
stream_start();
});
var firstline=true;
worker.on('data',function(data){
if(!firstline){
stream_newrec();
}else{firstline = false;}
stream_data(data);
});
worker.on('end',function(code){
if(code == '404')
{
end(404);
}else if(code == '200'){
stream_end();
end(200);
}
worker.shutdown();
//worker_pool.push(worker);
});
function stream_start()
{
var resp = worker.resp;
var type = worker.output_type;
if(type=='stream')
{
resp.type('text');
}else{
resp.type('application/json');
resp.write('[');
}
}
function stream_newrec()
{
var resp = worker.resp;
var type = worker.output_type;
if(type=='stream')
{
resp.write('\n');
}else{
resp.write(',');
}
}
function stream_data(data)
{
var resp = worker.resp;
resp.write(data);
}
function stream_end()
{
var resp = worker.resp;
var type = worker.output_type;
if(type=='stream')
{
resp.write('');
}else{
resp.write(']');
}
}
function end(code)
{
var resp = worker.resp;
if(code==404){
resp.response404()
}else{
resp.status(code).end();
}
}
});
module.exports = router;
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment