Commit d15a1f6e authored by Kamron Aroonrua's avatar Kamron Aroonrua 💬

tanpibut plugin

parent f403b191
var util = require('util');
var DIPlugin = require('../di-plugin');
function DITask(context){
DIPlugin.call(this,context);
this.name = "tanpibut";
this.output_type = "text";
}
util.inherits(DITask,DIPlugin);
DITask.prototype.perform = require('./perform');
module.exports = DITask;
{
"name": "di-tanpibut",
"version": "0.0.1",
"lockfileVersion": 1
}
{
"name": "di-tanpibut",
"version": "0.0.1",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"keywords": [
"bigstream"
],
"author": "lsr.bigstream",
"license": "ISC",
"dependencies": {}
}
var request = require('request');
var moment = require('moment');
var cts = moment();
let result = null;
let loopCount = 0;
var param = null;
var continue_state = null;
function execute_function(context,response){
var job_id = context.jobconfig.job_id;
var transaction_id = context.transaction_id;
param = context.jobconfig.data_in.param;
// var memstore = context.task.memstore;
var memstore = context.job.memstore;
var output_type = "object/tanpibut";
// var data = 'hello world';
cts = moment().hours(0).minutes(0).seconds(0);
//console.log("moment cts: " + cts.format("YYYY-MM-DD,HH:mm:ss"));
result = {
"object_type": 'tanpibut',
"station_id": param.station_id,
"data_type": param.data_type.type,
"data":[]
};
let dtype = param.data_type.type;
let node_id = param.data_type.node_id;
memstore.getItem(`${param.station_id}-${dtype}`, function(err, lts){ //latest timestamp, format: yyyy-MM-dd HH:mm:ss
if (!lts) lts = moment(`${param.init_observed_date} ${param.init_observed_time}`);
else lts = moment(lts).add(1, 'seconds');
if(!param.recover){
let diffdate = cts.diff(lts, 'days') + 1;
// console.log('diff = ' + diffdate);
if(diffdate > param.limit){
lts = moment(cts).add(-(param.limit), 'days');
// console.log(lts.format("YYYY-MM-DD,HH:mm:ss"))
}
}
let url = param.url + `?appkey=${param.appkey}&p=${param.station_id},${node_id},${dtype}`;
getData(url, lts, dtype, (err) => {
if(err) {
console.log(err);
response.error(err);
}
else{
// console.log("result: " + result);
if(param.recover && continue_state){
response.success(result, {"output_type": output_type, "continue": true});
}
else {
response.success(result, {"output_type": output_type});
}
}
});
});
}
function getData(url, lts, dtype, callback) {
loopCount = 0;
continue_state = true;
let req = url + `,${lts.format("YYYY-MM-DD,HH:mm:ss")}`;
console.log(req);
requestData(req).then((data) => {
loopCount++;
if(data.search('result=""') > -1){
result.data.push({"value": data});
}
if(param.recover){
if(loopCount >= param.limit){
result['timestamp'] = lts.hours(23).minutes(55).seconds(0).format("YYYY-MM-DD HH:mm:ss");
callback();
}
else beforeDateCheck(cts, lts);
}
else beforeDateCheck(cts, lts);
function beforeDateCheck(ct, lt){
if (lt.isBefore(ct, 'days')) {
lt.add(1, 'days').hours(0).minutes(0).seconds(0);
req = url + `,${lt.format("YYYY-MM-DD,HH:mm:ss")}`;
console.log(req);
requestData(req).then((val) => {
loopCount++;
if(val.search('result=""') > -1){
result.data.push({"value": val});
}
if(param.recover){
if(loopCount >= param.limit){
result['timestamp'] = lts.hours(23).minutes(55).seconds(0).format("YYYY-MM-DD HH:mm:ss");
callback();
}
else beforeDateCheck(ct, lt);
}
else beforeDateCheck(ct, lt);
}).catch((err) => {
callback(err);
});
}
else {
continue_state = false;
result['timestamp'] = lts.format("YYYY-MM-DD HH:mm:ss");
callback();
}
}
}).catch((err) => {
callback(err);
});
}
function requestData(url) {
return new Promise((resolve, reject) => {
request(url, function (error, resp, body) {
if (!error && resp.statusCode == 200) {
resolve(body);
}else{
return reject(error);
}
})
})
}
module.exports = execute_function;
var util = require('util');
var DTPlugin = require('../dt-plugin');
function DTTask(context,request){
DTPlugin.call(this,context,request);
this.name = "tanpibut";
this.output_type = "";
}
util.inherits(DTTask,DTPlugin);
DTTask.prototype.perform = require('./perform');
module.exports = DTTask;
{
"name": "dt-tanpibut",
"version": "0.0.1",
"lockfileVersion": 1,
"requires": true,
"dependencies": {
"async": {
"version": "2.5.0",
"resolved": "https://registry.npmjs.org/async/-/async-2.5.0.tgz",
"integrity": "sha512-e+lJAJeNWuPCNyxZKOBdaJGyLGHugXVQtrAwtuAe2vhxTYxFTKE73p8JuTmdH0qdQZtDvI4dhJwjZc5zsfIsYw==",
"requires": {
"lodash": "4.17.4"
}
},
"bindings": {
"version": "1.3.0",
"resolved": "https://registry.npmjs.org/bindings/-/bindings-1.3.0.tgz",
"integrity": "sha512-DpLh5EzMR2kzvX1KIlVC0VkC3iZtHKTgdtZ0a3pglBZdaQFjt5S9g9xd1lE+YvXyfd6mtCeRnrUfOLYiTMlNSw=="
},
"hoek": {
"version": "4.2.0",
"resolved": "https://registry.npmjs.org/hoek/-/hoek-4.2.0.tgz",
"integrity": "sha512-v0XCLxICi9nPfYrS9RL8HbYnXi9obYAeLbSP00BmnZwCK9+Ih9WOjoZ8YoHCoav2csqn4FOz4Orldsy2dmDwmQ=="
},
"isemail": {
"version": "2.2.1",
"resolved": "https://registry.npmjs.org/isemail/-/isemail-2.2.1.tgz",
"integrity": "sha1-A1PT2aYpUQgMJiwqoKQrjqjp4qY="
},
"items": {
"version": "2.1.1",
"resolved": "https://registry.npmjs.org/items/-/items-2.1.1.tgz",
"integrity": "sha1-i9FtnIOxlSneWuoyGsqtp4NkoZg="
},
"joi": {
"version": "9.2.0",
"resolved": "https://registry.npmjs.org/joi/-/joi-9.2.0.tgz",
"integrity": "sha1-M4WseQGSEwy+Iw6ALsAskhW7/to=",
"requires": {
"hoek": "4.2.0",
"isemail": "2.2.1",
"items": "2.1.1",
"moment": "2.18.1",
"topo": "2.0.2"
}
},
"lodash": {
"version": "4.17.4",
"resolved": "https://registry.npmjs.org/lodash/-/lodash-4.17.4.tgz",
"integrity": "sha1-eCA6TRwyiuHYbcpkYONptX9AVa4="
},
"moment": {
"version": "2.18.1",
"resolved": "https://registry.npmjs.org/moment/-/moment-2.18.1.tgz",
"integrity": "sha1-w2GT3Tzhwu7SrbfIAtu8d6gbHA8="
},
"nan": {
"version": "2.6.2",
"resolved": "https://registry.npmjs.org/nan/-/nan-2.6.2.tgz",
"integrity": "sha1-5P805slf37WuzAjeZZb0NgWn20U="
},
"node-expat": {
"version": "2.3.16",
"resolved": "https://registry.npmjs.org/node-expat/-/node-expat-2.3.16.tgz",
"integrity": "sha512-e3HyQI0lk5CXyYQ4RsDYGiWdY5LJxNMlNCzo4/gwqY8lhYIeTf5VwGirGDa1EPrcZROmOR37wHuFVnoHmOWnOw==",
"requires": {
"bindings": "1.3.0",
"nan": "2.6.2"
}
},
"topo": {
"version": "2.0.2",
"resolved": "https://registry.npmjs.org/topo/-/topo-2.0.2.tgz",
"integrity": "sha1-zVYVdSU5BXwNwEkaYhw7xvvh0YI=",
"requires": {
"hoek": "4.2.0"
}
},
"xml2json": {
"version": "0.11.0",
"resolved": "https://registry.npmjs.org/xml2json/-/xml2json-0.11.0.tgz",
"integrity": "sha1-HVTx2GjbvQSJK4RdfLrZTFKOFuQ=",
"requires": {
"hoek": "4.2.0",
"joi": "9.2.0",
"node-expat": "2.3.16"
}
}
}
}
{
"name": "dt-tanpibut",
"version": "0.0.1",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"keywords": [
"bigstream"
],
"author": "lsr.bigstream",
"license": "ISC",
"dependencies": {
"async": "^2.1.2",
"xml2json": "^0.11.0"
}
}
var simpleParser = require('./simple_parser');
var windDirParser = require('./wind_direction_parser');
var imageParser = require('./image_parser');
exports.getParser = function(type){
if(type === "Camera") return imageParser;
else if(type === "Wind Direction Degree") return windDirParser;
else return simpleParser;
}
\ No newline at end of file
var request = require('request').defaults({ encoding: null });;
var async = require('async');
var parser = require('xml2json');
exports.getValues = function(dataSet, cb){
let dataTemplate = {
"type": "",
"unit": "",
"value_type": "",
"image_type": "",
"values": []
};
let values = [];
//console.log(dataSet.length);
let json = parser.toJson(dataSet, {object: true}).xhr.IO;
dataTemplate.type = json.Type;
dataTemplate.unit = json.Unit;
dataTemplate.value_type = json.ValueType;
dataTemplate.image_type = json.Name;
if(parseInt(json.Record) > 0){
if(parseInt(json.Record) === 1){
getImage(json.Data.Value).then((data) =>{
dataTemplate.values.push({"observeddatetime": json.Data.IODateTime, "value": data});
cb(dataTemplate);
}).catch((err) => {
cb(err);
})
}
else{
var idx = 0;
async.whilst(function() { return idx < json.Data.length;}, function(callback) {
let d = json.Data[idx];
idx++;
//console.log(d.Value);
getImage(d.Value).then((data) =>{
values.push({"observeddatetime": d.IODateTime, "value": data});
//var bitmap = new Buffer(data, 'base64');
//fs.writeFileSync("./result.jpg", bitmap);
callback();
}).catch((err) => {
callback(err);
});
}, function(err) {
if( err ) {
console.log(err);
cb(err);
}
if(values.length > 0){
dataTemplate.values = values;
cb(dataTemplate);
}
else cb(null);
});
}
}
else cb(null);
}
function getImage(url) {
return new Promise((resolve, reject) => {
request(url, function (error, resp, body) {
if (!error && resp.statusCode == 200) {
resolve("data:" + resp.headers["content-type"] + ";base64," + new Buffer(body).toString('base64'));
}else{
return reject(error);
}
})
})
}
\ No newline at end of file
var parser = require('xml2json');
var idx = 0;
exports.getValues = function(dataSet, callback){
let dataTemplate = {
"type": "",
"unit": "",
"value_type": "",
"latitude": "",
"longitude": "",
"values": []
};
let values = [];
let json = parser.toJson(dataSet, {object: true}).xhr.IO;
dataTemplate.type = json.Type;
dataTemplate.unit = json.Unit;
dataTemplate.value_type = json.ValueType;
dataTemplate.latitude = json.Latitude;
dataTemplate.longitude = json.Longitude;
if(parseInt(json.Record) > 0){
if(parseInt(json.Record) === 1){
values.push({"observeddatetime": json.Data.IODateTime, "value": json.Data.Value});
}
else{
for (var j = 0; j < json.Data.length; j++) {
let d = json.Data[j];
values.push({"observeddatetime": d.IODateTime, "value": d.Value});
}
}
}
if(values.length > 0){
dataTemplate.values = values;
callback(dataTemplate);
}
else callback(null);
}
// exports.getValues = function(json, callback){
// let dataTemplate = {
// "type": json.xhr.IO.Type,
// "unit": json.xhr.IO.Unit,
// "value_type": json.xhr.IO.ValueType,
// "values": []
// };
// let values = [];
// if(typeof json.xhr.IO.Data.length === "undefined"){
// values.push({"observeddatetime": json.xhr.IO.Data.IODateTime, "value": json.xhr.IO.Data.Value});
// }
// else{
// for (var i = 0; i < json.xhr.IO.Data.length; i++) {
// let d = json.xhr.IO.Data[i];
// values.push({"observeddatetime": d.IODateTime, "value": d.Value});
// }
// }
// //console.log("values.langth >>>" + values.length);
// if(values.length > 0){
// dataTemplate.values = values;
// callback(dataTemplate);
// }
// else callback(null);
// }
\ No newline at end of file
var parser = require('xml2json');
var idx = 0;
exports.getValues = function(dataSet, callback){
let dataTemplate = {
"type": "",
"unit": "",
"value_type": "",
"values": []
};
let values = [];
let json = parser.toJson(dataSet, {object: true}).xhr.IO;
dataTemplate.type = json.Type;
dataTemplate.unit = json.Unit;
dataTemplate.value_type = json.ValueType;
if(parseInt(json.Record) > 0){
if(parseInt(json.Record) === 1){
values.push({"observeddatetime": json.Data.IODateTime, "value": json.Data.Value, "direction": json.Data.Direction});
}
else{
for (var j = 0; j < json.Data.length; j++) {
let d = json.Data[j];
values.push({"observeddatetime": d.IODateTime, "value": d.Value, "direction": d.Direction});
}
}
}
if(values.length > 0){
dataTemplate.values = values;
callback(dataTemplate);
}
else callback(null);
// async.whilst(() => {return idx < dataSet.length;}, (cb) =>{
// let json = parser.toJson(dataSet[idx], {object: true});
// dataTemplate.type = json.xhr.IO.Type;
// dataTemplate.unit = json.xhr.IO.Unit;
// dataTemplate.value_type = json.xhr.IO.ValueType;
// idx++;
// if(typeof json.xhr.IO.Data.length === "undefined"){
// values.push({"observeddatetime": json.xhr.IO.Data.IODateTime, "value": json.xhr.IO.Data.Value, "direction": json.xhr.IO.Data.Direction});
// process.nextTick(function(){
// cb();
// })
// }
// else{
// for (var i = 0; i < json.xhr.IO.Data.length; i++) {
// let d = json.xhr.IO.Data[i];
// values.push({"observeddatetime": d.IODateTime, "value": d.Value, "direction": d.Direction});
// }
// process.nextTick(function(){
// cb();
// })
// }
// }, function(err) {
// if( err ) {
// console.log(err);
// } else {
// //console.log("values.langth >>>" + values.length);
// if(values.length > 0){
// dataTemplate.values = values;
// callback(dataTemplate);
// }
// else callback(null);
// }
// });
}
// exports.getValues = function(json, callback){
// let dataTemplate = {
// "type": json.xhr.IO.Type,
// "unit": json.xhr.IO.Unit,
// "value_type": json.xhr.IO.ValueType,
// "values": []
// };
// let values = [];
// if(typeof json.xhr.IO.Data.length === "undefined"){
// values.push({"observeddatetime": json.xhr.IO.Data.IODateTime, "value": json.xhr.IO.Data.Value});
// }
// else{
// for (var i = 0; i < json.xhr.IO.Data.length; i++) {
// let d = json.xhr.IO.Data[i];
// values.push({"observeddatetime": d.IODateTime, "value": d.Value, "direction": d.Direction});
// }
// }
// //console.log("values.langth >>>" + values.length);
// if(values.length > 0){
// dataTemplate.values = values;
// callback(dataTemplate);
// }
// else callback(null);
// }
\ No newline at end of file
var async = require('async');
//var fs = require('fs');
var agriParser = require('./parser/agri_parser_factory');
function perform_function(context,request,response){
var job_id = context.jobconfig.job_id;
var transaction_id = context.transaction.id;
var memstore = context.job.memstore
// var memstore = context.task.memstore
var output_type = "object/sds";
let di_data = request.data;
let dtype = di_data.data_type;
let out = {
"object_type": 'Tanpibut',
"station_id": di_data.station_id,
"data_type": dtype,
"latitude": "",
"longitude": "",
"altitude": "",
"unit": "",
"value_type": "",
"type": "",
"data":[]
};
// for (var k = 0; k < dataKeySeries.length; k++) {
// console.log(di_data.data[dataKeySeries[k]]);
// }
let idx = 0;
async.whilst(function() { return idx < di_data.data.length;}, function(callback) {
agriParser.getParser(context.jobconfig.data_in.param.data_type.name).getValues(di_data.data[idx].value, function(value) {
idx++;
if(value !== null){
out.latitude = value.latitude;
out.longitude = value.longitude;
out.unit = value.unit;
out.value_type = value.value_type;
out.type = value.type;
for(var i=0; i<value.values.length; i++){
out.data.push(value.values[i]);
}
//console.log(`STAMP : ${di_data.station_id}-${dtype} = `+ value.value[value.value.length-1].observeddatetime);
memstore.setItem(`${di_data.station_id}-${dtype}`, value.values[value.values.length-1].observeddatetime, (err) =>{
if(err){
throw err;
callback(err);
}
else callback();
});
}
else callback();
});
}, function(err) {
if( err ) {
console.log(err);
response.error(err);
} else {
//fs.writeFileSync("./result.json", JSON.stringify(out));
if(out.data.length > 0){
// console.log("Result" + JSON.stringify(out));
response.success(out, output_type);
}
else {
if(context.jobconfig.data_in.param.recover){
memstore.setItem(`${di_data.station_id}-${dtype}`, di_data.timestamp, (err) =>{
response.reject();
});
}
else response.reject();
}
}
});
}
module.exports = perform_function;
{
"job_id" : "123",
"active" : true,
"trigger" : {
"type": "cron",
"cmd": "15,45 * * * *"
},
"data_in" : {
"type": "tanpibut",
"profile": {
"station_id": "CM-LS-155",
"latitude": "",
"longitude": ""
},
"param": {
"recover": false,
"url": "http://122.155.1.142/ws/get2.php",
"appkey": "0c5a295bd8c07a0806411b79e1fd73",
"station_id": "CM-LS-150",
"data_type": {"type": "6", "name": "Rain", "node_id": "4096"},
"init_observed_date": "2017-08-05",
"init_observed_time": "00:00:00",
"limit": 5
}
},
"data_transform" : {
"type": "tanpibut"
},
"data_out" : {
"type": "bssfile",
"param": {
"filename" : "F:/testfile/MyBss.bss"
}
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment