Commit 0d663f02 authored by Kamron Aroonrua's avatar Kamron Aroonrua 💬

Merge branch 'storecache' into 'dev'

Storecache

See merge request !34
parents b59c0795 6ef77b5f
#Changelog
## [1.2.4 Dev] - 2021-05-31
### Added
- STORAGE :: Object Cache
### Update
- SS :: Direct Idxstore
## [1.2.3] - 2021-05-05
### Update
- AMQPLib Support Heartbeat Monitoring
......
......@@ -25,6 +25,7 @@
"minimist": "^1.2.0",
"moment": "^2.17.1",
"mqtt": "^3.0.0",
"node-cache": "^5.1.2",
"node-gyp": "^3.6.2",
"node-persist": "^2.0.7",
"node-schedule": "^1.2.0",
......
const NodeCache = require( "node-cache" )
module.exports.create = function(prm)
{
return new BSSCache(prm);
}
function BSSCache(prm={}){
this.defTTL = prm.TTL || 600
this.max_size = prm.max_size || 8 * 1024 *1024
this.cache = new NodeCache({stdTTL:this.defTTL})
}
BSSCache.prototype.setCache = function (prm,obj)
{
let sname = prm.s
let ktype = prm.t || 'seq'
let keyname = prm.k
let ver = prm.v || '0'
let osize = prm.z
let cachekey = sname + ':' + ktype + ':' + String(keyname) + ':' + String(ver)
if(!obj){return;}
if(osize && osize>this.max_size){
return;
}
this.cache.set(cachekey,obj);
}
BSSCache.prototype.getCache = function (prm)
{
let sname = prm.s || '.'
let ktype = prm.t || 'seq'
let keyname = prm.k
let ver = prm.v || '0'
let cachekey = sname + ':' + ktype + ':' + String(keyname) + ':' + String(ver)
let val = this.cache.get(cachekey)
if(!val){
return null
}
this.cache.ttl(cachekey)
return val
}
\ No newline at end of file
var redis = require('redis');
const PREFIX = 'bs:cache:storage';
module.exports.create = function(conf)
{
return new CacheStore(conf);
}
function CacheStore(conf){
this.prefix = PREFIX ;
if(conf.mem){
this.mem = conf.mem;
}else if(conf.conn){
this.mem = redis.createClient(conf.conn);
}
}
CacheStore.prototype.setIndex = function(storage,oid,object,cb){
var key = this.prefix + ":" + storage;
this.mem.hset(key,oid,object,cb);
}
CacheStore.prototype.getIndex = function(storage,oid,cb)
{
var key = this.prefix + ":" + storage;
this.mem.hget(key,oid,function(err,v){
var value = null;
if(!err && v){
value = v;
}
cb(err,value);
});
}
CacheStore.prototype.flush = function(storage,cb)
{
var key = this.prefix + ":" + storage;
this.mem.del(key);
if(typeof cb == 'function'){
cb();
}
}
......@@ -2,10 +2,13 @@ var ctx = require('../context');
var ConnCtx = ctx.getLib('lib/conn/connection-context');
var rpcserver = ctx.getLib('lib/amqp/rpcserver');
var RPCCaller = ctx.getLib('lib/amqp/rpccaller');
var SSServer = ctx.getLib('lib/axon/rpcserver');
var Db = ctx.getLib('storage-service/lib/db');
var WorkerPool = ctx.getLib('storage-service/lib/worker_pool');
var SSCaller = ctx.getLib('lib/axon/rpccaller');
var BSSCache = ctx.getLib('storage-service/lib/storage-bsscache');
var StorageIndexstore = ctx.getLib('storage-service/lib/storage-indexstore')
//var SSServer = ctx.getLib('lib/axon/rpcserver');
//var SSCaller = ctx.getLib('lib/axon/rpccaller');
var Tokenizer = ctx.getLib('lib/auth/tokenizer');
var ACLValidator = ctx.getLib('lib/auth/acl-validator');
......@@ -19,8 +22,8 @@ var bodyParser = require('body-parser');
var EventPub = ctx.getLib('lib/amqp/event-pub');
//var cfg = ctx.config;
var SS_LISTEN = ctx.getUnixSocketUrl('ss.sock');
var SS_URL = ctx.getUnixSocketUrl('ss.sock');
// var SS_LISTEN = ctx.getUnixSocketUrl('ss.sock');
// var SS_URL = ctx.getUnixSocketUrl('ss.sock');
// var SS_LISTEN = ctx.getServiceUrl(19030);
// var SS_URL = ctx.getClientUrl(19030);
......@@ -98,39 +101,40 @@ SS.prototype.amqp_start = function()
});
}
SS.prototype.ipc_start = function()
{
var self = this;
if(this.ipc_server){return;}
this.ipc_server = new SSServer({
url : SS_LISTEN,
name : 'storage_request'
});
this.ipc_server.set_remote_function(function(req,callback){
//console.log("IPC Command");
self.db.request(req,function(err,res){
if(err){
console.log(err);
}
callback(err,res);
});
});
this.ipc_server.start(function(err){
if(!err){
console.log('SS:IPC START\t\t\t[OK]');
}else{
console.log('SS:IPC START\t\t\t[ERR]');
console.log('SS:IPC ERROR Restarting ...');
setTimeout(function(){
process.exit(1);
},5000);
}
});
}
// SS.prototype.ipc_start = function()
// {
// var self = this;
// if(this.ipc_server){return;}
// this.ipc_server = new SSServer({
// url : SS_LISTEN,
// name : 'storage_request'
// });
// this.ipc_server.set_remote_function(function(req,callback){
// //console.log("IPC Command");
// self.db.request(req,function(err,res){
// if(err){
// console.log(err);
// }
// callback(err,res);
// });
// });
// this.ipc_server.start(function(err){
// if(!err){
// console.log('SS:IPC START\t\t\t[OK]');
// }else{
// console.log('SS:IPC START\t\t\t[ERR]');
// console.log('SS:IPC ERROR Restarting ...');
// setTimeout(function(){
// process.exit(1);
// },5000);
// }
// });
// }
SS.prototype.http_start = function()
{
......@@ -146,17 +150,23 @@ SS.prototype.http_start = function()
}));
var context = ctx.getLib('lib/ws/http-context');
//this.storagecaller = new SSCaller({'url':SS_URL});
this.storagecaller = new RPCCaller({
url : amqp_cfg.url,
name :'storage_request'
});
this.bsscache = BSSCache.create()
this.idxstore = new StorageIndexstore({'mem':self.mem });
this.acl_validator = ACLValidator.create(auth_cfg);
this.worker_pool.initWorker();
app.use(context.middleware({
'acl_validator':self.acl_validator,
'worker_pool' : self.worker_pool,
'storagecaller':self.storagecaller
'storagecaller':self.storagecaller,
'bsscache':self.bsscache,
'idxstore':self.idxstore
}));
var tokenizer = Tokenizer.create(auth_cfg);
......
......@@ -58,7 +58,7 @@ router.get('/:id/file',function (req, res) {
});
function oid_parse(oid,caller,cb)
function oid_parse(oid,idxstore,cb)
{
var ret = {'valid':true}
if(!oid)
......@@ -82,19 +82,12 @@ function oid_parse(oid,caller,cb)
if(ret.key.length<=0){
return cb(null,{'valid':false});
}
var callreq = {
'object_type' : 'storage_request',
'command' : 'idxget',
'param' : {
'storage_name' : ret.storage_name,
'key' : ret.key
}
}
caller.call(callreq,function(err,resp){
if(!err && resp.status=='OK' && resp.resp.found){
//direct idxstore
idxstore.getIndex(ret.storage_name,ret.key,(err,val)=>{
if(!err && val){
ret.by = "obj";
ret.obj_id = resp.resp.object_id;
ret.obj_id = val;
ret.seq = (new ObjId(ret.obj_id)).extract().seq;
cb(null,ret);
}else{
......@@ -102,6 +95,26 @@ function oid_parse(oid,caller,cb)
}
});
// var callreq = {
// 'object_type' : 'storage_request',
// 'command' : 'idxget',
// 'param' : {
// 'storage_name' : ret.storage_name,
// 'key' : ret.key
// }
// }
// caller.call(callreq,function(err,resp){
// if(!err && resp.status=='OK' && resp.resp.found){
// ret.by = "obj";
// ret.obj_id = resp.resp.object_id;
// ret.seq = (new ObjId(ret.obj_id)).extract().seq;
// cb(null,ret);
// }else{
// cb(null,{'valid':false});
// }
// });
}else if(str_addr.startsWith('[') && str_addr.endsWith(']')){
ret.by = "seq";
var str_num = str_addr.substr(1,str_addr.length-2);
......@@ -139,9 +152,11 @@ function get_object(reqHelper,respHelper,prm)
var oid = prm.oid;
var opt = prm.opt || {};
var storagecaller = reqHelper.request.context.storagecaller;
//var storagecaller = reqHelper.request.context.storagecaller;
var bsscache = reqHelper.request.context.bsscache;
var idxstore = reqHelper.request.context.idxstore;
oid_parse(oid,storagecaller,(err,oid_result)=>{
oid_parse(oid,idxstore,(err,oid_result)=>{
if(!oid_result.valid){
respHelper.response404();
......@@ -158,23 +173,67 @@ function get_object(reqHelper,respHelper,prm)
var bss_full_path = storage_cfg.repository + "/" + oid_result.storage_name.split('.').join('/') + ".bss";
fs.exists(bss_full_path,function(exists){
fs.stat(bss_full_path,function(f_error,stats){
if(!f_error && stats.isFile()){
var cobj = null
//cache
if(oid_result.seq>=0){
cobj = bsscache.getCache({
s:oid_result.storage_name,
t:'seq',
k:String(oid_result.seq),
v:stats.atimeMs
})
}
if(exists){
if(cobj == null){
//MISS Cache
//console.log('Cache MISS----->>')
BinStream.open(bss_full_path,function(err,bss){
var rd = bss.reader();
var rec_count = rd.count();
var seq = (oid_result.seq>=0)?oid_result.seq:rec_count + oid_result.seq + 1;
if(oid_result.seq<0){
cobj = bsscache.getCache({
s:oid_result.storage_name,
t:'seq',
k:String(seq),
v:stats.atimeMs
})
}
if(cobj == null){
rd.objectAt(seq,function(err,obj){
bss.close(function(err){
if(obj && (oid_result.by == 'seq' || oid_result.obj_id == (new ObjId(obj.header.ID)).toString()) ){
bsscache.setCache({
s:oid_result.storage_name,
t:'seq',
k:String(seq),
v:stats.atimeMs,
z:obj.header.MZ+obj.header.DZ
},obj)
output(respHelper,obj,opt);
}else{respHelper.response404();}
});
});
}else{
//console.log('Cache HIT2----->>')
output(respHelper,cobj,opt);
}
});
}else{
//HIT Cache
//console.log('Cache HIT----->>')
output(respHelper,cobj,opt);
}
}else{
respHelper.response404();
}
......
const fs = require('fs');
var fname = "f.txt"
// Getting information for a file
fs.stat(fname, (error, stats) => {
if (error) {
console.log(error);
}
else {
console.log("Stats object for: example_file.txt");
console.log(stats.atimeMs);
// Using methods of the Stats object
console.log("Path is file:", stats.isFile());
console.log("Path is directory:", stats.isDirectory());
}
});
\ No newline at end of file
{
"version":"1.2.3",
"build":"202105050000"
"version":"1.2.4",
"build":"202105310000"
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment