Commit 8842e6ac authored by Krit Punpreuk's avatar Krit Punpreuk

Add Query service base on ts.

parent 7a6492e0
......@@ -3,5 +3,5 @@ var router = express.Router();
router.use('/object',require('./service-object'));
router.use('/storage',require('./service-storage'));
router.use('/query',require('./service-query'));
module.exports = router;
var ctx = require('../../../context');
var express = require('express');
var router = express.Router();
var fs = require('fs');
var async = require('async');
//var Worker = require("tiny-worker");
/* Btree index support*/
// var btree = require("btreejs");
// const BUILDING_CAHCE = "NOT_READY"
// const TREE_ORDER = 4
var cfg = ctx.config;
var storage_cfg = cfg.storage;
var response = ctx.getLib('lib/ws/response');
var request = ctx.getLib('lib/ws/request');
var BinStream = ctx.getLib('lib/bss/binarystream_v1_1');
var ObjId = ctx.getLib('lib/bss/objid');
var Tokenizer = ctx.getLib('lib/auth/tokenizer');
var StorageUtils = ctx.getLib('storage-service/lib/storage-utils');
const ACL_SERVICE_NAME = "storage";
// Debug mode
var debug = false
router.get('/', function (req, res) {
var reqHelper = request.create(req);
var respHelper = response.create(res);
var s_list = StorageUtils.list(storage_cfg.repository);
var s_result = [];
var acl_validator = req.context.acl_validator;
var tInfo = Tokenizer.info(req.auth);
s_list.forEach((sname) => {
var acc_l = acl_validator.isAccept(tInfo.acl, {
"vo": tInfo.vo, "service": ACL_SERVICE_NAME, "resource": sname, "mode": "l"
});
if (acc_l) {
s_result.push(sname);
}
});
respHelper.responseOK(s_result);
});
router.get('/:id/stats', async function (req, res) {
var reqHelper = request.create(req);
var respHelper = response.create(res);
var sid = req.params.id;
var query = reqHelper.getQuery();
if (!query) { query = {}; }
var build_cache = false
if (query.cache == "true") {
build_cache = true
}
if (!sid) {
return respHelper.response404();
}
var acl_validator = req.context.acl_validator;
var tInfo = Tokenizer.info(req.auth);
var acp = acl_validator.isAccept(tInfo.acl, {
"vo": tInfo.vo, "service": ACL_SERVICE_NAME, "resource": sid, "mode": "r"
});
if (!acp) { return respHelper.response403(); }
var storage_path = sid.split('.').join('/');
var bss_full_path = storage_cfg.repository + "/" + storage_path + ".bss";
// var bss_cache_path = storage_cfg.repository + "/" + storage_path + ".bsc";
fs.exists(bss_full_path, function (exists) {
if (exists) {
// var cache = true
// fs.exists(bss_cache_path, async function (cache_exists) {
// var tree
// if (build_cache) {
// init_cache(bss_full_path, bss_cache_path)
// }
// if (!cache_exists) {
// cache = false
// } else {
// tree = await readCache(bss_cache_path)
// }
var fstat = stats = fs.statSync(bss_full_path);
BinStream.open(bss_full_path, function (err, bss) {
var rd = bss.reader();
var obj_stat = {
"storagename": sid,
"count": rd.count(),
"filename": storage_path + ".bss",
"filesize": fstat.size
// ,"index": cache,
// "tree_h": (tree) ? TREE_ORDER : "building"
}
bss.close(function (err) {
respHelper.responseOK(obj_stat);
});
});
// });
} else {
respHelper.response404();
}
});
});
// search by timestamp
router.get('/:id/bsstime', async (req, res, next) => {
try {
var reqHelper = request.create(req);
var respHelper = response.create(res);
var sid = req.params.id;
var query = reqHelper.getQuery();
if (!query) { query = {}; }
if (!sid) {
return respHelper.response404();
}
var acl_validator = req.context.acl_validator;
var tInfo = Tokenizer.info(req.auth);
var acp = acl_validator.isAccept(tInfo.acl, {
"vo": tInfo.vo, "service": ACL_SERVICE_NAME, "resource": sid, "mode": "r"
});
if (!acp) { return respHelper.response403(); }
var storage_path = sid.split('.').join('/');
var bss_full_path = storage_cfg.repository + "/" + storage_path + ".bss";
// var bss_cache_path = storage_cfg.repository + "/" + storage_path + ".bsc";
var from_seq = 1;
var limit = 0;
var sizelimit = 64 * 1000 * 1000;
var t = [new Date().getTime()]
// var tree;
var mode;
if (query.debug == 'true') {
debug = true
}
// Pick only one
// param => seek
if (query.seek) {
t = [Number(query.seek)]
mode = 'seek'
} else if (query.start && query.end) {
t = [Number(query.start), Number(query.end)]
if (t[0] > t[1]) {
return respHelper.response400('start cannot more than end parameter');
}
mode = 'range'
} else {
return respHelper.response400('Invalid query string');
}
var cache = true
// param => cache
// if (String(query.cache).toLocaleLowerCase() == 'false') {
// cache = false
// }
printLog('t: ' + t,true);
// fs.exists(bss_cache_path, async function (cache_exists) {
// if (!cache_exists && cache) {
// cache = false
// init_cache(bss_full_path, bss_cache_path)
// } else if (cache) {
// cache = false
// tree = await readCache(bss_cache_path)
// if (tree) { cache = true }
// }
// Set default start, end
let l = 1
let u = -1
switch (mode) {
// Seek
case 'seek':
// if (cache) {
// list = []
// tree.walkAsc(t[0], function(key, val) {
// list.push({key,val});
// if(list.size > 1){
// return
// }
// });
// printLog("tree walk seek mode: " + JSON.stringify(list),true)
// l = list[0].val
// u = list[0].val
// }
// check if time query is less then first data object (cache not found)
if (!l) {
from_seq = 1
limit = 1
u = -1;
} else {
let range = await searchSeq(bss_full_path, t, l, u, "seek")
from_seq = range[0]
limit = (range[range.length - 1] - range[0]) + 1
}
printLog("found: " + from_seq + " limit: " + limit,true)
break;
// Search (start,end)
case 'range':
let l1 = 1
let l2 = 1
let u1 = -1
let u2 = -1
// if (cache) {
// list = []
// tree.walkAsc(t[0],t[1], function(key, val) {
// list.push({key,val});
// if(list.size > 1){
// return
// }
// });
// var range = tree.fetchRange(t[0], t[1])
// l1 = list[0].val
// u1 = list[0].val
// l2 = list[list.length - 1].val
// u2 = list[list.length - 1].val
// if (!l1) {
// l1 = 1
// u1 = -1
// }
// if (!l2) {
// l2 = 1
// u2 = -1
// }
// }
found = await searchSeq(bss_full_path, t[0], l1, u1, "left")
from_seq = found[0]
found = await searchSeq(bss_full_path, t[1], from_seq, u2, "right")
limit = found[0] - from_seq + 1
// prevent no limit output
if (limit == 0 || limit < 0) {
limit = 1;
}
printLog("found: " + from_seq + " limit: " + limit,true)
break;
default:
break;
}
// param => sizelimit
if (query.limit) {
let qlimit = Number(query.limit)
limit = (limit <= qlimit)? limit:qlimit ;
}
// param => sizelimit
if (query.sizelimit) {
sizelimit = Number(query.sizelimit) * 1000 * 1000;
}
// param => sizelimit
if (query.sizelimit) {
sizelimit = Number(query.sizelimit) * 1000 * 1000;
}
// param => output = [object],stream
var output_type = (query.output) ? query.output : 'object';
//compat with v1
//from_seq = 100000000
// param => field = id,meta,data,_id,_meta,_data[all]
var objOpt = { 'id': true, 'meta': true, 'data': true, 'field': query.field }
if (query.field == 'id' || query.field == '_id') {
objOpt.meta = false;
objOpt.data = false;
} else if (query.field == 'meta') {
objOpt.data = false;
} else if (query.field == 'data') {
objOpt.meta = false;
} else if (query.field == '_meta') {
objOpt.data = false;
objOpt.id = false;
} else if (query.field == '_data') {
objOpt.meta = false;
objOpt.id = false;
}
// param => last
var tail_no = query.last;
var rd_prm = {
'bss_full_path': bss_full_path,
'tail_no': tail_no,
'from_seq': from_seq,
'limit': limit,
'output_type': output_type,
'objOpt': objOpt,
'sizelimit': sizelimit
}
var worker_pool = req.context.worker_pool;
var worker = worker_pool.get();
worker.resp = respHelper;
worker.output_type = output_type;
worker.execute({ 'cmd': 'read', 'prm': rd_prm });
worker.on('start', function (data) {
stream_start();
});
var firstline = true;
worker.on('data', function (data) {
if (!firstline) {
stream_newrec();
} else { firstline = false; }
stream_data(data);
});
worker.on('end', function (code) {
if (code == '404') {
end(404);
} else if (code == '200') {
stream_end();
end(200);
}
worker.shutdown();
//worker_pool.push(worker);
});
function stream_start() {
var resp = worker.resp;
var type = worker.output_type;
if (type == 'stream') {
resp.type('text');
} else {
resp.type('application/json');
resp.write('[');
}
}
function stream_newrec() {
var resp = worker.resp;
var type = worker.output_type;
if (type == 'stream') {
resp.write('\n');
} else {
resp.write(',');
}
}
function stream_data(data) {
var resp = worker.resp;
resp.write(data);
}
function stream_end() {
var resp = worker.resp;
var type = worker.output_type;
if (type == 'stream') {
resp.write('');
} else {
resp.write(']');
}
}
function end(code) {
var resp = worker.resp;
if (code == 404) {
resp.response404()
} else {
resp.status(code).end();
}
}
// });
} catch (e) {
console.error(e)
}
});
// async function init_cache(bss_full_path, bss_cache_path) {
// try {
// console.log('Cache file not found. Start rebuild cache...');
// let fname = bss_cache_path
// write(fname, BUILDING_CAHCE)
// read_cache = await createInitcache(bss_full_path)
// write(fname, JSON.stringify(read_cache))
// } catch (err) {
// console.error(err);
// }
// return addBtree(read_cache)
// }
async function searchSeq(FNAME, t, lowerbound, upperbound, mode) {
return new Promise(function (resolve, reject) {
BinStream.open(FNAME, function (err, bss) {
if (err !== null) reject(err);
let l = Number(lowerbound)
let u = Number(upperbound)
let result = []
let rd = bss.reader();
let rec_count = rd.count();
let last_seq = rec_count - 1
let OATZ = Number(rd.rootData.OATZ)
// Setup default if no index use
if (u < 1) {
u = last_seq
l = 1
}
// Range query mode
switch (mode) {
case 'left': l = l - OATZ; break;
case 'right': u = u + OATZ; break;
case 'seek':
l = l - OATZ;
u = u + OATZ;
break;
default: throw Error('wrong use in searchSeq Method');
}
// Prevent out of bound
if (u >= rec_count) { u = last_seq }
if (l < 1) { l = 1 }
let idx = l
let cont = true;
let resultIdx = 0
if (idx > rec_count) { cont = false; }
rd.moveTo(idx);
let start = new Date()
async.whilst(
function () { return cont; },
function (callback) {
rd.nextObject(function (err, obj) {
if (idx > rec_count || !obj) {
cont = false;
} else {
printLog("l m u : " + l + ", " + idx + ", " + u,debug);
let id = (new ObjId(obj.header.ID)).toString();
let dataout = {};
// dataout.ts = parseInt(id.substring(16, id.length), 16)
dataout.ts = obj.meta._ts
dataout.seq = parseInt(id.substring(2, 16), 16)
if (l == u) {
if (dataout.ts < t) {
if (mode == "seek") {
result.push(dataout.seq)
result.push(dataout.seq + 1)
} else if (mode == "left") {
result.push(dataout.seq + 1)
} else if (mode == "right") {
result.push(dataout.seq)
} else {
throw Error('wrong use in searchSeq Method');
}
} else {
if (mode == "seek") {
result.push(dataout.seq - 1)
result.push(dataout.seq)
} else if (mode == "left") {
result.push(dataout.seq)
} else if (mode == "right") {
result.push(dataout.seq - 1)
} else {
throw Error('wrong use in searchSeq Method');
}
}
cont = false;
} else if (l > u) {
result.push(dataout.seq)
cont = false;
}
if (dataout.ts < t) {
l = Number(dataout.seq)
} else {
u = Number(dataout.seq) - 1
}
idx = Math.ceil((Number(l) + Number(u)) / 2)
rd.moveTo(idx);
}
resultIdx++
callback();
});
}, function (err) {
bss.close(function (err) {
let end = new Date()
printLog('total lookup: ' + resultIdx + " . time use :" + (end - start)+"ms",true);
resolve(result)
});
})
});
});
}
// function createInitcache(FNAME) {
// return new Promise(function (resolve, reject) {
// BinStream.open(FNAME, function (err, bss) {
// if (err !== null) reject(err);
// var result = []
// var rd = bss.reader();
// var rec_count = rd.count();
// var OATZ = Number(rd.rootData.OATZ)
// var custom_mod = 1000000
// if (rec_count.toString().length > 7) {
// custom_mod = Math.pow(10, rec_count.toString().length - 1)
// }
// var objOpt = { "id": true };
// var idx = 1
// var result = [];
// var cont = true;
// var resultIdx = 0
// if (idx > rec_count) { cont = false; }
// rd.moveTo(idx);
// var start = new Date()
// async.whilst(
// function () { return cont; },
// function (callback) {
// rd.nextObject(function (err, obj) {
// if (idx > rec_count || !obj) {
// console.log("building cache : '" + FNAME + "' completed.");
// cont = false;
// } else {
// var id = (new ObjId(obj.header.ID)).toString();
// var dataout = {};
// dataout.ts = parseInt(id.substring(16, id.length), 16)
// dataout.seq = parseInt(id.substring(2, 16), 16)
// result.push(dataout)
// resultIdx++;
// // Skip to next block
// idx = idx + OATZ
// rd.moveTo(idx);
// }
// // Custom log output
// if ((idx - (OATZ + 1)) % custom_mod == 0) {
// console.log("building cache : '" + FNAME + "' " + parseFloat(idx * 100 / (rec_count)).toFixed(2) + "%" + " done.");
// }
// callback();
// });
// }, function (err) {
// bss.close(function (err) {
// var end = new Date()
// console.log('total key: ' + resultIdx + " . time use :" + (end - start));
// //console.log('close');
// //console.log(result);
// resolve(result)
// });
// })
// });
// });
// }
// async function readCache(bss_cache_path) {
// var tree = null
// try {
// let contents = fs.readFileSync(bss_cache_path, 'utf8');
// if (contents == BUILDING_CAHCE || contents.length == 0) {
// return null;
// } else {
// var read_cache = JSON.parse(contents)
// tree = await addBtree(read_cache)
// return tree
// }
// } catch (err) {
// console.error(err);
// }
// }
// async function addBtree(array) {
// var Tree = btree.create(4, btree.numcmp);
// var tree = new Tree();
// for (el in array) {
// tree.put(array[el].ts, array[el].seq);
// }
// return tree
// }
// function write(file, text) {
// return new Promise((resolve, reject) => {
// fs.writeFile(file, text, err => {
// if (err) reject(err);
// else resolve();
// });
// });
// }
function printLog(msg, debug = false) {
if(debug){
return console.log(msg)
} else{
return;
}
}
module.exports = router;
  • Query data by timestamp. Search using "_ts" attribute in meta

    • seek nearest data given by unix timestamp Return the closest data timestamp left and right side if not exactly match) http://localhost:19080/v1.2/query/<storage-id>/bsstime?seek=1492563941
    • Search by start and end unixtimestamp
      http://localhost:19080/v1.2/query/<storage-id>/bsstime?start=1492563941&end=1492663941 Can use limit parameter to limit ouput
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment