Commit dc0446c0 authored by project's avatar project

--no commit message

--no commit message
parent c2843f91
......@@ -20,6 +20,7 @@
"dateformat": "^1.0.12",
"express": "^4.14.0",
"ioredis": "^2.5.0",
"moment": "^2.17.1",
"node-persist": "^2.0.7",
"node-schedule": "^1.2.0",
"node-uuid": "^1.4.7",
......
var request = require('request');
var async = require('async');
var moment = require('moment');
var cts = moment();
function execute_function(context,response){
var job_id = context.jobconfig.job_id;
var transaction_id = context.transaction_id;
var param = context.jobconfig.data_in.param;
var memstore = context.task.memstore;
// var memstore = context.task.memstore;
var memstore = context.job.memstore;
var output_type = 'object/agritronics';
var data = 'hello world';
var ct = new Date().getTime();
cts = moment().hours(0).minutes(0).seconds(0);
//console.log("moment cts: " + cts.format("YYYY-MM-DD,HH:mm:ss"));
let result = {
"object_type": 'agritronic',
......@@ -24,22 +28,34 @@ function execute_function(context,response){
async.whilst(function() { return idx < param.data_types.length;}, function(callback) {
let dtype = param.data_types[idx].type;
let node_id = param.data_types[idx].node_id;
let lts = memstore.getItem(`${param.station_id}-${dtype}`);
console.log(`memstore: ${param.station_id}-${dtype} = ${lts}`);
if(typeof(lts) === 'undefined') lts = `${param.init_observed_date},${param.init_observed_time}`;
let url = param.url + `?appkey=${param.appkey}&p=${param.station_id},${node_id},${dtype},${lts}`;
idx++;
getData(url).then((data) => {
if(data.search("denied") === -1){
result.data.push({
"data_types": dtype,
"value" : data
});
callback();
}
}).catch((err) => {
callback(err);
});
memstore.getItem(`${param.station_id}-${dtype}`, function(err, lts){ //latest timestamp, format: yyyy-MM-dd HH:mm:ss
idx++;
console.log(`memstore: ${param.station_id}-${dtype} = ${lts}`);
if (typeof lts === 'undefined') lts = moment(`${param.init_observed_date} ${param.init_observed_time}`);
else lts = moment(lts);
console.log(`memstore: ${param.station_id}-${dtype} = ${lts}`);
let recvTime = cts.diff(lts, 'days');
if(recvTime > 20) lts = new moment().add(-20, 'day').hours(0).minutes(0).seconds(0);
//console.log(cts.format("YYYY-MM-DD,HH:mm:ss") + " <<>> " + lts.format("YYYY-MM-DD,HH:mm:ss"));
let url = param.url + `?appkey=${param.appkey}&p=${param.station_id},${node_id},${dtype}`;
getData(url, lts, (vals, err) => {
if(err) {
callback(err);
}
else{
result.data.push({
"data_types": dtype,
"value" : vals
});
callback();
}
});
});
}, function(err) {
if( err ) {
......@@ -53,7 +69,36 @@ function execute_function(context,response){
}
function getData(url) {
function getData(url, lts, callback) {
let vals = [];
let req = url + `,${lts.format("YYYY-MM-DD,HH:mm:ss")}`;
//console.log(req);
requestData(req).then((data) => {
if(data.search("denied") === -1 && data.search("invalid") === -1 && data.search("no data") === -1){
vals.push(data);
beforeDateCheck(cts, lts);
}
function beforeDateCheck(ct, lt){
if (lt.isBefore(ct, 'days')) {
lt.add(1, 'days').hours(0).minutes(0).seconds(0);
req = url + `,${lt.format("YYYY-MM-DD,HH:mm:ss")}`;
console.log(req);
requestData(req).then((val) => {
if(val.search("denied") === -1 && val.search("invalid") === -1 && val.search("no data") === -1){
vals.push(val);
beforeDateCheck(ct, lt);
}
})
}
else callback(vals)
}
}).catch((err) => {
callback(err);
});
}
function requestData(url) {
return new Promise((resolve, reject) => {
request(url, function (error, resp, body) {
if (!error && resp.statusCode == 200) {
......@@ -65,4 +110,5 @@ function getData(url) {
})
}
module.exports = execute_function;
var request = require('request').defaults({ encoding: null });;
var async = require('async');
var parser = require('xml2json');
exports.getValues = function(json, cb){
exports.getValues = function(dataSet, cb){
let dataTemplate = {
"type": json.xhr.IO.Type,
"unit": json.xhr.IO.Unit,
"value_type": json.xhr.IO.ValueType,
"image_type": json.xhr.IO.Name,
"type": "",
"unit": "",
"value_type": "",
"image_type": "",
"values": []
};
let values = [];
if(typeof json.xhr.IO.Data.length === "undefined"){
getImage(json.xhr.IO.Data.Value).then((data) =>{
values.push({"observeddatetime": json.xhr.IO.Data.IODateTime, "value": data});
//console.log(dataSet.length);
var i = 0;
async.whilst(function() { return i < dataSet.length;}, function(callbackFn) {
let json = parser.toJson(dataSet[i], {object: true});
i++;
dataTemplate.type = json.xhr.IO.Type;
dataTemplate.unit = json.xhr.IO.Unit;
dataTemplate.value_type = json.xhr.IO.ValueType;
dataTemplate.image_type = json.xhr.IO.Name;
if(parseInt(json.xhr.IO.Record) > 0){
if(parseInt(json.xhr.IO.Record) === 1){
getImage(json.xhr.IO.Data.Value).then((data) =>{
values.push({"observeddatetime": json.xhr.IO.Data.IODateTime, "value": data});
}).catch((err) => {
cb(err);
})
callbackFn();
}
else{
var idx = 0;
async.whilst(function() { return idx < json.xhr.IO.Data.length;}, function(callback) {
let d = json.xhr.IO.Data[idx];
idx++;
//console.log(d.Value);
getImage(d.Value).then((data) =>{
values.push({"observeddatetime": d.IODateTime, "value": data});
//var bitmap = new Buffer(data, 'base64');
//fs.writeFileSync("./result.jpg", bitmap);
callback();
}).catch((err) => {
callback(err);
});
}, function(err) {
if( err ) {
console.log(err);
}
callbackFn();
});
}
}
else callbackFn();
}, function(err) {
if( err ) {
console.log(err);
} else {
if(values.length > 0){
dataTemplate.values = values;
cb(dataTemplate);
}
else cb(null);
}).catch((err) => {
cb(err);
})
}
else{
var idx = 0;
async.whilst(function() { return idx < json.xhr.IO.Data.length;}, function(callback) {
let d = json.xhr.IO.Data[idx];
idx++;
}
});
getImage(d.Value).then((data) =>{
values.push({"observeddatetime": d.IODateTime, "value": data});
//var bitmap = new Buffer(data, 'base64');
//fs.writeFileSync("./result.jpg", bitmap);
callback();
}).catch((err) => {
callback(err);
});
}, function(err) {
if( err ) {
console.log(err);
} else {
if(values.length > 0){
dataTemplate.values = values;
cb(dataTemplate);
}
else cb(null);
}
});
}
}
......@@ -62,11 +81,11 @@ exports.getValues = function(json, cb){
function getImage(url) {
return new Promise((resolve, reject) => {
request(url, function (error, resp, body) {
if (!error && resp.statusCode == 200) {
resolve("data:" + resp.headers["content-type"] + ";base64," + new Buffer(body).toString('base64'));
}else{
return reject(error);
}
})
if (!error && resp.statusCode == 200) {
resolve("data:" + resp.headers["content-type"] + ";base64," + new Buffer(body).toString('base64'));
}else{
return reject(error);
}
})
})
}
\ No newline at end of file
exports.getValues = function(json, callback){
var parser = require('xml2json');
var idx = 0;
exports.getValues = function(dataSet, callback){
let dataTemplate = {
"type": json.xhr.IO.Type,
"unit": json.xhr.IO.Unit,
"value_type": json.xhr.IO.ValueType,
"type": "",
"unit": "",
"value_type": "",
"values": []
};
let values = [];
if(typeof json.xhr.IO.Data.length === "undefined"){
values.push({"observeddatetime": json.xhr.IO.Data.IODateTime, "value": json.xhr.IO.Data.Value});
}
else{
for (var i = 0; i < json.xhr.IO.Data.length; i++) {
let d = json.xhr.IO.Data[i];
values.push({"observeddatetime": d.IODateTime, "value": d.Value});
for (var i = 0; i < dataSet.length; i++) {
let json = parser.toJson(dataSet[i], {object: true});
dataTemplate.type = json.xhr.IO.Type;
dataTemplate.unit = json.xhr.IO.Unit;
dataTemplate.value_type = json.xhr.IO.ValueType;
if(parseInt(json.xhr.IO.Record) > 0){
if(parseInt(json.xhr.IO.Record) === 1){
values.push({"observeddatetime": json.xhr.IO.Data.IODateTime, "value": json.xhr.IO.Data.Value});
}
else{
for (var j = 0; j < json.xhr.IO.Data.length; j++) {
let d = json.xhr.IO.Data[j];
values.push({"observeddatetime": d.IODateTime, "value": d.Value});
}
}
}
}
//console.log("values.langth >>>" + values.length);
if(values.length > 0){
dataTemplate.values = values;
callback(dataTemplate);
}
else callback(null);
}
\ No newline at end of file
}
// exports.getValues = function(json, callback){
// let dataTemplate = {
// "type": json.xhr.IO.Type,
// "unit": json.xhr.IO.Unit,
// "value_type": json.xhr.IO.ValueType,
// "values": []
// };
// let values = [];
// if(typeof json.xhr.IO.Data.length === "undefined"){
// values.push({"observeddatetime": json.xhr.IO.Data.IODateTime, "value": json.xhr.IO.Data.Value});
// }
// else{
// for (var i = 0; i < json.xhr.IO.Data.length; i++) {
// let d = json.xhr.IO.Data[i];
// values.push({"observeddatetime": d.IODateTime, "value": d.Value});
// }
// }
// //console.log("values.langth >>>" + values.length);
// if(values.length > 0){
// dataTemplate.values = values;
// callback(dataTemplate);
// }
// else callback(null);
// }
\ No newline at end of file
exports.getValues = function(json, callback){
var parser = require('xml2json');
var idx = 0;
exports.getValues = function(dataSet, callback){
let dataTemplate = {
"type": json.xhr.IO.Type,
"unit": json.xhr.IO.Unit,
"value_type": json.xhr.IO.ValueType,
"type": "",
"unit": "",
"value_type": "",
"values": []
};
let values = [];
if(typeof json.xhr.IO.Data.length === "undefined"){
values.push({"observeddatetime": json.xhr.IO.Data.IODateTime, "value": json.xhr.IO.Data.Value});
}
else{
for (var i = 0; i < json.xhr.IO.Data.length; i++) {
let d = json.xhr.IO.Data[i];
values.push({"observeddatetime": d.IODateTime, "value": d.Value, "direction": d.Direction});
for (var i = 0; i < dataSet.length; i++) {
let json = parser.toJson(dataSet[i], {object: true});
dataTemplate.type = json.xhr.IO.Type;
dataTemplate.unit = json.xhr.IO.Unit;
dataTemplate.value_type = json.xhr.IO.ValueType;
if(parseInt(json.xhr.IO.Record) > 0){
if(parseInt(json.xhr.IO.Record) === 1){
values.push({"observeddatetime": json.xhr.IO.Data.IODateTime, "value": json.xhr.IO.Data.Value, "direction": json.xhr.IO.Data.Direction});
}
else{
for (var j = 0; j < json.xhr.IO.Data.length; j++) {
let d = json.xhr.IO.Data[j];
values.push({"observeddatetime": d.IODateTime, "value": d.Value, "direction": d.Direction});
}
}
}
}
//console.log("values.langth >>>" + values.length);
if(values.length > 0){
dataTemplate.values = values;
callback(dataTemplate);
}
else callback(null);
}
\ No newline at end of file
// async.whilst(() => {return idx < dataSet.length;}, (cb) =>{
// let json = parser.toJson(dataSet[idx], {object: true});
// dataTemplate.type = json.xhr.IO.Type;
// dataTemplate.unit = json.xhr.IO.Unit;
// dataTemplate.value_type = json.xhr.IO.ValueType;
// idx++;
// if(typeof json.xhr.IO.Data.length === "undefined"){
// values.push({"observeddatetime": json.xhr.IO.Data.IODateTime, "value": json.xhr.IO.Data.Value, "direction": json.xhr.IO.Data.Direction});
// process.nextTick(function(){
// cb();
// })
// }
// else{
// for (var i = 0; i < json.xhr.IO.Data.length; i++) {
// let d = json.xhr.IO.Data[i];
// values.push({"observeddatetime": d.IODateTime, "value": d.Value, "direction": d.Direction});
// }
// process.nextTick(function(){
// cb();
// })
// }
// }, function(err) {
// if( err ) {
// console.log(err);
// } else {
// //console.log("values.langth >>>" + values.length);
// if(values.length > 0){
// dataTemplate.values = values;
// callback(dataTemplate);
// }
// else callback(null);
// }
// });
}
// exports.getValues = function(json, callback){
// let dataTemplate = {
// "type": json.xhr.IO.Type,
// "unit": json.xhr.IO.Unit,
// "value_type": json.xhr.IO.ValueType,
// "values": []
// };
// let values = [];
// if(typeof json.xhr.IO.Data.length === "undefined"){
// values.push({"observeddatetime": json.xhr.IO.Data.IODateTime, "value": json.xhr.IO.Data.Value});
// }
// else{
// for (var i = 0; i < json.xhr.IO.Data.length; i++) {
// let d = json.xhr.IO.Data[i];
// values.push({"observeddatetime": d.IODateTime, "value": d.Value, "direction": d.Direction});
// }
// }
// //console.log("values.langth >>>" + values.length);
// if(values.length > 0){
// dataTemplate.values = values;
// callback(dataTemplate);
// }
// else callback(null);
// }
\ No newline at end of file
var async = require('async');
var parser = require('xml2json');
var fs = require('fs');
var agriParser = require('./parser/agri_parser_factory');
function perform_function(context,request,response){
var job_id = context.jobconfig.job_id;
var transaction_id = context.transaction.id;
var param = context.jobconfig.data_transform.param;
var memstore = context.task.memstore
var memstore = context.job.memstore
// var memstore = context.task.memstore
var output_type = request.input_type;
var di_data = request.data;
var agriParser = require('./parser/agri_parser_factory');
var async = require('async');
var parser = require('xml2json');
var fs = require('fs');
let idx = 0;
let result = {
"object_type": 'iBitz',
......@@ -21,23 +22,28 @@ function perform_function(context,request,response){
"data":[]
};
//console.log(json_table.length);
async.whilst(function() { return idx < di_data.data.length;}, function(callback) {
let dtype = di_data.data[idx].data_types;
let json = parser.toJson(di_data.data[idx].value, {object: true});
idx++;
if(typeof json.xhr.IO.Data !== 'undefined') {
agriParser.getParser(json.xhr.IO.Type).getValues(json, (values) => {
if(di_data.data[idx].value.length > 0){
let json = parser.toJson(di_data.data[idx].value[0], {object: true});
agriParser.getParser(json.xhr.IO.Type).getValues(di_data.data[idx].value, function(values) {
idx++;
if(values !== null){
result.latitude = json.xhr.IO.Latitude;
result.longitude = json.xhr.IO.Longitude;
result.data.push(values);
console.log(`${di_data.station_id}-${dtype} ::: ` + values.values[values.values.length-1].observeddatetime);
//memstore.setItem(`${di_data.station_id}-${dtype}`, values.values[di_data.data.length-1].observeddatetime);
callback();
//console.log(`STAMP : ${di_data.station_id}-${dtype} = `+ values.values[values.values.length-1].observeddatetime);
memstore.setItem(`${di_data.station_id}-${dtype}`, values.values[values.values.length-1].observeddatetime, (err) =>{
if(err) throw err;
callback();
});
}
else callback();
});
}
}, function(err) {
if( err ) {
......@@ -45,24 +51,10 @@ function perform_function(context,request,response){
} else {
fs.writeFileSync("./result.json", JSON.stringify(result));
//console.log(JSON.stringify(result));
response.success(result,output_type);
response.success(result, output_type);
}
});
// memstore.setItem('lasttransaction',transaction_id,function(err){
// response.success(data);
// });
// memstore.getItem('lasttransaction',function(err,value){
// response.success(value);
// });
//data = data + "--DT--"
//response.reject();
//response.error("error message")
}
module.exports = perform_function;
......@@ -3,7 +3,7 @@
"active" : true,
"trigger" : {
"type": "cron",
"cmd": "*/10 * * * *"
"cmd": "* */10 * * *"
},
"data_in" : {
"type": "agritronics",
......@@ -26,19 +26,19 @@
{"type": "8", "node_id": "4096"},
{"type": "10", "node_id": "4096"},
{"type": "2021", "node_id": "7328"},
{"type": "2022", "node_id": "7328"}
{"type": "2022", "node_id": "7328"}
],
"init_observed_date": "2017-01-20",
"init_observed_time": "00:00:00"
"init_observed_date": "2017-02-05",
"init_observed_time": "11:30:00"
}
},
"data_transform" : {
"type": "agritronics"
},
"data_out" : {
"type": "console",
"type": "storage",
"param" : {
"name": "sds.agritronics"
"storage_name": "sds.agritronics"
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment