Commit 991e914b authored by Kamron Aroonrua's avatar Kamron Aroonrua 💬

Merge branch 'dev' into 'master'

Dev

See merge request !39
parents dafa1212 979184a2
BIGSTREAM_TAG=test BIGSTREAM_IMG=bigstream:test
BS_SECRET=bigstream-server BS_SECRET=bigstream-server
REDIS_TAG=4 REDIS_TAG=4
PREFIX_NO=19 PREFIX_NO=19
......
#Changelog #Changelog
## [1.2.4] - 2021-06-21
### Added
- STORAGE :: Object Cache
### Changed
- BS :: change bson lib to mongo/bson
- SS :: Direct Idxstore
## [1.2.3] - 2021-05-05 ## [1.2.3] - 2021-05-05
### Update ### Update
- AMQPLib Support Heartbeat Monitoring - AMQPLib Support Heartbeat Monitoring
......
version: '3' version: '3'
services: services:
bigstream: bigstream:
image: "bigstream:${BIGSTREAM_TAG}" image: "${BIGSTREAM_IMG}"
build: ./
container_name: bs_bigstream container_name: bs_bigstream
restart: always restart: always
networks: networks:
......
...@@ -217,11 +217,13 @@ JobTask.prototype.run = function () ...@@ -217,11 +217,13 @@ JobTask.prototype.run = function ()
if(!err){ if(!err){
self.stop(resp) self.stop(resp)
// console.log('***** JOB SUCCESSFULLY DONE *****\n'); // console.log('***** JOB SUCCESSFULLY DONE *****\n');
console.log('[JOB DONE] id=' + job_id + ' ,tr=' + transaction_id + '\tsuccess'); console.log('[JOB DONE] id=' + job_id + ' ,tr=' + transaction_id + '\tSUCCESS');
}else{ }else{
self.stop(err) self.stop(err)
// console.log('***** JOB UNSUCCESSFULLY DONE *****\n'); // console.log('***** JOB UNSUCCESSFULLY DONE *****\n');
console.log('[JOB DONE] id=' + job_id + ' ,tr=' + transaction_id + '\tunsuccess'); var st = 'unsuccess';
if(err.status){st=String(err.status).toUpperCase()}
console.log('[JOB DONE] id=' + job_id + ' ,tr=' + transaction_id + '\t' + st);
} }
}); });
......
...@@ -61,43 +61,4 @@ RPCCaller.prototype.call = function(req,cb){ ...@@ -61,43 +61,4 @@ RPCCaller.prototype.call = function(req,cb){
} }
} }
/*
function RPCCaller(config)
{
this.config = config;
this.url = config.url;
this.name = config.name || "rpc_queue";
}
RPCCaller.prototype.call = function(req,cb){
var self = this;
amqp.connect(self.url, function(err, conn) {
conn.createChannel(function(err, ch) {
ch.assertQueue('', {exclusive: true}, function(err, q) {
var corr = generateUuid();
ch.consume(q.queue, function(msg) {
if (msg.properties.correlationId == corr) {
var resp = JSON.parse(msg.content.toString());
conn.close();
cb(null,resp);
}
}, {noAck: true});
ch.sendToQueue(self.name,
new Buffer(JSON.stringify(req)),
{ correlationId: corr, replyTo: q.queue });
});
});
});
function generateUuid() {
return Math.random().toString() +
Math.random().toString() +
Math.random().toString();
}
}
*/
module.exports = RPCCaller; module.exports = RPCCaller;
This diff is collapsed.
var randomAccessFile = require('random-access-file');
var async = require('async');
var BSON = require('buffalo');
var ObjectId = BSON.ObjectId;
var fileAccessBuffer = require('./file-access-buffer');
var Root = require('./root');
var ObjectData = require('./objectdata');
var ObjId = require('./objid');
var Oat = require('./oat');
var Reader = require('./reader');
var quickq = require('quickq');
const VERSION = "1.0";
const ROOTSIZE = 256;
const OATSLOT = 10000;
const OATMETASIZE = 100;
const HEADERSIZE = 80;
const STRING_TYPE = 1;
const BINARY_TYPE = 2;
const OBJECT_TYPE = 3;
module.exports.STRING_TYPE = STRING_TYPE;
module.exports.BINARY_TYPE = BINARY_TYPE;
module.exports.OBJECT_TYPE = OBJECT_TYPE;
module.exports.format = function(filename,opt,cb){
var options = {};
if(typeof opt == 'function'){
cb = opt;
}else{
options = opt || {};
}
var prm = {};
if(options.hashnumber && options.hashnumber instanceof Buffer){
prm.FHN = options.hashnumber;
}
var fd = fileAccessBuffer.create(filename,{truncate: true});
var root = new Root(fd);
root.newroot(prm);
root.write(cb);
};
module.exports.open = function(filename,opt,cb){
var options = {};
if(typeof opt == 'function'){
cb = opt;
}else{
options = opt || {};
}
var inst = new Storage(filename,options);
inst.open(cb);
}
function Storage(filename,opt)
{
this.options = opt;
this.filename = filename;
this.write_queue = quickq(io_write,{ concurrency: 1 });
}
Storage.prototype.open = function(cb)
{
var self = this;
if(this.root){return cb(new Error('Already opened'));}
var fileOpt = null;
this.file = fileAccessBuffer.create(this.filename,fileOpt);
self.root = new Root(self.file);
self.root.load(function(err,obj){
cb(err,self);
});
// this.file.open(function(err){
//
// self.root = new Root(self.file);
// self.root.load(function(err,obj){
// cb(err,self);
// });
//
// });
}
Storage.prototype.write = function(data,opt,cb)
{
var prm = {'data':data,'opt':opt,'self':this}
this.write_queue.push(prm,cb);
}
var io_write = function(prm,cb)
{
var data = prm.data;
var opt = prm.opt;
var self = prm.self;
var options = {};
if(typeof opt == 'function'){
cb = opt;
}else{
options = opt || {};
}
var rootData = self.root.data;
var nSeq = rootData.SEQ+1;
var nFhn = rootData.FHN;
var oid = new ObjId({'fhn':nFhn,'seq':nSeq});
var objData = ObjectData.createByData(self.file,{
id:oid.bytes,
data:data,
meta:options.meta
})
if(!objData){
return cb(new Error('Data error'));
}
var slotNo = rootData.SEQ%rootData.OATZ;
async.waterfall(
[
function(callback){
//make oat buffer
if(rootData.SEQ == 0){
//First OAT
self.lastOat = Oat.create(self.file,
{
'address':rootData.AOF,
'SEQ':1,
});
rootData.OATA = self.lastOat.address;
rootData.AOF = rootData.AOF + self.lastOat.getSize();
self.lastOat.writeMeta(function(err){
callback(err,true);
})
}else if(!self.lastOat){
//load last Oat
Oat.load(self.file,rootData.OATA,function(err,oat){
self.lastOat = oat;
callback(err,false);
})
}else{
callback(null,false);
}
///end make oat
},
function(first,callback){
//new oat table
if(!first && slotNo==0){
var nextOat = Oat.create(self.file,
{
'address':rootData.AOF,
'SEQ':self.lastOat.oatmeta.SEQ + 1,
'PREV':self.lastOat.address
});
rootData.OATA = nextOat.address;
rootData.AOF = rootData.AOF + nextOat.getSize();
self.lastOat.setNextAddr(nextOat.address);
nextOat.writeMeta(function(err){
if(err){
callback(err);
}else{
self.lastOat.writeMeta(function(err){
self.lastOat = nextOat;
callback(err);
})
}
})
}else{
callback(null);
}
},
function(callback){
//write oat slot
objData.setAddress(rootData.AOF);
self.lastOat.writeSlot(slotNo,objData.getHeader(),function(err){
callback(err);
});
},
function(callback){
//update root
rootData.AOF = rootData.AOF + objData.getObjectSize();
rootData.SEQ = nSeq;
self.root.write(function(err){
callback(err);
})
},
function(callback){
//write data
objData.write(function(err){
callback(err,objData);
});
}
],
function(err){
cb(err,objData);
}
);
}
Storage.prototype.reader = function(prm){
var prm = prm || {};
var rd = new Reader(this.file,this.root);
return rd;
}
Storage.prototype.close = function(cb){
this.write_queue.pause( )
this.file.close(cb);
};
var thunky = require('thunky');
var randomAccessFile = require('random-access-file');
var fs = require("fs");
var async = require('async');
var BSON = require('buffalo');
var BUFFERSIZE = 1*1024*1024;
function FileAccessBuffer(filename,opt){
if (!opt) opt = {};
var self = this;
this.filename = filename;
this.size = opt.size || BUFFERSIZE;
this.buffer = null;
this.file = new randomAccessFile(this.filename,opt);
this.opened = false
}
FileAccessBuffer.prototype.open = function open (cb) {
var self = this;
fs.open(self.filename, 'r', onopen)
function onopen (err, fd) {
if (err) {
return cb(err);
}
self.opened = true;
self.fd = fd;
fs.fstat(fd, function (err, st) {
if (err) {
return cb(err)
}
self.filesize = st.size;
cb();
});
}
}
FileAccessBuffer.prototype.bufferedRead = function (offset, length, cb) {
var self=this;
var readStart = offset;
var readEnd = (offset + length) - 1;
//var ret_buffer = new Buffer(length);
var ret_buffer = Buffer.alloc(length);
if(!self.filesize){
fs.stat(self.filename,function(err,stats){
self.filesize = stats["size"];
do_read();
});
}else{
do_read();
}
function do_read(){
if(self.buffer){
var buffStart = self.buffer.offset;
var buffEnd = (self.buffer.offset + self.buffer.data.length) - 1;
if(readStart >= buffStart && readEnd <= buffEnd){
self.buffer.data.copy(ret_buffer,0,readStart-buffStart,(readStart-buffStart)+length);
setImmediate(function (){
cb(null,ret_buffer);
//cb(null,self.buffer.data.slice(readStart-buffStart,(readStart-buffStart)+length));
});
}else if(readStart >= buffStart && readStart <= buffEnd){
//intersec back
var bytesDiff = (buffEnd-readStart)+1;
self.buffer.data.copy(ret_buffer,0,readStart-buffStart,(readStart-buffStart)+bytesDiff);
var ioffset = buffEnd+1;
var bytesRequire = length-bytesDiff;
var ilength = (self.size>bytesRequire)?self.size:bytesRequire;
if(ioffset+ilength > self.filesize){
ilength = self.filesize - ioffset;
}
self.read(ioffset,ilength,function(err,buff){
//console.log('intersec read');
if(!err){
self.buffer = {
"offset":ioffset,
"data":buff
};
buff.copy(ret_buffer,bytesDiff,0,bytesRequire);
}
cb(err,ret_buffer);
});
}else{
newbuffer(cb);
}
}else{
newbuffer(cb);
}
}
function newbuffer(callback){
var loffset = offset;
var llength = (self.size>length)?self.size:length;
if(loffset+llength > self.filesize){
llength = self.filesize - loffset;
}
self.read(loffset,llength,function(err,buff){
//console.log('new read');
if(!err){
self.buffer = {
"offset":loffset,
"data":buff
};
//try{buff.copy(ret_buffer,0,0,llength);}catch(e){process.exit(1)}
buff.copy(ret_buffer,0,0,llength);
}
callback(err,ret_buffer);
});
}
}
FileAccessBuffer.prototype.read = function (offset, length, cb) {
//console.log('bypss read ');
this.file.read(offset, length, cb);
// var self=this;
// if(!self.fd){return cb(new Error('File not opened'));}
//
// var buf = new Buffer(length)
// var len = 0;
// var bufOfs = 0;
// var readlen = length;
// var fileOfs = offset;
// async.whilst(
// function() { return len < length },
// function(callback) {
// //console.log(self.filename + ' ' + String(len) + ' ' + String(readlen) + ' ' + String(fileOfs));
// fs.read(self.fd, buf, len, readlen, fileOfs, function(err,bytes){
// len+=bytes;
// fileOfs+=bytes;
// readlen-=bytes;
// callback(err);
// });
// },function(err){
// if(err){console.log(err);}
// cb(err,buf);
// }
// );
}
FileAccessBuffer.prototype.write = function (offset, buf, cb) {
this.file.write(offset, buf, cb);
}
FileAccessBuffer.prototype.close = function (cb) {
this.file.close(cb);
}
module.exports.create = function(filename,opt)
{
var inst = new FileAccessBuffer(filename,opt);
return inst;
}
var BSON = require('buffalo');
var async = require('async');
const OATMETASIZE = 64;
const OBJHEADERSIZE = 80;
//OAT Header structure
var oatmeta_struct = function(){
return {
"SEQ" : 0,
"SZ" : 10000,
"PREV" : null,
"NEXT" : null,
}
}
var create = module.exports.create = function(fd,prm,opt){
var oatmeta = {
"SEQ" : 0,
"SZ" : 10000,
"PREV" : null,
"NEXT" : null,
};
oatmeta.SEQ = (prm.SEQ)?prm.SEQ:1;
oatmeta.SZ = (prm.SZ)?prm.SZ:10000;
oatmeta.PREV = (prm.PREV)?prm.PREV:null;
oatmeta.NEXT = (prm.NEXT)?prm.NEXT:null;
var address = prm.address;
return new Oat(fd,address,oatmeta,opt);
};
module.exports.load = function(fd,address,opt,cb){
var options = {};
if(typeof opt == 'function'){
cb = opt;
}else{
options = opt || {};
}
var inst = new Oat(fd,address,null,options);
inst.loadOat(function(err){
cb(err,inst)
});
}
function Oat(fd,addr,meta,opt){
this.options = opt || {};
this.file = fd;
this.slotbuffer = null;
this.address = addr;
this.oatmeta = meta;
}
Oat.prototype.setNextAddr = function(addr){
this.oatmeta.NEXT = addr;
}
Oat.prototype.loadOat = function(cb){
var self=this;
self.file.read(self.address, OATMETASIZE, function(err, buffer) {
var oatmeta = null;
if(!err){
oatmeta = BSON.parse(buffer);
self.oatmeta = oatmeta
}
cb(err);
});
}
Oat.prototype.oatAt = function(seq,cb){
var self=this;
var curSeq = this.oatmeta.SEQ;
var curOat = this;
if(seq>curSeq){
//forward
async.whilst(
function() { return ((curOat!=null) && (seq>curSeq)); },
function(callback) {
curOat.nextOat(function(err,oat){
curOat = oat;
if(curOat){
curSeq = oat.oatmeta.SEQ;
callback(err,curOat);
}else{
callback(null,null);
}
});
},
function (err, oat) {
cb(err,oat);
}
);
}else if(seq<curSeq){
//backward
async.whilst(
function() { return ((curOat!=null) && (seq<curSeq)); },
function(callback) {
curOat.prevOat(function(err,oat){
curOat = oat;
if(curOat){
curSeq = oat.oatmeta.SEQ;
callback(err,curOat);
}else{
callback(null,null);
}
});
},
function (err, oat) {
cb(err,oat);
}
);
}else{
callback(null,curOat);
}
}
Oat.prototype.nextOat = function(cb){
if(!this.oatmeta.NEXT){
return cb(null,null);
}
var inst = new Oat(this.file,this.oatmeta.NEXT);
inst.readMeta(function(err){
if(!err){
cb(null,inst);
}else{
cb(err);
}
})
}
Oat.prototype.prevOat = function(cb){
if(!this.oatmeta.PREV){
return cb(null,null);
}
var inst = new Oat(this.file,this.oatmeta.PREV);
inst.readMeta(function(err){
if(!err){
cb(null,inst);
}else{
cb(err);
}
})
}
Oat.prototype.readMeta = function(cb){
var self = this;
this.file.read(this.address, OATMETASIZE, function(err, buffer) {
if(!err){
self.oatmeta = BSON.parse(buffer);
}
cb(err);
});
}
Oat.prototype.writeMeta = function(cb){
//var buffer = new Buffer(OATMETASIZE);
var buffer = Buffer.alloc(OATMETASIZE);
BSON.serialize(this.oatmeta,buffer);
this.file.write(this.address,buffer,cb);
}
Oat.prototype.writeSlot = function(index,data,cb){
if(index>=this.oatmeta.SZ){
return cb(new Error("Index out of bound"));
}
var slotAddr = this.address + OATMETASIZE + (index*OBJHEADERSIZE);
//var buffer = new Buffer(OBJHEADERSIZE);
var buffer = Buffer.alloc(OBJHEADERSIZE);
BSON.serialize(data,buffer);
this.file.write(slotAddr,buffer,cb);
}
Oat.prototype.readSlot = function(index,opt,cb){
self=this;
var options = {};
if(typeof opt == 'function'){
cb = opt;
}else{
options = opt || {};
}
//options.nobuffer = true;
if(index>=self.oatmeta.SZ){
return cb(new Error("Index out of bound"));
}
var slotStart = self.address + OATMETASIZE;
var slotOffset = index*OBJHEADERSIZE;
var slotAddr = slotStart + slotOffset;
if(self.slotbuffer){
var buff = self.slotbuffer.slice(slotOffset,slotOffset+OBJHEADERSIZE);
var objbuff = BSON.parse(buff);
setImmediate(function (){
cb(null,objbuff);
// if(!objbuff.AD){console.log('ERRR');console.log(index); console.log(slotOffset);console.log(self.slotbuffer.slice(550560,550570));console.log('ERRRRR');}
// cb(null,objbuff);
//cb(null,BSON.parse(buff));
});
}else{
var readstart = (options.nobuffer)?slotAddr:slotStart;
var readlen = (options.nobuffer)?OBJHEADERSIZE:(OBJHEADERSIZE * self.oatmeta.SZ);
//console.log(String(readstart) + "," + String(self.address));
//console.log('Read ' + self.file.fd + ' at ' + String(readstart) + ' index ' + index );
self.file.read(readstart, readlen, function(err, rdbuffer) {
//console.log(self.file.filename);
//console.log('Read ' + self.file.fd + ' at ' + String(readstart) + ' index ' + index );
if(options.nobuffer){
cb(err,BSON.parse(rdbuffer));
}else{
self.slotbuffer = rdbuffer;
//console.log(self.slotbuffer.slice(550560,550570));
var slicebuff = self.slotbuffer.slice(slotOffset,slotOffset+OBJHEADERSIZE);
var objret = BSON.parse(slicebuff);
cb(err,objret);
}
});
}
}
Oat.prototype.getSize = function(){
return OATMETASIZE + (this.oatmeta.SZ * OBJHEADERSIZE);
}
var BSON = require('buffalo');
// var bsonp = require('bson')
// var BSONP = new bsonp.BSONPure.BSON()
const OBJHEADERSIZE = 80;
const STRING_TYPE = 1;
const BINARY_TYPE = 2;
const OBJECT_TYPE = 3;
//Header structure
var header_struct = function(){
// return {
// "ID" : null,
// "TY" : 1,
// "FG" : null,
// "MZ" : 0,
// "DZ" : 0,
// "AD" : 0
// }
return JSON.parse(JSON.stringify({
"ID" : null,
"TY" : 1,
"FG" : null,
"MZ" : 0,
"DZ" : 0,
"AD" : 0
}));
}
function ObjectData(fd,header,meta,data){
this.file = fd;
this.header = header;
//Buffer
this.metaBuffer = meta;
this.dataBuffer = data;
}
ObjectData.prototype.getHeader = function(){
return this.header;
}
ObjectData.prototype.getObjectSize = function(){
return this.header.MZ + this.header.DZ;
}
ObjectData.prototype.setAddress = function(addr){
return this.header.AD = addr;
}
ObjectData.prototype.readMeta = function(opt,cb){
self=this;
var options = {};
if(typeof opt == 'function'){
cb = opt;
}else{
options = opt || {};
}
var fd=this.file;
var oStart = this.header.AD;
var oLen = this.header.MZ;
fd.bufferedRead(oStart,oLen,function(err,buff){
if(!err){
//cb(null,BSON.parse(buff));
cb(null,safe_parse_bson(buff));
}else{
cb(err);
}
});
}
ObjectData.prototype.readData = function(opt,cb){
var self=this;
var options = {};
if(typeof opt == 'function'){
cb = opt;
}else{
options = opt || {};
}
var fd=this.file;
var oStart = this.header.AD + this.header.MZ;
var oLen = this.header.DZ;
fd.bufferedRead(oStart,oLen,function(err,buff){
if(!err){
cb(null,parse_data(buff,self.header.TY));
}else{
cb(err);
}
});
}
ObjectData.prototype.readObject = function(opt,cb){
var self=this;
var options = {};
if(typeof opt == 'function'){
cb = opt;
}else{
options = opt || {};
}
var fd=this.file;
var oStart = this.header.AD;
var oLen = this.header.MZ + this.header.DZ;
fd.bufferedRead(oStart,oLen,function(err,buff){
if(!err){
var obj = {
header : self.header,
//meta : BSON.parse(buff.slice(0,self.header.MZ)),
meta : safe_parse_bson(buff.slice(0,self.header.MZ)),
data : parse_data(buff.slice(self.header.MZ,self.header.MZ+self.header.DZ),self.header.TY),
}
cb(null,obj);
}else{
cb(err);
}
});
}
var parse_data = function(buffer,ty){
if(ty==STRING_TYPE){
return buffer.toString('utf8');
}else if(ty==OBJECT_TYPE){
//return BSON.parse(buffer);
return safe_parse_bson(buffer);
}else{
return buffer;
}
}
var safe_parse_bson = function(buffer)
{
var ret={};
try{
ret=BSON.parse(buffer);
}catch(err){
ret={};
}
return ret;
}
ObjectData.prototype.write = function(addr,cb){
var address = this.header.AD;
if(typeof addr == 'function'){
cb = addr;
}else if(Number(addr)>0){
address = addr;
this.header.AD = address;
}
var fd=this.file;
//var objBuffer = new Buffer(this.header.MZ + this.header.DZ)
var objBuffer = Buffer.alloc(this.header.MZ + this.header.DZ)
if(this.header.MZ>0){
this.metaBuffer.copy(objBuffer);
}
this.dataBuffer.copy(objBuffer,this.header.MZ)
fd.write(this.header.AD,objBuffer,function(err){
cb(err);
})
}
module.exports.HEADERSIZE = OBJHEADERSIZE;
module.exports.createByHeader = function(fd,header){
return new ObjectData(fd,header,null,null);
}
module.exports.createByData = function(fd,prm){
var id = prm.id;
var data = prm.data;
var meta = prm.meta;
var address = (prm.address)?prm.address:0;
var header = header_struct();
var metaBuffer = null;
var dataBuffer = null;
//data
switch (typeof data){
case 'string':
header.TY = STRING_TYPE;
header.DZ = Buffer.byteLength(data, 'utf8');
//dataBuffer = new Buffer(data);
dataBuffer = Buffer.from(data,'utf8');
break;
case 'object':
if(data instanceof Buffer){
header.TY = BINARY_TYPE;
header.DZ = data.length;
dataBuffer = data;
}else{
var objData = BSON.serialize(data);
header.TY = OBJECT_TYPE;
header.DZ = objData.length
dataBuffer = objData;
}
break;
default :
return null;
}
if(meta){
metaBuffer = BSON.serialize(meta);
header.MZ = metaBuffer.length;
}
header.ID = id;
header.AD = address
return new ObjectData(fd,header,metaBuffer,dataBuffer);
}
module.exports = ObjId;
function ObjId(prm){
if (Buffer.isBuffer(prm)) {
if (prm.length != 12){
throw new Error("Buffer-based ObjectId must be 12 bytes")
}
this.bytes = prm
}else if(typeof prm == 'object'){
//var fhn = (Buffer.isBuffer(prm.fhn) )?prm.fhn:new Buffer(4);
var fhn = (Buffer.isBuffer(prm.fhn) )?prm.fhn:Buffer.alloc(4);
var seq = prm.seq || 1;
var ts = (prm.ts)?prm.ts:(Date.now() / 1000) & 0xFFFFFFFF;
seq = seq & 0xFFFFFFFF;
// this.bytes = new Buffer([
// fhn[0],
// fhn[1],
// fhn[2],
// fhn[3],
// seq>>24,
// seq>>16,
// seq>>8,
// seq,
// ts>>24,
// ts>>16,
// ts>>8,
// ts
// ]);
this.bytes = Buffer.from([
fhn[0],
fhn[1],
fhn[2],
fhn[3],
seq>>24,
seq>>16,
seq>>8,
seq,
ts>>24,
ts>>16,
ts>>8,
ts
]);
}else if (typeof prm == 'string') {
if (prm.length != 24) throw new Error("String-based ObjectId must be 24 bytes")
if (!/^[0-9a-f]{24}$/i.test(prm)) throw new Error("String-based ObjectId must in hex-format:" + prm)
this.bytes = fromHex(prm)
}
}
ObjId.prototype.extract = function(){
var e_fhn = this.bytes.slice(0,4);
var e_seq = this.bytes.readUInt32BE(4);
var e_ts = this.bytes.readUInt32BE(8);
return {fhn:e_fhn,seq:e_seq,ts:e_ts};
}
ObjId.prototype.toString = function() {
return toHex(this.bytes)
}
var toHex = function(buffer) {
return buffer.toString('hex')
}
var fromHex = function(string) {
//return new Buffer(string, 'hex')
return Buffer.from(string, 'hex')
}
var async = require('async');
var Root = require('./root');
var ObjectData = require('./objectdata');
var ObjId = require('./objid');
var Oat = require('./oat');
module.exports = Reader;
function Reader(fd,root){
this.file = fd;
this.root = root;
this.rootData = root.getRoot();
this.cursorIdx = 0;
this.oat = null;
}
Reader.prototype.moveTo = function(idx){
var rootData = this.rootData;
if(idx>0 && idx <= rootData.SEQ){
this.cursorIdx = idx-1;
return true;
}else{
return false;
}
}
Reader.prototype.next = function(cb){
var self=this;
this.readAt(++this.cursorIdx,function(err,obj){
if(!err && obj){
cb(null,obj)
}else{
cb(err,null);
}
});
}
Reader.prototype.nextObject = function(cb){
var self=this;
this.readAt(++this.cursorIdx,function(err,obj){
if(!err && obj){
//cb(null,{})
obj.readObject(cb);
}else{
cb(err,null);
}
});
}
Reader.prototype.objectAt = function(seq,opt,cb){
var options = {};
if(typeof opt == 'function'){
cb = opt;
}else{
options = opt || {};
}
this.readAt(seq,options,function(err,obj){
if(!err && obj){
obj.readObject(cb);
}else{
cb(err,null);
}
});
}
Reader.prototype.readAt = function(seq,opt,cb){
var self = this;
var options = {};
if(typeof opt == 'function'){
cb = opt;
}else{
options = opt || {};
}
var rootData = this.rootData;
if(seq<=0 || seq > rootData.SEQ){
return cb(new Error('unavailable'));
}
var oatNo = Math.ceil(seq / rootData.OATZ);
var slotNo = (seq-1)%rootData.OATZ;
async.waterfall([
function(callback) {
//load last oat
if(self.oat == null){
Oat.load(self.file,rootData.OATA,function(err,oat){
self.oat = oat;
if(oat){
callback(null);
}else{
callback(new Error('Oat Error'));
}
});
}else{
callback(null);
}
},
function(callback){
//move to current oat
if(self.oat.oatmeta.SEQ==oatNo){
callback(null);
}else{
self.oat.oatAt(oatNo,function(err,oat){
if(oat && oat.oatmeta.SEQ == oatNo){
self.oat = oat;
callback(null);
}else{
callback(new Error('Oat Error'));
}
});
}
},
function(callback){
//getSlot
self.oat.readSlot(slotNo,{nobuffer:options.nobuffer},function(err,slot){
callback(err,slot);
});
}
], function (err,slot) {
var obj = ObjectData.createByHeader(self.file,slot);
cb(err,obj);
});
}
Reader.prototype.remain = function(){
var rootData = this.root.getRoot();
return rootData.SEQ - this.cursorIdx;
}
Reader.prototype.count = function(){
var rootData = this.root.getRoot();
return rootData.SEQ;
}
var BSON = require('buffalo');
const ROOTSIZE = 256;
const VERSION = "1.0";
const OATSIZE = 10000;
var root_struct = function (){
// return {
// "VER":VERSION,
// "FHN":new Buffer(4),
// "SEQ":0,
// "OATA":0,
// "OATZ":OATSIZE,
// "AOF":ROOTSIZE
// };
return JSON.parse(JSON.stringify({
"VER":VERSION,
"FHN":Buffer.alloc(4),
"SEQ":0,
"OATA":0,
"OATZ":OATSIZE,
"AOF":ROOTSIZE
}));
}
module.exports = Root;
function Root(fd,opt)
{
this.options = opt || {};
this.file = fd;
this.data = null;
}
Root.prototype.setVal = function(name,val){
this.data[name] = val;
}
Root.prototype.getVal = function(name){
return this.data[name] = val;
}
Root.prototype.getRoot = function(){
return this.data;
}
Root.prototype.newroot = function(prm){
prm = prm || {};
this.data = root_struct();
if(prm.FHN){
this.data.FHN = prm.FHN;
}
}
Root.prototype.load = function(cb){
var self = this;
this.file.read(0, ROOTSIZE-1, function(err, buffer) {
if(!err){
var objroot = BSON.parse(buffer);
self.data = objroot;
}
cb(err,objroot);
});
}
Root.prototype.write = function(cb){
//var buffer = new Buffer(ROOTSIZE);
var buffer = Buffer.alloc(ROOTSIZE);
BSON.serialize(this.data,buffer);
this.file.write(0,buffer,cb);
}
var async = require('async');
var BSON = require('buffalo');
var ObjId = BSON.ObjectId;
var Oat = require('./oat');
var ObjectData = require('./objectdata');
var ObjId = require('./objid');
module.exports = Writer;
function Writer(fd,root,opt){
this.file = fd;
this.root = root;
}
Writer.prototype.write = function(data,opt,cb){
var self = this;
var options = {};
if(typeof opt == 'function'){
cb = opt;
}else{
options = opt || {};
}
var nSeq = ++this.root.SEQ;
var nFhn = this.root.FHN;
var id = new ObjId({'fhn':nFhn,'seq':nSeq})
console.log(id.toString());
}
This diff is collapsed.
var randomAccessFile = require('random-access-file'); var randomAccessFile = require('random-access-file');
var async = require('async'); var async = require('async');
var BSON = require('buffalo');
var ObjectId = BSON.ObjectId;
var fileAccessBuffer = require('./file-access-buffer'); var fileAccessBuffer = require('./file-access-buffer');
var Root = require('./root'); var Root = require('./root');
...@@ -76,14 +74,7 @@ Storage.prototype.open = function(cb) ...@@ -76,14 +74,7 @@ Storage.prototype.open = function(cb)
self.root.load(function(err,obj){ self.root.load(function(err,obj){
cb(err,self); cb(err,self);
}); });
// this.file.open(function(err){
//
// self.root = new Root(self.file);
// self.root.load(function(err,obj){
// cb(err,self);
// });
//
// });
} }
Storage.prototype.write = function(data,opt,cb) Storage.prototype.write = function(data,opt,cb)
......
const BSON = require('bson')
module.exports.parse = function (buff) {
return BSON.deserialize(buff,{allowObjectSmallerThanBufferSize:true,promoteBuffers:true})
}
module.exports.serialize = function (obj,buff) {
if(buff){
BSON.serializeWithBufferAndIndex(obj,buff)
return buff
}else{
return BSON.serialize(obj)
}
}
module.exports.deserialize = function (buff,opt) {
return BSON.deserialize(buff,opt)
}
\ No newline at end of file
...@@ -2,7 +2,6 @@ var thunky = require('thunky'); ...@@ -2,7 +2,6 @@ var thunky = require('thunky');
var randomAccessFile = require('random-access-file'); var randomAccessFile = require('random-access-file');
var fs = require("fs"); var fs = require("fs");
var async = require('async'); var async = require('async');
var BSON = require('buffalo');
var BUFFERSIZE = 1*1024*1024; var BUFFERSIZE = 1*1024*1024;
function FileAccessBuffer(filename,opt){ function FileAccessBuffer(filename,opt){
...@@ -46,7 +45,8 @@ FileAccessBuffer.prototype.bufferedRead = function (offset, length, cb) { ...@@ -46,7 +45,8 @@ FileAccessBuffer.prototype.bufferedRead = function (offset, length, cb) {
var self=this; var self=this;
var readStart = offset; var readStart = offset;
var readEnd = (offset + length) - 1; var readEnd = (offset + length) - 1;
var ret_buffer = new Buffer(length); //var ret_buffer = new Buffer(length);
var ret_buffer = Buffer.alloc(length);
if(!self.filesize){ if(!self.filesize){
fs.stat(self.filename,function(err,stats){ fs.stat(self.filename,function(err,stats){
......
var BSON = require('buffalo'); //var BSON = require('buffalo');
var BSON = require('./bsbson');
var async = require('async'); var async = require('async');
const OATMETASIZE = 64; const OATMETASIZE = 64;
...@@ -164,7 +165,8 @@ Oat.prototype.readMeta = function(cb){ ...@@ -164,7 +165,8 @@ Oat.prototype.readMeta = function(cb){
} }
Oat.prototype.writeMeta = function(cb){ Oat.prototype.writeMeta = function(cb){
var buffer = new Buffer(OATMETASIZE); //var buffer = new Buffer(OATMETASIZE);
var buffer = Buffer.alloc(OATMETASIZE);
BSON.serialize(this.oatmeta,buffer); BSON.serialize(this.oatmeta,buffer);
this.file.write(this.address,buffer,cb); this.file.write(this.address,buffer,cb);
} }
...@@ -174,7 +176,8 @@ Oat.prototype.writeSlot = function(index,data,cb){ ...@@ -174,7 +176,8 @@ Oat.prototype.writeSlot = function(index,data,cb){
return cb(new Error("Index out of bound")); return cb(new Error("Index out of bound"));
} }
var slotAddr = this.address + OATMETASIZE + (index*OBJHEADERSIZE); var slotAddr = this.address + OATMETASIZE + (index*OBJHEADERSIZE);
var buffer = new Buffer(OBJHEADERSIZE); //var buffer = new Buffer(OBJHEADERSIZE);
var buffer = Buffer.alloc(OBJHEADERSIZE);
BSON.serialize(data,buffer); BSON.serialize(data,buffer);
this.file.write(slotAddr,buffer,cb); this.file.write(slotAddr,buffer,cb);
} }
......
var BSON = require('buffalo'); //var BSON = require('buffalo');
// var bsonp = require('bson') // var bsonp = require('bson')
// var BSONP = new bsonp.BSONPure.BSON() // var BSONP = new bsonp.BSONPure.BSON()
var BSON = require('./bsbson')
const OBJHEADERSIZE = 80; const OBJHEADERSIZE = 80;
...@@ -10,14 +11,22 @@ const OBJECT_TYPE = 3; ...@@ -10,14 +11,22 @@ const OBJECT_TYPE = 3;
//Header structure //Header structure
var header_struct = function(){ var header_struct = function(){
return { // return {
// "ID" : null,
// "TY" : 1,
// "FG" : null,
// "MZ" : 0,
// "DZ" : 0,
// "AD" : 0
// }
return JSON.parse(JSON.stringify({
"ID" : null, "ID" : null,
"TY" : 1, "TY" : 1,
"FG" : null, "FG" : null,
"MZ" : 0, "MZ" : 0,
"DZ" : 0, "DZ" : 0,
"AD" : 0 "AD" : 0
} }));
} }
...@@ -151,7 +160,8 @@ ObjectData.prototype.write = function(addr,cb){ ...@@ -151,7 +160,8 @@ ObjectData.prototype.write = function(addr,cb){
} }
var fd=this.file; var fd=this.file;
var objBuffer = new Buffer(this.header.MZ + this.header.DZ) //var objBuffer = new Buffer(this.header.MZ + this.header.DZ)
var objBuffer = Buffer.alloc(this.header.MZ + this.header.DZ)
if(this.header.MZ>0){ if(this.header.MZ>0){
this.metaBuffer.copy(objBuffer); this.metaBuffer.copy(objBuffer);
} }
...@@ -183,7 +193,8 @@ module.exports.createByData = function(fd,prm){ ...@@ -183,7 +193,8 @@ module.exports.createByData = function(fd,prm){
case 'string': case 'string':
header.TY = STRING_TYPE; header.TY = STRING_TYPE;
header.DZ = Buffer.byteLength(data, 'utf8'); header.DZ = Buffer.byteLength(data, 'utf8');
dataBuffer = new Buffer(data); //dataBuffer = new Buffer(data);
dataBuffer = Buffer.from(data,'utf8');
break; break;
case 'object': case 'object':
if(data instanceof Buffer){ if(data instanceof Buffer){
......
...@@ -7,13 +7,28 @@ function ObjId(prm){ ...@@ -7,13 +7,28 @@ function ObjId(prm){
} }
this.bytes = prm this.bytes = prm
}else if(typeof prm == 'object'){ }else if(typeof prm == 'object'){
var fhn = (Buffer.isBuffer(prm.fhn) )?prm.fhn:new Buffer(4); //var fhn = (Buffer.isBuffer(prm.fhn) )?prm.fhn:new Buffer(4);
var fhn = (Buffer.isBuffer(prm.fhn) )?prm.fhn:Buffer.alloc(4);
var seq = prm.seq || 1; var seq = prm.seq || 1;
var ts = (prm.ts)?prm.ts:(Date.now() / 1000) & 0xFFFFFFFF; var ts = (prm.ts)?prm.ts:(Date.now() / 1000) & 0xFFFFFFFF;
seq = seq & 0xFFFFFFFF; seq = seq & 0xFFFFFFFF;
this.bytes = new Buffer([ // this.bytes = new Buffer([
// fhn[0],
// fhn[1],
// fhn[2],
// fhn[3],
// seq>>24,
// seq>>16,
// seq>>8,
// seq,
// ts>>24,
// ts>>16,
// ts>>8,
// ts
// ]);
this.bytes = Buffer.from([
fhn[0], fhn[0],
fhn[1], fhn[1],
fhn[2], fhn[2],
...@@ -52,5 +67,6 @@ var toHex = function(buffer) { ...@@ -52,5 +67,6 @@ var toHex = function(buffer) {
} }
var fromHex = function(string) { var fromHex = function(string) {
return new Buffer(string, 'hex') //return new Buffer(string, 'hex')
return Buffer.from(string, 'hex')
} }
var BSON = require('buffalo'); //var BSON = require('buffalo');
var BSON = require('./bsbson');
const ROOTSIZE = 256; const ROOTSIZE = 256;
const VERSION = "1.0"; const VERSION = "1.0";
const OATSIZE = 10000; const OATSIZE = 10000;
var root_struct = function (){ var root_struct = function (){
return { // return {
// "VER":VERSION,
// "FHN":new Buffer(4),
// "SEQ":0,
// "OATA":0,
// "OATZ":OATSIZE,
// "AOF":ROOTSIZE
// };
return JSON.parse(JSON.stringify({
"VER":VERSION, "VER":VERSION,
"FHN":new Buffer(4), "FHN":Buffer.alloc(4),
"SEQ":0, "SEQ":0,
"OATA":0, "OATA":0,
"OATZ":OATSIZE, "OATZ":OATSIZE,
"AOF":ROOTSIZE "AOF":ROOTSIZE
}; }));
} }
module.exports = Root; module.exports = Root;
...@@ -55,7 +64,8 @@ Root.prototype.load = function(cb){ ...@@ -55,7 +64,8 @@ Root.prototype.load = function(cb){
} }
Root.prototype.write = function(cb){ Root.prototype.write = function(cb){
var buffer = new Buffer(ROOTSIZE); //var buffer = new Buffer(ROOTSIZE);
var buffer = Buffer.alloc(ROOTSIZE);
BSON.serialize(this.data,buffer); BSON.serialize(this.data,buffer);
this.file.write(0,buffer,cb); this.file.write(0,buffer,cb);
} }
var async = require('async'); var async = require('async');
var BSON = require('buffalo'); //var BSON = require('buffalo');
var ObjId = BSON.ObjectId; var BSON = require('./bsbson');
var Oat = require('./oat'); var Oat = require('./oat');
var ObjectData = require('./objectdata'); var ObjectData = require('./objectdata');
......
const BSON = require('bson')
module.exports.parse = function (buff) {
return BSON.deserialize(buff,{allowObjectSmallerThanBufferSize:true,promoteBuffers:true})
}
module.exports.serialize = function (obj,buff) {
if(buff){
BSON.serializeWithBufferAndIndex(obj,buff)
return buff
}else{
return BSON.serialize(obj)
}
}
module.exports.deserialize = function (buff,opt) {
return BSON.deserialize(buff,opt)
}
\ No newline at end of file
var BSON = require('buffalo'); //var BSON = require('buffalo');
var BSON = require('../encode/bsbson');
module.exports.parse = function(obj) module.exports.parse = function(obj)
{ {
...@@ -8,7 +9,8 @@ module.exports.parse = function(obj) ...@@ -8,7 +9,8 @@ module.exports.parse = function(obj)
var bsobj = JSON.parse(obj); var bsobj = JSON.parse(obj);
var data = bsobj.data; var data = bsobj.data;
if(bsobj.data_type=='binary' && bsobj.encoding=='base64'){ if(bsobj.data_type=='binary' && bsobj.encoding=='base64'){
data = new Buffer(bsobj.data,'base64'); //data = new Buffer(bsobj.data,'base64');
data = Buffer.from(bsobj.data,'base64');
} }
return new BSData(data,bsobj.data_type); return new BSData(data,bsobj.data_type);
...@@ -21,7 +23,8 @@ module.exports.parse = function(obj) ...@@ -21,7 +23,8 @@ module.exports.parse = function(obj)
}else if( ((obj.object_type && obj.object_type == 'bsdata')||(obj.type && obj.type == 'bsdata')) && obj.data_type && obj.data){ }else if( ((obj.object_type && obj.object_type == 'bsdata')||(obj.type && obj.type == 'bsdata')) && obj.data_type && obj.data){
var oData = obj.data; var oData = obj.data;
if(obj.encoding == 'base64'){ if(obj.encoding == 'base64'){
oData = new Buffer(obj.data,'base64'); //oData = new Buffer(obj.data,'base64');
oData = Buffer.from(obj.data,'base64');
} }
return new BSData(oData,obj.data_type); return new BSData(oData,obj.data_type);
}else{ }else{
......
{ {
"name": "node-bigstream", "name": "node-bigstream",
"description": "", "description": "",
"version": "1.2.3", "version": "1.2.4",
"main": "./bigstream.js", "main": "./bigstream.js",
"author": { "author": {
"name": "Kamron Aroonrua", "name": "Kamron Aroonrua",
...@@ -14,9 +14,8 @@ ...@@ -14,9 +14,8 @@
"async": "^2.0.1", "async": "^2.0.1",
"axon": "^2.0.3", "axon": "^2.0.3",
"body-parser": "^1.15.2", "body-parser": "^1.15.2",
"bson": "^0.5.6", "bson": "^4.4.0",
"buffalo": "^0.1.3", "dateformat": "^4.5.1",
"dateformat": "^1.0.12",
"dot-prop": "^5.1.0", "dot-prop": "^5.1.0",
"express": "^4.14.0", "express": "^4.14.0",
"express-jwt": "^5.3.1", "express-jwt": "^5.3.1",
...@@ -25,11 +24,12 @@ ...@@ -25,11 +24,12 @@
"minimist": "^1.2.0", "minimist": "^1.2.0",
"moment": "^2.17.1", "moment": "^2.17.1",
"mqtt": "^3.0.0", "mqtt": "^3.0.0",
"node-cache": "^5.1.2",
"node-gyp": "^3.6.2", "node-gyp": "^3.6.2",
"node-persist": "^2.0.7", "node-persist": "^2.0.7",
"node-schedule": "^1.2.0", "node-schedule": "^1.2.0",
"object-hash": "^1.1.8", "object-hash": "^1.1.8",
"pm2": "^2.4.0", "pm2": "^5.1.0",
"qs": "^6.8.0", "qs": "^6.8.0",
"query-string": "^4.2.3", "query-string": "^4.2.3",
"quickq": "^0.8.1", "quickq": "^0.8.1",
......
...@@ -41,7 +41,7 @@ function execute_function(context,response){ ...@@ -41,7 +41,7 @@ function execute_function(context,response){
var http_headers = {}; var http_headers = {};
if(param.auth){ if(param.auth){
if(param.auth.type == 'basic'){ if(param.auth.type == 'basic'){
var auth_header = "Basic " + new Buffer(param.auth.username + ":" + param.auth.password).toString("base64"); var auth_header = "Basic " + Buffer.from(param.auth.username + ":" + param.auth.password).toString("base64");
http_headers.Authorization = auth_header; http_headers.Authorization = auth_header;
} }
} }
......
...@@ -22,7 +22,8 @@ function perform_function(context,response){ ...@@ -22,7 +22,8 @@ function perform_function(context,response){
var meta = {}; var meta = {};
var last_mod = {'fname':'','tts':0}; var last_mod = {'fname':'','tts':0};
var fs_continue = false; var fs_continue = false;
var buff_out = new Buffer(0); //var buff_out = new Buffer(0);
var buff_out = Buffer.alloc(0);
if(param.last_modify_ts) if(param.last_modify_ts)
{ {
......
...@@ -75,7 +75,7 @@ function getImage(url) { ...@@ -75,7 +75,7 @@ function getImage(url) {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
request(url, function (error, resp, body) { request(url, function (error, resp, body) {
if (!error && resp.statusCode == 200) { if (!error && resp.statusCode == 200) {
resolve("data:" + resp.headers["content-type"] + ";base64," + new Buffer(body).toString('base64')); resolve("data:" + resp.headers["content-type"] + ";base64," + Buffer.from(body).toString('base64'));
}else{ }else{
return reject(error); return reject(error);
} }
......
...@@ -138,7 +138,7 @@ function perform_image(_result, filecontent) { ...@@ -138,7 +138,7 @@ function perform_image(_result, filecontent) {
function getImage(filecontent) { function getImage(filecontent) {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
resolve("data:image/jpeg;base64," + new Buffer(filecontent).toString('base64')); resolve("data:image/jpeg;base64," + Buffer.from(filecontent).toString('base64'));
}); });
} }
......
...@@ -75,7 +75,7 @@ function getImage(url) { ...@@ -75,7 +75,7 @@ function getImage(url) {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
request(url, function (error, resp, body) { request(url, function (error, resp, body) {
if (!error && resp.statusCode == 200) { if (!error && resp.statusCode == 200) {
resolve("data:" + resp.headers["content-type"] + ";base64," + new Buffer(body).toString('base64')); resolve("data:" + resp.headers["content-type"] + ";base64," + Buffer.from(body).toString('base64'));
}else{ }else{
return reject(error); return reject(error);
} }
......
...@@ -29,7 +29,7 @@ function perform_function(context,request,response){ ...@@ -29,7 +29,7 @@ function perform_function(context,request,response){
val.forEach(function(v){ val.forEach(function(v){
if(typeof v.data == 'object' && v.data.type == 'Buffer' && Array.isArray(v.data.data)) if(typeof v.data == 'object' && v.data.type == 'Buffer' && Array.isArray(v.data.data))
{ {
v.data = new Buffer(v.data.data); v.data = Buffer.from(v.data.data);
} }
ret.push(v); ret.push(v);
}); });
......
const NodeCache = require( "node-cache" )
module.exports.create = function(prm)
{
return new BSSCache(prm);
}
function BSSCache(prm={}){
this.defTTL = prm.TTL || 600
this.max_size = prm.max_size || 8 * 1024 *1024
this.cache = new NodeCache({stdTTL:this.defTTL})
}
BSSCache.prototype.setCache = function (prm,obj)
{
let sname = prm.s
let ktype = prm.t || 'seq'
let keyname = prm.k
let ver = prm.v || '0'
let osize = prm.z
let cachekey = sname + ':' + ktype + ':' + String(keyname) + ':' + String(ver)
if(!obj){return;}
if(osize && osize>this.max_size){
return;
}
this.cache.set(cachekey,obj);
}
BSSCache.prototype.getCache = function (prm)
{
let sname = prm.s || '.'
let ktype = prm.t || 'seq'
let keyname = prm.k
let ver = prm.v || '0'
let cachekey = sname + ':' + ktype + ':' + String(keyname) + ':' + String(ver)
let val = this.cache.get(cachekey)
if(!val){
return null
}
this.cache.ttl(cachekey)
return val
}
\ No newline at end of file
var redis = require('redis');
const PREFIX = 'bs:cache:storage';
module.exports.create = function(conf)
{
return new CacheStore(conf);
}
function CacheStore(conf){
this.prefix = PREFIX ;
if(conf.mem){
this.mem = conf.mem;
}else if(conf.conn){
this.mem = redis.createClient(conf.conn);
}
}
CacheStore.prototype.setIndex = function(storage,oid,object,cb){
var key = this.prefix + ":" + storage;
this.mem.hset(key,oid,object,cb);
}
CacheStore.prototype.getIndex = function(storage,oid,cb)
{
var key = this.prefix + ":" + storage;
this.mem.hget(key,oid,function(err,v){
var value = null;
if(!err && v){
value = v;
}
cb(err,value);
});
}
CacheStore.prototype.flush = function(storage,cb)
{
var key = this.prefix + ":" + storage;
this.mem.del(key);
if(typeof cb == 'function'){
cb();
}
}
...@@ -2,10 +2,13 @@ var ctx = require('../context'); ...@@ -2,10 +2,13 @@ var ctx = require('../context');
var ConnCtx = ctx.getLib('lib/conn/connection-context'); var ConnCtx = ctx.getLib('lib/conn/connection-context');
var rpcserver = ctx.getLib('lib/amqp/rpcserver'); var rpcserver = ctx.getLib('lib/amqp/rpcserver');
var RPCCaller = ctx.getLib('lib/amqp/rpccaller'); var RPCCaller = ctx.getLib('lib/amqp/rpccaller');
var SSServer = ctx.getLib('lib/axon/rpcserver');
var Db = ctx.getLib('storage-service/lib/db'); var Db = ctx.getLib('storage-service/lib/db');
var WorkerPool = ctx.getLib('storage-service/lib/worker_pool'); var WorkerPool = ctx.getLib('storage-service/lib/worker_pool');
var SSCaller = ctx.getLib('lib/axon/rpccaller'); var BSSCache = ctx.getLib('storage-service/lib/storage-bsscache');
var StorageIndexstore = ctx.getLib('storage-service/lib/storage-indexstore')
//var SSServer = ctx.getLib('lib/axon/rpcserver');
//var SSCaller = ctx.getLib('lib/axon/rpccaller');
var Tokenizer = ctx.getLib('lib/auth/tokenizer'); var Tokenizer = ctx.getLib('lib/auth/tokenizer');
var ACLValidator = ctx.getLib('lib/auth/acl-validator'); var ACLValidator = ctx.getLib('lib/auth/acl-validator');
...@@ -19,8 +22,8 @@ var bodyParser = require('body-parser'); ...@@ -19,8 +22,8 @@ var bodyParser = require('body-parser');
var EventPub = ctx.getLib('lib/amqp/event-pub'); var EventPub = ctx.getLib('lib/amqp/event-pub');
//var cfg = ctx.config; //var cfg = ctx.config;
var SS_LISTEN = ctx.getUnixSocketUrl('ss.sock'); // var SS_LISTEN = ctx.getUnixSocketUrl('ss.sock');
var SS_URL = ctx.getUnixSocketUrl('ss.sock'); // var SS_URL = ctx.getUnixSocketUrl('ss.sock');
// var SS_LISTEN = ctx.getServiceUrl(19030); // var SS_LISTEN = ctx.getServiceUrl(19030);
// var SS_URL = ctx.getClientUrl(19030); // var SS_URL = ctx.getClientUrl(19030);
...@@ -98,39 +101,40 @@ SS.prototype.amqp_start = function() ...@@ -98,39 +101,40 @@ SS.prototype.amqp_start = function()
}); });
} }
SS.prototype.ipc_start = function()
{
var self = this;
if(this.ipc_server){return;}
this.ipc_server = new SSServer({
url : SS_LISTEN,
name : 'storage_request'
});
this.ipc_server.set_remote_function(function(req,callback){
//console.log("IPC Command");
self.db.request(req,function(err,res){
if(err){
console.log(err);
}
callback(err,res);
});
}); // SS.prototype.ipc_start = function()
// {
this.ipc_server.start(function(err){ // var self = this;
if(!err){
console.log('SS:IPC START\t\t\t[OK]'); // if(this.ipc_server){return;}
}else{
console.log('SS:IPC START\t\t\t[ERR]'); // this.ipc_server = new SSServer({
console.log('SS:IPC ERROR Restarting ...'); // url : SS_LISTEN,
setTimeout(function(){ // name : 'storage_request'
process.exit(1); // });
},5000); // this.ipc_server.set_remote_function(function(req,callback){
} // //console.log("IPC Command");
}); // self.db.request(req,function(err,res){
} // if(err){
// console.log(err);
// }
// callback(err,res);
// });
// });
// this.ipc_server.start(function(err){
// if(!err){
// console.log('SS:IPC START\t\t\t[OK]');
// }else{
// console.log('SS:IPC START\t\t\t[ERR]');
// console.log('SS:IPC ERROR Restarting ...');
// setTimeout(function(){
// process.exit(1);
// },5000);
// }
// });
// }
SS.prototype.http_start = function() SS.prototype.http_start = function()
{ {
...@@ -146,17 +150,23 @@ SS.prototype.http_start = function() ...@@ -146,17 +150,23 @@ SS.prototype.http_start = function()
})); }));
var context = ctx.getLib('lib/ws/http-context'); var context = ctx.getLib('lib/ws/http-context');
//this.storagecaller = new SSCaller({'url':SS_URL});
this.storagecaller = new RPCCaller({ this.storagecaller = new RPCCaller({
url : amqp_cfg.url, url : amqp_cfg.url,
name :'storage_request' name :'storage_request'
}); });
this.bsscache = BSSCache.create()
this.idxstore = new StorageIndexstore({'mem':self.mem });
this.acl_validator = ACLValidator.create(auth_cfg); this.acl_validator = ACLValidator.create(auth_cfg);
this.worker_pool.initWorker(); this.worker_pool.initWorker();
app.use(context.middleware({ app.use(context.middleware({
'acl_validator':self.acl_validator, 'acl_validator':self.acl_validator,
'worker_pool' : self.worker_pool, 'worker_pool' : self.worker_pool,
'storagecaller':self.storagecaller 'storagecaller':self.storagecaller,
'bsscache':self.bsscache,
'idxstore':self.idxstore
})); }));
var tokenizer = Tokenizer.create(auth_cfg); var tokenizer = Tokenizer.create(auth_cfg);
......
...@@ -58,7 +58,7 @@ router.get('/:id/file',function (req, res) { ...@@ -58,7 +58,7 @@ router.get('/:id/file',function (req, res) {
}); });
function oid_parse(oid,caller,cb) function oid_parse(oid,idxstore,cb)
{ {
var ret = {'valid':true} var ret = {'valid':true}
if(!oid) if(!oid)
...@@ -82,19 +82,12 @@ function oid_parse(oid,caller,cb) ...@@ -82,19 +82,12 @@ function oid_parse(oid,caller,cb)
if(ret.key.length<=0){ if(ret.key.length<=0){
return cb(null,{'valid':false}); return cb(null,{'valid':false});
} }
var callreq = {
'object_type' : 'storage_request',
'command' : 'idxget',
'param' : {
'storage_name' : ret.storage_name,
'key' : ret.key
}
}
caller.call(callreq,function(err,resp){ //direct idxstore
if(!err && resp.status=='OK' && resp.resp.found){ idxstore.getIndex(ret.storage_name,ret.key,(err,val)=>{
if(!err && val){
ret.by = "obj"; ret.by = "obj";
ret.obj_id = resp.resp.object_id; ret.obj_id = val;
ret.seq = (new ObjId(ret.obj_id)).extract().seq; ret.seq = (new ObjId(ret.obj_id)).extract().seq;
cb(null,ret); cb(null,ret);
}else{ }else{
...@@ -102,6 +95,26 @@ function oid_parse(oid,caller,cb) ...@@ -102,6 +95,26 @@ function oid_parse(oid,caller,cb)
} }
}); });
// var callreq = {
// 'object_type' : 'storage_request',
// 'command' : 'idxget',
// 'param' : {
// 'storage_name' : ret.storage_name,
// 'key' : ret.key
// }
// }
// caller.call(callreq,function(err,resp){
// if(!err && resp.status=='OK' && resp.resp.found){
// ret.by = "obj";
// ret.obj_id = resp.resp.object_id;
// ret.seq = (new ObjId(ret.obj_id)).extract().seq;
// cb(null,ret);
// }else{
// cb(null,{'valid':false});
// }
// });
}else if(str_addr.startsWith('[') && str_addr.endsWith(']')){ }else if(str_addr.startsWith('[') && str_addr.endsWith(']')){
ret.by = "seq"; ret.by = "seq";
var str_num = str_addr.substr(1,str_addr.length-2); var str_num = str_addr.substr(1,str_addr.length-2);
...@@ -139,9 +152,11 @@ function get_object(reqHelper,respHelper,prm) ...@@ -139,9 +152,11 @@ function get_object(reqHelper,respHelper,prm)
var oid = prm.oid; var oid = prm.oid;
var opt = prm.opt || {}; var opt = prm.opt || {};
var storagecaller = reqHelper.request.context.storagecaller; //var storagecaller = reqHelper.request.context.storagecaller;
var bsscache = reqHelper.request.context.bsscache;
var idxstore = reqHelper.request.context.idxstore;
oid_parse(oid,storagecaller,(err,oid_result)=>{ oid_parse(oid,idxstore,(err,oid_result)=>{
if(!oid_result.valid){ if(!oid_result.valid){
respHelper.response404(); respHelper.response404();
...@@ -158,23 +173,67 @@ function get_object(reqHelper,respHelper,prm) ...@@ -158,23 +173,67 @@ function get_object(reqHelper,respHelper,prm)
var bss_full_path = storage_cfg.repository + "/" + oid_result.storage_name.split('.').join('/') + ".bss"; var bss_full_path = storage_cfg.repository + "/" + oid_result.storage_name.split('.').join('/') + ".bss";
fs.exists(bss_full_path,function(exists){ fs.stat(bss_full_path,function(f_error,stats){
if(!f_error && stats.isFile()){
var cobj = null
//cache
if(oid_result.seq>=0){
cobj = bsscache.getCache({
s:oid_result.storage_name,
t:'seq',
k:String(oid_result.seq),
v:stats.atimeMs
})
}
if(exists){ if(cobj == null){
//MISS Cache
//console.log('Cache MISS----->>')
BinStream.open(bss_full_path,function(err,bss){ BinStream.open(bss_full_path,function(err,bss){
var rd = bss.reader(); var rd = bss.reader();
var rec_count = rd.count(); var rec_count = rd.count();
var seq = (oid_result.seq>=0)?oid_result.seq:rec_count + oid_result.seq + 1; var seq = (oid_result.seq>=0)?oid_result.seq:rec_count + oid_result.seq + 1;
if(oid_result.seq<0){
cobj = bsscache.getCache({
s:oid_result.storage_name,
t:'seq',
k:String(seq),
v:stats.atimeMs
})
}
if(cobj == null){
rd.objectAt(seq,function(err,obj){ rd.objectAt(seq,function(err,obj){
bss.close(function(err){ bss.close(function(err){
if(obj && (oid_result.by == 'seq' || oid_result.obj_id == (new ObjId(obj.header.ID)).toString()) ){ if(obj && (oid_result.by == 'seq' || oid_result.obj_id == (new ObjId(obj.header.ID)).toString()) ){
bsscache.setCache({
s:oid_result.storage_name,
t:'seq',
k:String(seq),
v:stats.atimeMs,
z:obj.header.MZ+obj.header.DZ
},obj)
output(respHelper,obj,opt); output(respHelper,obj,opt);
}else{respHelper.response404();} }else{respHelper.response404();}
}); });
}); });
}else{
//console.log('Cache HIT2----->>')
output(respHelper,cobj,opt);
}
}); });
}else{
//HIT Cache
//console.log('Cache HIT----->>')
output(respHelper,cobj,opt);
}
}else{ }else{
respHelper.response404(); respHelper.response404();
} }
......
var BinStream = require('../lib/bss/binarystream_v1_0'); var BinStream = require('../lib/bss/binarystream_v1_1');
var async = require('async'); var async = require('async');
var FNAME = "D:\\testfile\\test-qq.bss"; var FNAME = "D:\\testfile\\test-qq.bss";
......
const fs = require('fs');
var fname = "f.txt"
// Getting information for a file
fs.stat(fname, (error, stats) => {
if (error) {
console.log(error);
}
else {
console.log("Stats object for: example_file.txt");
console.log(stats.atimeMs);
// Using methods of the Stats object
console.log("Path is file:", stats.isFile());
console.log("Path is directory:", stats.isDirectory());
}
});
\ No newline at end of file
{ {
"version":"1.2.3", "version":"1.2.4",
"build":"202105050000" "build":"202106211900"
} }
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment