Commit d73f1992 authored by project's avatar project

--no commit message

--no commit message
parent df63983f
{
"url" : "amqp://test:test@lab1.igridproject.info"
}
module.exports = {
'amqp' : require('./amqp.json')
}
var CONFIG_PATH = './conf/config';
module.exports.config = require(CONFIG_PATH);
module.exports.getLib = function(name){
if(name)
{
return require('./' + name);
}
return null;
}
module.exports.getPlugins = function(type,name)
{
var path = './plugins/' + type + '/' + type + '-' +name;
return require(path);
}
This diff is collapsed.
var randomAccessFile = require('random-access-file');
var async = require('async');
var BSON = require('buffalo');
var ObjectId = BSON.ObjectId;
var fileAccessBuffer = require('./file-access-buffer');
var Root = require('./root');
var ObjectData = require('./objectdata');
var ObjId = require('./objid');
var Oat = require('./oat');
var Reader = require('./reader');
const VERSION = "1.0";
const ROOTSIZE = 256;
const OATSLOT = 10000;
const OATMETASIZE = 100;
const HEADERSIZE = 80;
const STRING_TYPE = 1;
const BINARY_TYPE = 2;
const OBJECT_TYPE = 3;
module.exports.format = function(filename,opt,cb){
var options = {};
if(typeof opt == 'function'){
cb = opt;
}else{
options = opt || {};
}
var prm = {};
if(options.hashnumber && options.hashnumber instanceof Buffer){
prm.FHN = options.hashnumber;
}
var fd = fileAccessBuffer.create(filename,{truncate: true});
var root = new Root(fd);
root.newroot(prm);
root.write(cb);
};
module.exports.open = function(filename,opt,cb){
var options = {};
if(typeof opt == 'function'){
cb = opt;
}else{
options = opt || {};
}
var inst = new Storage(filename,options);
inst.open(cb);
}
function Storage(filename,opt)
{
this.options = opt;
this.filename = filename;
}
Storage.prototype.open = function(cb)
{
var self = this;
if(this.root){return cb(new Error('Already opened'));}
var fileOpt = null;
this.file = fileAccessBuffer.create(this.filename,fileOpt);
this.root = new Root(this.file);
this.root.load(function(err,obj){
cb(err,self);
})
}
Storage.prototype.write = function(data,opt,cb)
{
var self = this;
var options = {};
if(typeof opt == 'function'){
cb = opt;
}else{
options = opt || {};
}
var rootData = this.root.data;
var nSeq = rootData.SEQ+1;
var nFhn = rootData.FHN;
var oid = new ObjId({'fhn':nFhn,'seq':nSeq});
var objData = ObjectData.createByData(this.file,{
id:oid.bytes,
data:data,
meta:options.meta
})
if(!objData){
return cb(new Error('Data error'));
}
var slotNo = rootData.SEQ%rootData.OATZ;
async.waterfall(
[
function(callback){
//make oat buffer
if(rootData.SEQ == 0){
//First OAT
self.lastOat = Oat.create(self.file,
{
'address':rootData.AOF,
'SEQ':1,
});
rootData.OATA = self.lastOat.address;
rootData.AOF = rootData.AOF + self.lastOat.getSize();
self.lastOat.writeMeta(function(err){
callback(err,true);
})
}else if(!self.lastOat){
//load last Oat
Oat.load(self.file,rootData.OATA,function(err,oat){
self.lastOat = oat;
callback(err,false);
})
}else{
callback(null,false);
}
///end make oat
},
function(first,callback){
//new oat table
if(!first && slotNo==0){
var nextOat = Oat.create(self.file,
{
'address':rootData.AOF,
'SEQ':self.lastOat.oatmeta.SEQ + 1,
'PREV':self.lastOat.address
});
rootData.OATA = nextOat.address;
rootData.AOF = rootData.AOF + nextOat.getSize();
self.lastOat.setNextAddr(nextOat.address);
nextOat.writeMeta(function(err){
if(err){
callback(err);
}else{
self.lastOat.writeMeta(function(err){
self.lastOat = nextOat;
callback(err);
})
}
})
}else{
callback(null);
}
},
function(callback){
//write oat slot
objData.setAddress(rootData.AOF);
self.lastOat.writeSlot(slotNo,objData.getHeader(),function(err){
callback(err);
});
},
function(callback){
//update root
rootData.AOF = rootData.AOF + objData.getObjectSize();
rootData.SEQ = nSeq;
self.root.write(function(err){
callback(err);
})
},
function(callback){
//write data
objData.write(function(err){
callback(err,objData);
});
}
],
function(err){
cb(err,objData);
}
);
}
Storage.prototype.reader = function(prm){
var prm = prm || {};
var rd = new Reader(this.file,this.root);
return rd;
}
Storage.prototype.close = function(cb){
this.file.close(cb);
};
var randomAccessFile = require('random-access-file');
var fs = require("fs");
var async = require('async');
var BSON = require('buffalo');
var BUFFERSIZE = 1*1024*1024;
function FileAccessBuffer(filename,opt){
if (!opt) opt = {};
this.filename = filename;
this.size = opt.size || BUFFERSIZE;
this.buffer = null;
this.file = randomAccessFile(this.filename,opt);
}
FileAccessBuffer.prototype.bufferedRead = function (offset, length, cb) {
var self=this;
var readStart = offset;
var readEnd = (offset + length) - 1;
var ret_buffer = new Buffer(length);
if(!this.filesize){
var stats = fs.statSync(this.filename);
this.filesize = stats["size"];
}
if(this.buffer){
var buffStart = this.buffer.offset;
var buffEnd = (this.buffer.offset + this.buffer.data.length) - 1;
if(readStart >= buffStart && readEnd <= buffEnd){
this.buffer.data.copy(ret_buffer,0,readStart-buffStart,(readStart-buffStart)+length);
setImmediate(function (){
cb(null,ret_buffer);
//cb(null,self.buffer.data.slice(readStart-buffStart,(readStart-buffStart)+length));
});
}else if(readStart >= buffStart && readStart <= buffEnd){
//intersec back
var bytesDiff = (buffEnd-readStart)+1;
this.buffer.data.copy(ret_buffer,0,readStart-buffStart,(readStart-buffStart)+bytesDiff);
var ioffset = buffEnd+1;
var bytesRequire = length-bytesDiff;
var ilength = (self.size>bytesRequire)?self.size:bytesRequire;
if(ioffset+ilength > self.filesize){
ilength = self.filesize - ioffset;
}
self.file.read(ioffset,ilength,function(err,buff){
//console.log('intersec read');
if(!err){
self.buffer = {
"offset":ioffset,
"data":buff
};
buff.copy(ret_buffer,bytesDiff,0,bytesRequire);
}
cb(err,ret_buffer);
});
}else{
newbuffer(cb);
}
}else{
newbuffer(cb);
}
function newbuffer(callback){
var loffset = offset;
var llength = (self.size>length)?self.size:length;
if(loffset+llength > self.filesize){
llength = self.filesize - loffset;
}
self.file.read(loffset,llength,function(err,buff){
//console.log('new read');
if(!err){
self.buffer = {
"offset":loffset,
"data":buff
};
buff.copy(ret_buffer,0,0,length);
}
callback(err,ret_buffer);
});
}
}
FileAccessBuffer.prototype.read = function (offset, length, cb) {
//console.log('bypss read');
this.file.read(offset, length, cb);
}
FileAccessBuffer.prototype.write = function (offset, buf, cb) {
this.file.write(offset, buf, cb);
}
FileAccessBuffer.prototype.close = function (cb) {
this.file.close(cb);
}
module.exports.create = function(filename,opt)
{
return new FileAccessBuffer(filename,opt);
}
var BSON = require('buffalo');
var async = require('async');
const OATMETASIZE = 64;
const OBJHEADERSIZE = 80;
//OAT Header structure
var oatmeta_struct = function(){
return {
"SEQ" : 0,
"SZ" : 10000,
"PREV" : null,
"NEXT" : null,
}
}
var create = module.exports.create = function(fd,prm,opt){
var oatmeta = oatmeta_struct();
oatmeta.SEQ = (prm.SEQ)?prm.SEQ:1;
oatmeta.SZ = (prm.SZ)?prm.SZ:10000;
oatmeta.PREV = (prm.PREV)?prm.PREV:null;
oatmeta.NEXT = (prm.NEXT)?prm.NEXT:null;
var address = prm.address;
return new Oat(fd,address,oatmeta,opt);
};
module.exports.load = function(fd,address,opt,cb){
var options = {};
if(typeof opt == 'function'){
cb = opt;
}else{
options = opt || {};
}
fd.read(address, OATMETASIZE, function(err, buffer) {
var oatmeta = null;
if(!err){
oatmeta = BSON.parse(buffer);
var inst = new Oat(fd,address,oatmeta,options);
}
cb(err,inst);
});
}
function Oat(fd,addr,meta,opt){
this.options = opt || {};
this.file = fd;
this.slotbuffer = null;
this.address = addr;
this.oatmeta = meta;
}
Oat.prototype.setNextAddr = function(addr){
this.oatmeta.NEXT = addr;
}
Oat.prototype.oatAt = function(seq,cb){
var self=this;
var curSeq = this.oatmeta.SEQ;
var curOat = this;
if(seq>curSeq){
//forward
async.whilst(
function() { return ((curOat!=null) && (seq>curSeq)); },
function(callback) {
curOat.nextOat(function(err,oat){
curOat = oat;
if(curOat){
curSeq = oat.oatmeta.SEQ;
callback(err,curOat);
}else{
callback(null,null);
}
});
},
function (err, oat) {
cb(err,oat);
}
);
}else if(seq<curSeq){
//backward
async.whilst(
function() { return ((curOat!=null) && (seq<curSeq)); },
function(callback) {
curOat.prevOat(function(err,oat){
curOat = oat;
if(curOat){
curSeq = oat.oatmeta.SEQ;
callback(err,curOat);
}else{
callback(null,null);
}
});
},
function (err, oat) {
cb(err,oat);
}
);
}else{
callback(null,curOat);
}
}
Oat.prototype.nextOat = function(cb){
if(!this.oatmeta.NEXT){
return cb(null,null);
}
var inst = new Oat(this.file,this.oatmeta.NEXT);
inst.readMeta(function(err){
if(!err){
cb(null,inst);
}else{
cb(err);
}
})
}
Oat.prototype.prevOat = function(cb){
if(!this.oatmeta.PREV){
return cb(null,null);
}
var inst = new Oat(this.file,this.oatmeta.PREV);
inst.readMeta(function(err){
if(!err){
cb(null,inst);
}else{
cb(err);
}
})
}
Oat.prototype.readMeta = function(cb){
var self = this;
this.file.read(this.address, OATMETASIZE, function(err, buffer) {
if(!err){
self.oatmeta = BSON.parse(buffer);
}
cb(err);
});
}
Oat.prototype.writeMeta = function(cb){
var buffer = new Buffer(OATMETASIZE);
BSON.serialize(this.oatmeta,buffer);
this.file.write(this.address,buffer,cb);
}
Oat.prototype.writeSlot = function(index,data,cb){
if(index>=this.oatmeta.SZ){
return cb(new Error("Index out of bound"));
}
var slotAddr = this.address + OATMETASIZE + (index*OBJHEADERSIZE);
var buffer = new Buffer(OBJHEADERSIZE);
BSON.serialize(data,buffer);
this.file.write(slotAddr,buffer,cb);
}
Oat.prototype.readSlot = function(index,opt,cb){
self=this;
var options = {};
if(typeof opt == 'function'){
cb = opt;
}else{
options = opt || {};
}
if(index>=this.oatmeta.SZ){
return cb(new Error("Index out of bound"));
}
var slotStart = this.address + OATMETASIZE;
var slotOffset = index*OBJHEADERSIZE;
var slotAddr = slotStart + slotOffset;
if(this.slotbuffer){
var buff = this.slotbuffer.slice(slotOffset,slotOffset+OBJHEADERSIZE);
setImmediate(function (){
cb(null,BSON.parse(buff));
});
}else{
var readstart = (options.nobuffer)?slotAddr:slotStart;
var readlen = (options.nobuffer)?OBJHEADERSIZE:(OBJHEADERSIZE * this.oatmeta.SZ);
self.file.read(readstart, readlen, function(err, rdbuffer) {
if(options.nobuffer){
cb(err,BSON.parse(rdbuffer));
}else{
self.slotbuffer = rdbuffer;
var slicebuff = self.slotbuffer.slice(slotOffset,slotOffset+OBJHEADERSIZE);
cb(err,BSON.parse(slicebuff));
}
});
}
}
Oat.prototype.getSize = function(){
return OATMETASIZE + (this.oatmeta.SZ * OBJHEADERSIZE);
}
var BSON = require('buffalo');
var bsonp = require('bson')
var BSONP = new bsonp.BSONPure.BSON()
const OBJHEADERSIZE = 80;
const STRING_TYPE = 1;
const BINARY_TYPE = 2;
const OBJECT_TYPE = 3;
//Header structure
var header_struct = function(){
return {
"ID" : null,
"TY" : 1,
"FG" : null,
"MZ" : 0,
"DZ" : 0,
"AD" : 0
}
}
function ObjectData(fd,header,meta,data){
this.file = fd;
this.header = header;
//Buffer
this.metaBuffer = meta;
this.dataBuffer = data;
}
ObjectData.prototype.getHeader = function(){
return this.header;
}
ObjectData.prototype.getObjectSize = function(){
return this.header.MZ + this.header.DZ;
}
ObjectData.prototype.setAddress = function(addr){
return this.header.AD = addr;
}
ObjectData.prototype.readMeta = function(opt,cb){
self=this;
var options = {};
if(typeof opt == 'function'){
cb = opt;
}else{
options = opt || {};
}
var fd=this.file;
var oStart = this.header.AD;
var oLen = this.header.MZ;
fd.bufferedRead(oStart,oLen,function(err,buff){
if(!err){
cb(null,BSON.parse(buff));
}else{
cb(err);
}
});
}
ObjectData.prototype.readData = function(opt,cb){
var self=this;
var options = {};
if(typeof opt == 'function'){
cb = opt;
}else{
options = opt || {};
}
var fd=this.file;
var oStart = this.header.AD + this.header.MZ;
var oLen = this.header.DZ;
fd.bufferedRead(oStart,oLen,function(err,buff){
if(!err){
cb(null,parse_data(buff,self.header.TY));
}else{
cb(err);
}
});
}
ObjectData.prototype.readObject = function(opt,cb){
var self=this;
var options = {};
if(typeof opt == 'function'){
cb = opt;
}else{
options = opt || {};
}
var fd=this.file;
var oStart = this.header.AD;
var oLen = this.header.MZ + this.header.DZ;
fd.bufferedRead(oStart,oLen,function(err,buff){
if(!err){
var obj = {
header : self.header,
meta : BSON.parse(buff.slice(0,self.header.MZ)),
data : parse_data(buff.slice(self.header.MZ,self.header.MZ+self.header.DZ),self.header.TY),
}
cb(null,obj);
}else{
cb(err);
}
});
}
var parse_data = function(buffer,ty){
if(ty==STRING_TYPE){
return buffer.toString('utf8');
}else if(ty==OBJECT_TYPE){
return BSON.parse(buffer);
}else{
return buffer;
}
}
ObjectData.prototype.write = function(addr,cb){
var address = this.header.AD;
if(typeof addr == 'function'){
cb = addr;
}else if(Number(addr)>0){
address = addr;
this.header.AD = address;
}
var fd=this.file;
var objBuffer = new Buffer(this.header.MZ + this.header.DZ)
if(this.header.MZ>0){
this.metaBuffer.copy(objBuffer);
}
this.dataBuffer.copy(objBuffer,this.header.MZ)
fd.write(this.header.AD,objBuffer,function(err){
cb(err);
})
}
module.exports.HEADERSIZE = OBJHEADERSIZE;
module.exports.createByHeader = function(fd,header){
return new ObjectData(fd,header,null,null);
}
module.exports.createByData = function(fd,prm){
var id = prm.id;
var data = prm.data;
var meta = prm.meta;
var address = (prm.address)?prm.address:0;
var header = header_struct();
var metaBuffer = null;
var dataBuffer = null;
//data
switch (typeof data){
case 'string':
header.TY = STRING_TYPE;
header.DZ = Buffer.byteLength(data, 'utf8');
dataBuffer = new Buffer(data);
break;
case 'object':
if(data instanceof Buffer){
header.TY = BINARY_TYPE;
header.DZ = data.length;
dataBuffer = data;
}else{
var objData = BSON.serialize(data);
header.TY = OBJECT_TYPE;
header.DZ = objData.length
dataBuffer = objData;
}
break;
default :
return null;
}
if(meta){
metaBuffer = BSON.serialize(meta);
header.MZ = metaBuffer.length;
}
header.ID = id;
header.AD = address
return new ObjectData(fd,header,metaBuffer,dataBuffer);
}
module.exports = ObjId;
function ObjId(prm){
if (Buffer.isBuffer(prm)) {
if (prm.length != 12){
throw new Error("Buffer-based ObjectId must be 12 bytes")
}
this.bytes = prm
}else if(typeof prm == 'object'){
var fhn = (Buffer.isBuffer(prm.fhn) )?prm.fhn:new Buffer(4);
var seq = prm.seq || 1;
var ts = (prm.ts)?prm.ts:(Date.now() / 1000) & 0xFFFFFFFF;
seq = seq & 0xFFFFFFFF;
this.bytes = new Buffer([
fhn[0],
fhn[1],
fhn[2],
fhn[3],
seq>>24,
seq>>16,
seq>>8,
seq,
ts>>24,
ts>>16,
ts>>8,
ts
]);
}else if (typeof prm == 'string') {
if (prm.length != 24) throw new Error("String-based ObjectId must be 24 bytes")
if (!/^[0-9a-f]{24}$/i.test(prm)) throw new Error("String-based ObjectId must in hex-format:" + prm)
this.bytes = fromHex(prm)
}
}
ObjId.prototype.extract = function(){
var e_fhn = this.bytes.slice(0,4);
var e_seq = this.bytes.readUInt32BE(4);
var e_ts = this.bytes.readUInt32BE(8);
return {fhn:e_fhn,seq:e_seq,ts:e_ts};
}
ObjId.prototype.toString = function() {
return toHex(this.bytes)
}
var toHex = function(buffer) {
return buffer.toString('hex')
}
var fromHex = function(string) {
return new Buffer(string, 'hex')
}
var async = require('async');
var Root = require('./root');
var ObjectData = require('./objectdata');
var ObjId = require('./objid');
var Oat = require('./oat');
module.exports = Reader;
function Reader(fd,root){
this.file = fd;
this.root = root;
this.cursorIdx = 0;
this.oat = null;
}
Reader.prototype.moveTo = function(idx){
var rootData = this.root.getRoot();
if(idx>0 && idx <= rootData.SEQ){
this.cursorIdx = idx-1;
return true;
}else{
return false;
}
}
Reader.prototype.nextObject = function(cb){
var self=this;
this.readAt(++this.cursorIdx,function(err,obj){
if(!err && obj){
//cb(null,{})
obj.readObject(cb);
}else{
cb(err,null);
}
});
}
Reader.prototype.objectAt = function(seq,opt,cb){
var options = {};
if(typeof opt == 'function'){
cb = opt;
}else{
options = opt || {};
}
this.readAt(seq,options,function(err,obj){
if(!err && obj){
obj.readObject(cb);
}else{
cb(err,null);
}
});
}
Reader.prototype.readAt = function(seq,opt,cb){
var self = this;
var options = {};
if(typeof opt == 'function'){
cb = opt;
}else{
options = opt || {};
}
var rootData = this.root.getRoot();
if(seq<=0 || seq > rootData.SEQ){
return cb(new Error('unavailable'));
}
var oatNo = Math.ceil(seq / rootData.OATZ);
var slotNo = (seq-1)%rootData.OATZ;
// var hobj3 = {
// "ID":new Buffer(12),
// "TY":3,
// "FG":null,
// "MZ":94,
// "DZ":97,
// "AD":17060704
// }
// var obj = ObjectData.createByHeader(self.file,hobj3);
// process.nextTick(function(){
// cb(null,obj);
// })
async.waterfall([
function(callback) {
//load last oat
if(self.oat == null){
Oat.load(self.file,rootData.OATA,function(err,oat){
self.oat = oat;
if(oat){
callback(null);
}else{
callback(new Error('Oat Error'));
}
});
}else{
callback(null);
}
},
function(callback){
//move to current oat
if(self.oat.oatmeta.SEQ==oatNo){
callback(null);
}else{
self.oat.oatAt(oatNo,function(err,oat){
if(oat && oat.oatmeta.SEQ == oatNo){
self.oat = oat;
callback(null);
}else{
callback(new Error('Oat Error'));
}
});
}
},
function(callback){
//getSlot
self.oat.readSlot(slotNo,{nobuffer:options.nobuffer},function(err,slot){
callback(err,slot);
});
}
], function (err,slot) {
var obj = ObjectData.createByHeader(self.file,slot);
cb(err,obj);
});
}
Reader.prototype.remain = function(){
var rootData = this.root.getRoot();
return rootData.SEQ - this.cursorIdx;
}
Reader.prototype.count = function(){
var rootData = this.root.getRoot();
return rootData.SEQ;
}
var BSON = require('buffalo');
const ROOTSIZE = 256;
const VERSION = "1.0";
const OATSIZE = 10000;
var root_struct = function (){
return {
"VER":VERSION,
"FHN":new Buffer(4),
"SEQ":0,
"OATA":0,
"OATZ":OATSIZE,
"AOF":ROOTSIZE
};
}
module.exports = Root;
function Root(fd,opt)
{
this.options = opt || {};
this.file = fd;
this.data = null;
}
Root.prototype.setVal = function(name,val){
this.data[name] = val;
}
Root.prototype.getVal = function(name){
return this.data[name] = val;
}
Root.prototype.getRoot = function(){
return this.data;
}
Root.prototype.newroot = function(prm){
prm = prm || {};
this.data = root_struct();
if(prm.FHN){
this.data.FHN = prm.FHN;
}
}
Root.prototype.load = function(cb){
var self = this;
this.file.read(0, ROOTSIZE-1, function(err, buffer) {
if(!err){
var objroot = BSON.parse(buffer);
self.data = objroot;
}
cb(err,objroot);
});
}
Root.prototype.write = function(cb){
var buffer = new Buffer(ROOTSIZE);
BSON.serialize(this.data,buffer);
this.file.write(0,buffer,cb);
}
var async = require('async');
var BSON = require('buffalo');
var ObjId = BSON.ObjectId;
var Oat = require('./oat');
var ObjectData = require('./objectdata');
var ObjId = require('./objid');
module.exports = Writer;
function Writer(fd,root,opt){
this.file = fd;
this.root = root;
}
Writer.prototype.write = function(data,opt,cb){
var self = this;
var options = {};
if(typeof opt == 'function'){
cb = opt;
}else{
options = opt || {};
}
var nSeq = ++this.root.SEQ;
var nFhn = this.root.FHN;
var id = new ObjId({'fhn':nFhn,'seq':nSeq})
console.log(id.toString());
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment