Commit 2fcd4226 authored by project's avatar project

--no commit message

--no commit message
parent f9307a44
{ {
"api_hostname" : "http://localhost:19080",
"repository" : "D:/testfile" "repository" : "D:/testfile"
} }
var dateFormat = require('dateformat');
module.exports.create = create;
function create(req)
{
if(req)
{
var rqhp = new requestHelper(req);
return rqhp;
}else{
return null;
}
}
var requestHelper = function(req)
{
this.request= req;
}
requestHelper.prototype.getRequest = function() {
return this.request;
};
requestHelper.prototype.getHeaders= function() {
return this.request.headers;
};
requestHelper.prototype.getQuery = function() {
return this.request.query;
};
var queryString = require('query-string');
module.exports.create = create;
function create(resp)
{
if(resp)
{
var rhp = new responseHelper(resp);
return rhp;
}else{
return null;
}
}
var responseHelper = function(resp)
{
this.response = resp;
this.response.set('Access-Control-Allow-Origin','*');
}
function responseResult(data,pg)
{
var result = data;
this.response.json(result);
}
responseHelper.prototype.setHeader = function(header){
this.header = header;
}
responseHelper.prototype.setLastModified = function(lm){
if(lm){
this.response.set('Last-Modified',lm);
}
}
responseHelper.prototype.responseOK = responseResult;
responseHelper.prototype.response304 = function(){
this.response.status(304).send('Not Modified');
}
responseHelper.prototype.response500 = function(){
this.response.status(500).send('Internal Server Error');
}
responseHelper.prototype.response400 = function(msg){
if(msg){
this.response.status(400).json({response:'ERROR',message:msg});
}else{
this.response.status(400).send('Bad Request');
}
}
responseHelper.prototype.response404 = function(msg){
if(msg){
this.response.status(404).json({response:'ERROR',message:msg});
}else{
this.response.status(404).send('Not found');
}
}
responseHelper.prototype.response409 = function(msg){
if(msg){
this.response.status(409).json({response:'ERROR',message:msg});
}else{
this.response.status(409).send('Conflict');
}
}
responseHelper.prototype.response201 = function(msg,result){
var ret = {response:'OK'};
if(msg){
ret.message = msg;
}
if(result){
ret.result = result;
}
this.response.status(201).json(ret);
}
responseHelper.prototype.response200 = function(result){
var ret = {response:'OK'};
if(result){
ret.result = result;
}
this.response.status(200).json(ret);
}
var util = require('util');
var DOPlugin = require('../do-plugin');
function DOTask(context,request){
DOPlugin.call(this,context,request);
this.name = "storage";
}
util.inherits(DOTask,DOPlugin);
DOTask.prototype.perform = require('./perform');
module.exports = DOTask;
var ctx = require('../../../context');
var RPCCaller = ctx.getLib('lib/amqp/rpccaller');
var BinStream = ctx.getLib('lib/bss/binarystream_v1_1');
var bsdata = ctx.getLib('lib/model/bsdata');
function perform_function(context,request,response){
var job_id = context.jobconfig.job_id;
var transaction_id = context.transaction.id;
var param = context.jobconfig.data_out.param;
var memstore = context.task.memstore
var output_type = request.input_type;
var data = request.data;
var amqp_cfg = ctx.config.amqp;
var storage_name = param.storage_name;
var caller = new RPCCaller({
url : amqp_cfg.url,
name :'storage_request'
});
var dc_meta = {
"_jid" : job_id,
"_tid" : transaction_id,
"_ts" : Math.round((new Date).getTime() / 1000)
}
var dc_data = bsdata.create(data).serialize('object-encoded');
var req = {
'object_type' : 'storage_request',
'command' : 'write',
'param' : {
'storage_name' : storage_name,
'meta' : dc_meta,
'data' : {
'type' : 'bsdata',
'value' : dc_data
}
}
}
caller.call(req,function(err,resp){
if(!err && resp.status=='OK'){
response.success();
}else{
response.error("storage error")
}
});
// response.success();
// response.reject();
// response.error("error message")
}
module.exports = perform_function;
var fs = require('fs'); var fs = require('fs');
var ctx = require('../../context'); var ctx = require('../../context');
var BinStream = ctx.getLib('lib/bss/binarystream_v1_1'); var BinStream = ctx.getLib('lib/bss/binarystream_v1_1');
var ObjId = ctx.getLib('lib/bss/objid');
var bsdata = ctx.getLib('lib/model/bsdata');
var importer = require('./importer'); var importer = require('./importer');
var dataevent = require('./dataevent');
module.exports.create = function(prm) module.exports.create = function(prm)
{ {
...@@ -17,6 +21,7 @@ function BSSEngine(prm) ...@@ -17,6 +21,7 @@ function BSSEngine(prm)
// this.repos_dir = prm.repos_dir; // this.repos_dir = prm.repos_dir;
// this.name = prm.name; // this.name = prm.name;
this.file = prm.file; this.file = prm.file;
this.name = prm.name;
this.concurrent = 0; this.concurrent = 0;
} }
...@@ -83,14 +88,38 @@ BSSEngine.prototype.cmd = function(cmd,cb) ...@@ -83,14 +88,38 @@ BSSEngine.prototype.cmd = function(cmd,cb)
BSSEngine.prototype.cmd_write = function(prm,cb) BSSEngine.prototype.cmd_write = function(prm,cb)
{ {
var data = prm.data; var self = this;
var data = parseData(prm.data);
var meta = prm.meta; var meta = prm.meta;
this.bss.write(data,{'meta':meta},function(err,obj){ this.bss.write(data,{'meta':meta},function(err,obj){
if(!err){ if(!err){
cb(null); var head = obj.getHeader();
var obj_id = new ObjId(head.ID);
var resp = {
'resource_id' : obj_id.toString(),
'storage_name' : self.name
}
dataevent.newdata({'resourceId':obj_id.toString(),'storageId':self.name});
cb(null,resp);
}else { }else {
cb("write error"); cb("write error");
} }
}); });
} }
function parseData(dat)
{
if(!dat.value){return null}
var ret;
if(dat.type && dat.type == 'bsdata')
{
var bs = bsdata.parse(dat.value);
ret = bs.data;
}else{
ret = dat.value;
}
return ret;
}
...@@ -19,7 +19,7 @@ BSSPool.prototype.get = function(name,cb) ...@@ -19,7 +19,7 @@ BSSPool.prototype.get = function(name,cb)
cb(null,bss.engine); cb(null,bss.engine);
}); });
}else{ }else{
bss = BSSEngine.create(filepath); bss = BSSEngine.create({'file' : filepath,'name' : bssname});
bss.open(function(err){ bss.open(function(err){
if(!err){ if(!err){
self.pool.push({ self.pool.push({
......
var ctx = require('../../context');
var cfg = ctx.config;
var amqp = require('amqplib/callback_api');
module.exports.newdata = function(prm,cb){
var objId = prm.resourceId;
var storageId = prm.storageId;
var hostname = cfg.storage.api_hostname;
var obj_api_url = hostname + '/v0.1/object'
amqp.connect(cfg.amqp.url, function(err, conn) {
if(err){
console.log(err);
}else{
conn.createChannel(function(err, ch) {
if(err){
console.log(err);
}else{
var ex = 'bs_storage';
var key = 'storage.' + storageId + '.dataevent.newdata';
var objMsg = {
'event' : 'newdata',
'resourceId' : objId,
'resource_id' : objId,
'resource_location' : obj_api_url + '/' + storageId + '.' + objId
}
//console.log(objMsg);
var msg = JSON.stringify(objMsg);
ch.assertExchange(ex, 'topic', {durable: false});
ch.publish(ex, key, new Buffer(msg));
//console.log("[AMQP] Sent %s:'%s'", key, msg);
}
});
setTimeout(function() { conn.close();}, 500);
}
});
}
...@@ -20,12 +20,8 @@ Db.prototype.request = function(req,cb) ...@@ -20,12 +20,8 @@ Db.prototype.request = function(req,cb)
var cmd = req.command; var cmd = req.command;
switch (cmd) { switch (cmd) {
case 'write': case 'write':
var prm_w = { var prm = req.param
'storage' : req.storage_name, this.bsscmd_w(prm,cb)
'meta' : req.meta,
'data' : req.resource.value
}
this.bsscmd_w(prm_w,cb)
break; break;
default: default:
cb(null,result_error('invalid command')); cb(null,result_error('invalid command'));
...@@ -34,24 +30,24 @@ Db.prototype.request = function(req,cb) ...@@ -34,24 +30,24 @@ Db.prototype.request = function(req,cb)
} }
Db.prototype.bsscmd_w = function(cmd,cb) Db.prototype.bsscmd_w = function(prm,cb)
{ {
var self = this; var self = this;
var filepath = this.repos_dir + '/' + name2path(cmd.storage) + '.bss'; var filepath = this.repos_dir + '/' + name2path(prm.storage_name) + '.bss';
var bssname = cmd.storage; var bssname = prm.storage_name;
var w_cmd = { var w_cmd = {
'command' : 'write', 'command' : 'write',
'param' : { 'param' : {
'meta' : cmd.meta, 'meta' : prm.meta,
'data' : cmd.data 'data' : prm.data
} }
} }
this.bsspool.get(bssname,function(err,bss){ this.bsspool.get(bssname,function(err,bss){
if(!err){ if(!err){
bss.cmd(w_cmd,function(err){ bss.cmd(w_cmd,function(err,resp){
if(!err){ if(!err){
cb(null,result_ok('success')); cb(null,result_ok(resp));
}else{ }else{
cb(null,result_error('write error')); cb(null,result_error('write error'));
} }
...@@ -72,7 +68,7 @@ function result_error(msg) ...@@ -72,7 +68,7 @@ function result_error(msg)
return {'status':'ERR','msg':msg} return {'status':'ERR','msg':msg}
} }
function result_ok(msg) function result_ok(resp)
{ {
return {'status':'OK','msg':msg} return {'status':'OK','resp':resp}
} }
var ctx = require('../context'); var ctx = require('../context');
var rpcserver = ctx.getLib('lib/amqp/rpcserver'); var rpcserver = ctx.getLib('lib/amqp/rpcserver');
var Db = ctx.getLib('storage-service/lib/db');
var express = require('express');
var app = express();
var bodyParser = require('body-parser');
var cfg = ctx.config;
var storage_cfg = cfg.storage;
module.exports.create = function(cfg) module.exports.create = function(cfg)
{ {
...@@ -10,6 +18,8 @@ module.exports.create = function(cfg) ...@@ -10,6 +18,8 @@ module.exports.create = function(cfg)
var SS = function StorageService(cfg) var SS = function StorageService(cfg)
{ {
this.config = cfg; this.config = cfg;
this.db = Db.create({'repos_dir':storage_cfg.repository});
} }
SS.prototype.start = function() SS.prototype.start = function()
...@@ -21,6 +31,7 @@ SS.prototype.start = function() ...@@ -21,6 +31,7 @@ SS.prototype.start = function()
SS.prototype.amqp_start = function() SS.prototype.amqp_start = function()
{ {
var self = this;
var amqp_cfg = this.config.amqp; var amqp_cfg = this.config.amqp;
if(this.amqp_server){return;} if(this.amqp_server){return;}
...@@ -30,11 +41,12 @@ SS.prototype.amqp_start = function() ...@@ -30,11 +41,12 @@ SS.prototype.amqp_start = function()
name : 'storage_request' name : 'storage_request'
}); });
this.amqp_server.set_remote_function(function(req,callback){ this.amqp_server.set_remote_function(function(req,callback){
var n = parseInt(req.t);
console.log('REQUEST ' + req); self.db.request(req,function(err,res){
setTimeout(function(){ //console.log(res);
callback(null,{'time':n,'data':req.d}); callback(err,res);
},n); });
}); });
this.amqp_server.start(function(err){ this.amqp_server.start(function(err){
...@@ -49,13 +61,22 @@ SS.prototype.amqp_start = function() ...@@ -49,13 +61,22 @@ SS.prototype.amqp_start = function()
SS.prototype.http_start = function() SS.prototype.http_start = function()
{ {
var http = require('http'); var self = this;
http.createServer(function (req, res) {
res.writeHead(200, { var API_PORT = 19080;
'Content-Type': 'text/plain; charset=UTF-8'
});
res.end("req http " + String( (new Date()).getTime() ));
}).listen(19080, ""); app.use(bodyParser.json({limit: '5mb'}));
app.use(bodyParser.urlencoded({
extended: true
}));
app.use(require('./ws'));
app.listen(API_PORT, function () {
console.log('SS:DATA_API START\t\t[OK]'); console.log('SS:DATA_API START\t\t[OK]');
});
} }
var express = require('express')
, router = express.Router()
router.use('/v0.1',require('./v0.1'));
module.exports = router;
var express = require('express');
var router = express.Router();
router.use('/object',require('./service-object'));
//router.use('/storage',require('./service-storage'));
module.exports = router;
var ctx = require('../../../context');
var express = require('express');
var router = express.Router();
var path = require('path');
var fs = require('fs');
var cfg = ctx.config;
var storage_cfg = cfg.storage
var response = ctx.getLib('lib/ws/response');
var request = ctx.getLib('lib/ws/request');
var BinStream = ctx.getLib('lib/bss/binarystream_v1_1');
var ObjId = ctx.getLib('lib/bss/objid');
router.get('/:id',function (req, res) {
var reqHelper = request.create(req);
var respHelper = response.create(res);
var oid = req.params.id;
get_object(reqHelper,respHelper,oid);
});
function get_object(reqHelper,respHelper,oid)
{
if(!oid){
return respHelper.response404();
}
var bss_ab_path = "default.bss";
var path_token= oid.split('.');
if(path_token.length >= 2){
var col = "";
var file = path_token[path_token.length-2] + ".bss";
var col = path_token.slice(0,path_token.length-2).join('/');
if(col.length>0){col = col + '/'}
bss_ab_path = col + file;
}else{
//Only Id no storage name
//Default Action
}
var tkoId = path_token[path_token.length-1];
var bss_full_path = storage_cfg.repository + "/" + bss_ab_path;
var obj_id = '';
try{
obj_id = new ObjId(tkoId);
}catch(err){
return respHelper.response404();
}
var seq = obj_id.extract().seq;
fs.exists(bss_full_path,function(exists){
if(exists){
BinStream.open(bss_full_path,function(err,bss){
var rd = bss.reader();
rd.objectAt(seq,function(err,obj){
bss.close(function(err){
if(obj && obj_id.toString() == (new ObjId(obj.header.ID)).toString()){
respHelper.responseOK(obj_out(obj));
}else{respHelper.response404();}
});
});
});
}else{
respHelper.response404();
}
});
}
function obj_out(obj){
return {"_id" : (new ObjId(obj.header.ID)).toString(),
"meta" : obj.meta,
"data" : obj.data
}
}
module.exports = router;
var ctx = require('../../../context');
var express = require('express');
var router = express.Router();
var path = require('path');
var fs = require('fs');
var async = require('async');
var cfg = ctx.config;
var response = ctx.getlib('lib/ws/response');
var request = ctx.getlib('lib/ws/request');
var BinStream = ctx.getlib('lib/storage/binarystream_v1_0');
var ObjId = ctx.getlib('lib/storage/objid');
router.get('/:id/stats',function (req, res) {
var reqHelper = request.create(req);
var respHelper = response.create(res);
var sid = req.params.id;
if(!sid){
return respHelper.response404();
}
var storage_path = sid.split('.').join('/');
var bss_full_path = cfg.repository.path + "/" + storage_path + ".bss";
fs.exists(bss_full_path,function(exists){
if(exists){
BinStream.open(bss_full_path,function(err,bss){
var rd = bss.reader();
var obj_stat = {
"storagename" : sid,
"count" : rd.count(),
"filename" : storage_path + ".bss"
}
bss.close(function(err){
respHelper.responseOK(obj_stat);
});
});
}else{
respHelper.response404();
}
});
});
router.get('/:id/objects',function (req, res) {
var reqHelper = request.create(req);
var respHelper = response.create(res);
var sid = req.params.id;
var query = reqHelper.getQuery();
if(!query){query={};}
if(!sid){
return respHelper.response404();
}
var storage_path = sid.split('.').join('/');
var bss_full_path = cfg.repository.path + "/" + storage_path + ".bss";
var from_seq = 1;
var limit = 0;
if(query.obj_after){
var o_seq;
try{
var obj_id = new ObjId(query.obj_after);
o_seq = obj_id.extract().seq;
}catch(err){
return respHelper.response404();
}
from_seq = o_seq+1;
}
if(query.limit){
limit = Number(query.limit);
}
if(query.seq_from){
from_seq = Number(query.seq_from);
}
fs.exists(bss_full_path,function(exists){
if(exists){
BinStream.open(bss_full_path,function(err,bss){
var rd = bss.reader();
var rec_count = rd.count();
if(query.last){
var last_count=Number(query.last);
from_seq = (rec_count - last_count) + 1;
}
if(from_seq<1){from_seq=1;}
var idx = from_seq;
var obj_return = [];
var cont = true;
if(idx > rec_count){cont=false;}
rd.moveTo(idx);
async.whilst(
function() { return cont; },
function(callback){
rd.nextObject(function(err,obj){
idx++;
if(!obj){
cont=false;
}else{
//console.log(obj);
obj_return.push(obj_out(obj));
if(limit>0 && idx>=from_seq+limit){
cont=false;
}
}
callback();
});
},function(err){
bss.close(function(err){
respHelper.responseOK(obj_return);
});
});
});
}else{
respHelper.response404();
}
});
});
function obj_out(obj){
return {"_id" : (new ObjId(obj.header.ID)).toString(),
"meta" : obj.meta,
"data" : obj.data
}
}
module.exports = router;
{
"job_id" : "storage",
"active" : true,
"trigger" : {
"type": "cron",
"cmd": "*/10 * * * * *"
},
"data_in" : {
"type": "example"
},
"data_transform" : {
"type": "noop"
},
"data_out" : {
"type": "storage",
"param": {
"storage_name" : "test.plugin.do"
}
}
}
...@@ -33,13 +33,29 @@ var Db = ctx.getLib('storage-service/lib/db'); ...@@ -33,13 +33,29 @@ var Db = ctx.getLib('storage-service/lib/db');
var database = Db.create({'repos_dir':'D:/testfile'}); var database = Db.create({'repos_dir':'D:/testfile'});
// var req = {
// 'object_type' : 'storage_request',
// 'command' : 'write',
// 'storage_name' : 'gcs.file.test',
// 'meta' : {'name':'gcs'},
// 'resource' : {
// 'value' : 'Kamron Aroonrua'
// }
// }
var req = { var req = {
'object_type' : 'storage_request', 'object_type' : 'storage_request',
'command' : 'write', 'command' : 'write',
'param' : {
'storage_name' : 'gcs.file.test', 'storage_name' : 'gcs.file.test',
'meta' : {'name':'gcs'}, 'meta' : {'name':'gcs'},
'resource' : { 'data' : {
'value' : 'Kamron Aroonrua' 'type' : 'bsdata',
'value' : {
'data_type' : 'string',
'data' : 'AA00FFCC'
}
}
} }
} }
......
...@@ -8,8 +8,23 @@ var caller = new RPCCaller({ ...@@ -8,8 +8,23 @@ var caller = new RPCCaller({
name :'storage_request' name :'storage_request'
}); });
var req = {
'object_type' : 'storage_request',
'command' : 'write',
'param' : {
'storage_name' : 'gcs.file.test',
'meta' : {'name':'gcs'},
'data' : {
'type' : 'bsdata',
'value' : {
'data_type' : 'text',
'data' : 'kamron'
}
}
}
}
caller.call({t:10000,d:'hello'},function(err,resp){ caller.call(req,function(err,resp){
console.log(resp); console.log(resp);
}); });
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment