Commit 6399c040 authored by project's avatar project

--no commit message

--no commit message
parent 2fcd4226
...@@ -4,6 +4,8 @@ var RPCCaller = ctx.getLib('lib/amqp/rpccaller'); ...@@ -4,6 +4,8 @@ var RPCCaller = ctx.getLib('lib/amqp/rpccaller');
var BinStream = ctx.getLib('lib/bss/binarystream_v1_1'); var BinStream = ctx.getLib('lib/bss/binarystream_v1_1');
var bsdata = ctx.getLib('lib/model/bsdata'); var bsdata = ctx.getLib('lib/model/bsdata');
var async = require('async');
function perform_function(context,request,response){ function perform_function(context,request,response){
var job_id = context.jobconfig.job_id; var job_id = context.jobconfig.job_id;
var transaction_id = context.transaction.id; var transaction_id = context.transaction.id;
...@@ -26,8 +28,48 @@ function perform_function(context,request,response){ ...@@ -26,8 +28,48 @@ function perform_function(context,request,response){
"_tid" : transaction_id, "_tid" : transaction_id,
"_ts" : Math.round((new Date).getTime() / 1000) "_ts" : Math.round((new Date).getTime() / 1000)
} }
var dc_data = bsdata.create(data).serialize('object-encoded');
if(Array.isArray(data)){
var idx = 0;
async.whilst(
function() { return idx < data.length; },
function(callback) {
var el_data = bsdata.create(data[idx]).serialize('object-encoded');
send_storage(caller,dc_meta,el_data,storage_name,function(err){
idx++;
if(!err){
callback(null);
}else{
callback(err);
}
});
},
function (err) {
if(!err){
response.success();
}else{
response.error("storage error");
}
}
);
}else{
var dc_data = bsdata.create(data).serialize('object-encoded');
send_storage(caller,dc_meta,dc_data,storage_name,function(err){
if(!err){
response.success();
}else{
response.error("storage error");
}
});
}
}
function send_storage(caller,dc_meta,dc_data,storage_name,cb)
{
var req = { var req = {
'object_type' : 'storage_request', 'object_type' : 'storage_request',
'command' : 'write', 'command' : 'write',
...@@ -44,16 +86,12 @@ function perform_function(context,request,response){ ...@@ -44,16 +86,12 @@ function perform_function(context,request,response){
caller.call(req,function(err,resp){ caller.call(req,function(err,resp){
if(!err && resp.status=='OK'){ if(!err && resp.status=='OK'){
response.success(); cb(null);
}else{ }else{
response.error("storage error") cb("error");
} }
}); });
// response.success();
// response.reject();
// response.error("error message")
} }
module.exports = perform_function; module.exports = perform_function;
...@@ -25,7 +25,7 @@ module.exports.newdata = function(prm,cb){ ...@@ -25,7 +25,7 @@ module.exports.newdata = function(prm,cb){
'resource_id' : objId, 'resource_id' : objId,
'resource_location' : obj_api_url + '/' + storageId + '.' + objId 'resource_location' : obj_api_url + '/' + storageId + '.' + objId
} }
//console.log(objMsg);
var msg = JSON.stringify(objMsg); var msg = JSON.stringify(objMsg);
ch.assertExchange(ex, 'topic', {durable: false}); ch.assertExchange(ex, 'topic', {durable: false});
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment