Commit a8c89740 authored by Kamron Aroonrua's avatar Kamron Aroonrua 💬

Merge branch 'ondev-gcs' into 'master'

Ondev gcs

See merge request !51
parents dcabe99b f6566ad4
stages:
- build
- push
variables:
IMAGE_NAME: "registry.igridproject.info/bs/node-bigstream"
docker-build:
stage: build
tags:
- igrid
- gitlab
script:
- echo "Building Docker image for $CI_COMMIT_REF_NAME"
- docker build -t $IMAGE_NAME:$CI_COMMIT_REF_NAME .
- if [ "$CI_COMMIT_REF_NAME" = "master" ]; then
docker tag $IMAGE_NAME:$CI_COMMIT_REF_NAME $IMAGE_NAME:latest;
fi
only:
- branches
- tags
docker-push:
stage: push
tags:
- igrid
- gitlab
script:
- echo "Logging into registry $CI_REGISTRY"
- echo "$CI_REGISTRY_PASSWORD" | docker login -u "$CI_REGISTRY_USER" --password-stdin $CI_REGISTRY
- echo "Pushing Docker image $IMAGE_NAME:$CI_COMMIT_REF_NAME"
- docker push $IMAGE_NAME:$CI_COMMIT_REF_NAME
- if [ "$CI_COMMIT_REF_NAME" = "master" ]; then
docker push $IMAGE_NAME:latest;
fi
only:
- branches
- tags
needs:
- job: docker-build
#Changelog
# Changelog
## [1.2.6dev] - 2024-12-01
### Added
- TRIGGER :: http trigger callback
- PLUGIN :: do-http-callback
- PLUGIN :: dt-pgsql
### Update
- PLUGIN :: dt-http add method delete patch
- PLUGIN :: do-http add method delete patch
- PLUGIN :: do-http,dt-http add body_type=form-data
### Removed
- PLUGIN :: di-gistda-air
- PLUGIN :: di-tanpibut
- PLUGIN :: di-agritronics
- PLUGIN :: dt-agritronics
- PLUGIN :: dt-agritronics-tpb
- PLUGIN :: dt-gistda-air
- PLUGIN :: dt-gistda-water
- PLUGIN :: dt-tanpibut
## [1.2.5] - 2021-10-07 -> 2023-05-21R
### Added
- DOCKER :: TZ Data
......
FROM node:12-alpine
FROM node:14-alpine
RUN apk add make gcc g++ linux-headers udev
......@@ -9,7 +9,7 @@ WORKDIR /app/node-bigstream
RUN npm install
RUN node script/install_plugins.js
FROM node:12-alpine
FROM node:14-alpine
RUN apk add --no-cache python3
RUN apk add --no-cache tzdata
......
FROM node:12-alpine
RUN apk add make gcc g++ linux-headers udev
COPY . /app/node-bigstream
WORKDIR /app/node-bigstream
RUN npm install
RUN node script/install_plugins.js
FROM node:12-alpine
RUN apk add --no-cache python3
RUN apk add --no-cache tzdata
COPY --from=0 /app/node-bigstream /app/node-bigstream
RUN npm install -y pm2@latest -g
RUN mkdir -p /var/bigstream/data
EXPOSE 19980 19080 19180
# start server
WORKDIR /app/node-bigstream
CMD pm2-runtime pm2.config.js
\ No newline at end of file
File mode changed from 100755 to 100644
......@@ -11,6 +11,8 @@ var EvenPub = ctx.getLib('lib/amqp/event-pub');
var QueueCaller = ctx.getLib('lib/amqp/queuecaller');
var EvenSub = ctx.getLib('lib/amqp/event-sub');
const EventEmitter = require('events')
const JOBCHANEL = 'bs_job_cmd';
const API_PORT = 19180;
......@@ -26,7 +28,8 @@ function HTTPListener(cfg)
this.httpacl = HttpACL.create({'conn':this.config.memstore.url});
this.jobcaller = new QueueCaller({'url':this.config.amqp.url,'name':'bs_jobs_cmd'});
this.evs = new EvenSub({'url':this.config.amqp.url,'name':'bs_trigger_cmd'});
//this.evp = new EvenPub({'url':this.config.amqp.url,'name':JOBCHANEL});
this.msgrecv = new EvenSub({'url':this.config.amqp.url,'name':'bs_msg_bus'});
this.httpcb = new EventEmitter()
}
HTTPListener.prototype.start = function()
......@@ -40,6 +43,11 @@ HTTPListener.prototype._http_start = function()
{
var self = this;
self.msgrecv.sub('msg.httpcb.#' ,function(err,msg){
var ssid = msg.topic.split('.')[2]
self.httpcb.emit(ssid,msg)
})
this.httpacl.update(function(err){
if(!err){
console.log('WWW:ACL Update\t\t[OK]');
......@@ -61,13 +69,13 @@ HTTPListener.prototype._http_start = function()
var context = require('./lib/http-context');
app.use(context.middleware({
'httpacl' : self.httpacl,
'jobcaller' : self.jobcaller
'jobcaller' : self.jobcaller,
'httpcb' : self.httpcb
}));
app.use(require('./ws'));
app.listen(API_PORT, function () {
console.log('WWW:HTTP START\t\t[OK]');
});
......
var ctx = require('../../context');
const uuid = require('uuid');
var async = require('async');
var express = require('express');
var router = express.Router();
......@@ -16,14 +17,20 @@ var process_req = function(req, res ,method) {
var appkey = req.params.akey;
var ctx = req.context;
var session_id = uuid.v4()
var httpacl = req.context.httpacl;
//var evp = req.context.evp;
var jobcaller = req.context.jobcaller;
var httpcb = req.context.httpcb;
var j = httpacl.findJob(appkey,method);
var jmatch = (j.length>0);
var topic_prex = 'cmd.execute.';
var resp_msg = {'status':'OK'}
var cb_timeout = 10000
var cb_response = false
j.forEach(function(item){
var httpdata = {
......@@ -45,6 +52,7 @@ var process_req = function(req, res ,method) {
'source' : 'http_listener',
'jobId' : '',
'option' : {},
'input_meta' : {'_sid':session_id},
'input_data' : {
'type' : 'bsdata',
'value' : {
......@@ -55,22 +63,43 @@ var process_req = function(req, res ,method) {
}
}
var topic = topic_prex + item.jobid;
//HTTP OPTION
var iopt = item.opt||{}
if(iopt.session){ resp_msg.session=session_id }
if(Number(iopt.timeout)>0){cb_timeout=Number(iopt.timeout)}
if(iopt.response){
cb_response=iopt.response
resp_msg.session=session_id
}
req.setTimeout(cb_timeout);
var msg = job_execute_msg;
msg.jobId = item.jobid;
jobcaller.send(msg);
//evp.send(topic,msg);
});
if(j.length > 0)
if(jmatch)
{
respHelper.responseOK({'status':'OK'});
if(cb_response){
httpcb.once(session_id,function(msg){
resp_msg.response=msg.data.data;
if(['data','data_only','json'].includes(String(cb_response).toLocaleLowerCase())){
resp_msg=msg.data.data
}
respHelper.responseOK(resp_msg);
})
}else{
respHelper.responseOK(resp_msg);
}
}else{
respHelper.response403();
}
}
router.get('/:akey',function(req, res){process_req(req,res,'get')});
router.post('/:akey',function(req, res){process_req(req,res,'post')});
......
......@@ -18,6 +18,7 @@ function JobTask (prm)
this.handle = prm.handle;
this.mem = prm.handle.mem;
this.jobcaller = prm.handle.jobcaller;
this.msgsender = prm.handle.msgsender;
this.storagecaller = prm.handle.storagecaller;
this.acl_validator = prm.handle.acl_validator
......@@ -300,10 +301,12 @@ function perform_do(prm,cb)
var DOTask = getPlugins('do',do_cfg.type);
var doMem = new memstore({'job_id':job_id,'cat':'do','mem':prm.handle.mem});
var jobcaller = prm.handle.jobcaller;
var msgsender = prm.handle.msgsender;
var storagecaller = prm.handle.storagecaller;
do_context.task = {
"memstore" : doMem,
"jobcaller" : jobcaller,
"msgsender" : msgsender,
"storagecaller" : storagecaller
}
......
......@@ -3,9 +3,10 @@ var cfg = ctx.config;
var QueueCaller = ctx.getLib('lib/amqp/queuecaller');
var QueueReceiver = ctx.getLib('lib/amqp/queuereceiver');
var EvenPub = ctx.getLib('lib/amqp/event-pub');
var ConnCtx = ctx.getLib('lib/conn/connection-context');
var JobRegistry = ctx.getLib('lib/mems/job-registry');
var SSCaller = ctx.getLib('lib/axon/rpccaller');
//var SSCaller = ctx.getLib('lib/axon/rpccaller');
var RPCCaller = ctx.getLib('lib/amqp/rpccaller');
var ACLValidator = ctx.getLib('lib/auth/acl-validator');
......@@ -30,6 +31,7 @@ var JW = function JobWorker (prm)
this.mem = this.conn.getMemstore();
this.jobcaller = new QueueCaller({'url':this.conn.getAmqpUrl(),'name':'bs_jobs_cmd'});
this.msgsender = new EvenPub({'url':this.conn.getAmqpUrl(),'name':'bs_msg_bus'});
this.job_registry = JobRegistry.create({'redis':this.mem});
this.acl_validator = ACLValidator.create(this.auth_cfg);
......
......@@ -69,7 +69,8 @@ HttpACL.prototype.update = function(cb)
{
var _vo = trigger.vo || '';
var akey = (_vo=='$' || _vo=='')?trigger.appkey:_vo + '.' + trigger.appkey;
var acl = mkACL(akey,trigger.method,trigger.job_id);
var vopt = trigger.opt || trigger.param || {}
var acl = mkACL(akey,trigger.method,trigger.job_id,vopt);
self.add(acl);
}
}
......@@ -79,16 +80,6 @@ HttpACL.prototype.update = function(cb)
});
}
// HttpACL.prototype.commit = function(cb)
// {
// var stracl = JSON.stringify(this.acl);
// this.mem.set(PREFIX,stracl);
//
// if(typeof cb == 'function'){
// cb();
// }
// }
HttpACL.prototype.findJob= function(appkey,method)
{
var jobs = [];
......
{
"name": "node-bigstream",
"description": "",
"version": "1.2.5",
"version": "1.2.6",
"main": "./bigstream.js",
"author": {
"name": "Kamron Aroonrua",
......@@ -36,8 +36,10 @@
"random-access-file": "^1.3.0",
"redis": "^3.1.2",
"request": "^2.79.0",
"socket.io": "^4.6.1",
"thunky": "^1.0.2",
"tiny-worker": "^2.1.1"
"tiny-worker": "^2.1.1",
"uuid": "^9.0.0"
},
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1",
......
var request = require('request');
var async = require('async');
function execute_function(context,response){
var job_id = context.jobconfig.job_id;
var transaction_id = context.transaction_id;
var param = context.jobconfig.data_in.param;
var memstore = context.task.memstore;
var output_type = 'object/agritronics';
var data = 'hello world';
let result = {
"object_type": 'agritronic',
"station_id": param.station_id,
"data":[]
};
let idx = 0;
//console.log(json_table.length);
async.whilst(function() { return idx < param.data_types.length;}, function(callback) {
let dtype = param.data_types[idx].type;
let node_id = param.data_types[idx].node_id;
let ts = memstore.getItem(`${param.station_id}-${dtype}`);
if(typeof(ts) === 'undefined') ts = `${param.init_observed_date},${param.init_observed_time}`;
let url = param.url + `?appkey=${param.appkey}&p=${param.station_id},${node_id},${dtype},${ts}`;
idx++;
getData(url).then((data) => {
if(data.search("denied") === -1){
result.data.push({
"data_types": dtype,
"value" : data
});
callback();
}
}).catch((err) => {
callback(err);
});
}, function(err) {
if( err ) {
response.error(err);
} else {
//console.log(JSON.stringify(result));
response.success(result, output_type);
}
});
}
function getData(url) {
return new Promise((resolve, reject) => {
request(url, function (error, resp, body) {
if (!error && resp.statusCode == 200) {
resolve(body);
}else{
return reject(error);
}
})
})
}
module.exports = execute_function;
var util = require('util');
var DIPlugin = require('../di-plugin');
function DITask(context){
DIPlugin.call(this,context);
this.name = "agritronics";
this.output_type = "text";
}
util.inherits(DITask,DIPlugin);
DITask.prototype.perform = require('./perform');
module.exports = DITask;
{
"name": "di-agritronics-ibitz",
"version": "0.0.1",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"keywords": [
"bigstream"
],
"author": "lsr.bigstream",
"license": "ISC",
"dependencies": {
"async": "^2.1.2",
"xml2json": "^0.10.0"
}
}
var request = require('request');
var async = require('async');
var moment = require('moment');
var cts = moment();
let result = null;
function execute_function(context,response){
var job_id = context.jobconfig.job_id;
var transaction_id = context.transaction_id;
var param = context.jobconfig.data_in.param;
// var memstore = context.task.memstore;
var memstore = context.job.memstore;
var output_type = "object/agritronics";
var data = 'hello world';
cts = moment().hours(0).minutes(0).seconds(0);
//console.log("moment cts: " + cts.format("YYYY-MM-DD,HH:mm:ss"));
result = {
"object_type": 'agritronic',
"station_id": param.station_id,
"data":{}
};
let idx = 0;
//console.log(json_table.length);
async.whilst(function() { return idx < param.data_types.length;}, function(callback) {
let dtype = param.data_types[idx].type;
let node_id = param.data_types[idx].node_id;
memstore.getItem(`${param.station_id}-${dtype}`, function(err, lts){ //latest timestamp, format: yyyy-MM-dd HH:mm:ss
idx++;
//console.log(`memstore: ${param.station_id}-${dtype} = ${lts}`);
if (!lts) lts = moment(`${param.init_observed_date} ${param.init_observed_time}`);
else lts = moment(lts).add(1, 'seconds');
//console.log(`memstore: ${param.station_id}-${dtype} = ${lts}`);
//let recvTime = cts.diff(lts, "days");
//if(recvTime > 20) lts = new moment().add(-20, 'day').hours(0).minutes(0).seconds(0);
//console.log(cts.format("YYYY-MM-DD,HH:mm:ss") + " <<>> " + lts.format("YYYY-MM-DD,HH:mm:ss"));
let url = param.url + `?appkey=${param.appkey}&p=${param.station_id},${node_id},${dtype}`;
getData(url, lts, dtype, (err) => {
if(err) {
callback(err);
}
else{
callback();
}
});
});
}, function(err) {
if( err ) {
response.error(err);
//response.reject();
} else {
//console.log(JSON.stringify(result));
response.success(result, output_type);
}
});
}
function getData(url, lts, dtype, callback) {
let req = url + `,${lts.format("YYYY-MM-DD,HH:mm:ss")}`;
console.log(req);
requestData(req).then((data) => {
if(data.search("denied") === -1 && data.search("invalid") === -1 && data.search("no data") === -1){
if(typeof result.data[lts.format("YYYYMMDD")] === "undefined") result.data[lts.format("YYYYMMDD")] = [];
result.data[lts.format("YYYYMMDD")].push({
"data_types": dtype,
"value" : data
});
beforeDateCheck(cts, lts);
}
else callback();
function beforeDateCheck(ct, lt){
if (lt.isBefore(ct, 'days')) {
lt.add(1, 'days').hours(0).minutes(0).seconds(0);
req = url + `,${lt.format("YYYY-MM-DD,HH:mm:ss")}`;
console.log(req);
requestData(req).then((val) => {
if(val.search("denied") === -1 && val.search("invalid") === -1 && val.search("no data") === -1){
if(typeof result.data[lt.format("YYYYMMDD")] === "undefined") result.data[lt.format("YYYYMMDD")] = [];
result.data[lt.format("YYYYMMDD")].push({
"data_types": dtype,
"value" : val
});
beforeDateCheck(ct, lt);
}
else callback();
}).catch((err) => {
callback(err);
});
}
else callback();
}
}).catch((err) => {
callback(err);
});
}
function requestData(url) {
return new Promise((resolve, reject) => {
request(url, function (error, resp, body) {
if (!error && resp.statusCode == 200) {
resolve(body);
}else{
return reject(error);
}
})
})
}
module.exports = execute_function;
var util = require('util');
var DIPlugin = require('../di-plugin');
function DITask(context){
DIPlugin.call(this,context);
this.name = "gistda-air";
this.output_type = "jsonobject";
}
util.inherits(DITask,DIPlugin);
DITask.prototype.perform = require('./perform');
module.exports = DITask;
var Client = require('ftp');
var async = require('async');
var fs = require('fs');
var host ="203.150.19.51";
var port = "21";
var user = "bs";
var pwd = "UF13kczHdCPXpBb";
var main_folder = "GISTDA_SOS_DATA"
var init_observed_date = "2017-11-21";
var init_observed_time = "09:00:00";
var di_plugin = "sftp-filesync";
var data_source = "gistda-air"
var job_path = "/Users/apple/Project@Nectec/i-bitz/jobs/gistda_air_job";
var stationTable = require('hashTable');
var config = {
host: host,
port: port,
user: user,
password: pwd
};
var c = new Client();
var stationTable = new stationTable();
fs.readFile('/Users/apple/Project@Nectec/i-bitz/data_sample/FromKPrasong/Gistda_Air_Station_Profile.json', function(err, data) {
if (err) throw err;
var profile = JSON.parse(data);
var features = profile.features;
for (var i=0; i<features.length; i++) {
var properties = features[i].properties;
stationTable.put(properties.ftp_folder_mapping, {latitude: properties.lat, longitude: properties.long});
}
// var location = stationTable.get('STATION1_KORAT');
});
c.on('ready', function() {
c.list(main_folder, function(err, list) {
if (err) throw err;
async.eachSeries(
list,
function(element, callback) {
// create job profile for .dat
create_job_profile(element.name, element.name, element.name, false);
// prepare the image path
var station_no = element.name;
station_no = station_no.replace("STATION","");
var n = station_no.search(/_/i);
station_no = station_no.substr(0, n);
var image_path_top = element.name + "/PIC" + station_no + "/TOP" + station_no;
var filename_top = element.name + "-PIC" + station_no + "-TOP" + station_no;
create_job_profile(element.name, image_path_top, filename_top, true);
var image_path_bottom = element.name + "/PIC" + station_no + "/BOTTOM" + station_no;
var filename_bottom = element.name + "-PIC" + station_no + "-BOTTOM" + station_no;
create_job_profile(element.name, image_path_bottom, filename_bottom, true);
console.log(element.name + ", " + image_path_top + " vs " + image_path_bottom);
callback();
}
);
});
});
c.connect(config);
var trigger = {};
trigger["type"] = "cron";
trigger["cmd"] = "0 * * * *";
var param = {};
param["source"] = data_source,
param["url"] = host,
param["port"] = port;
param["user"] = user;
param["password"] = pwd;
param["init_observed_date"] = init_observed_date;
param["init_observed_time"] = init_observed_time;
function create_job_profile(station_id, path, filename, is_image) {
var job = {};
var job_id = "sds." + data_source + "-" + filename;
job["job_id"] = job_id;
job["active"] = true;
job["trigger"] = trigger;
var location = stationTable.get(station_id);
var data_in = {};
data_in["type"] = data_source;
var profile = {};
profile["station_id"] = station_id;
profile["latitude"] = location.latitude;
profile["longitude"] = location.longitude;
data_in["profile"] = profile;
param["path"] = main_folder + "/" + path;
data_in["param"] = param;
if (!is_image) {
var data_transform = {};
data_transform["type"] = data_source;
} else {
var data_transform = [];
var transfrom1 = {};
transfrom1["type"] = data_source;
var script = {};
script["script"] = "data=Array.isArray(src.data)?src.data.pop():src.dat";
var transfrom2 = {};
transfrom2["type"] = "transform";
transfrom2["param"] = script;
data_transform.push(transfrom1);
data_transform.push(transfrom2);
}
var data_out = {};
data_out["type"] = "storage";
// data_out["type"] = "dir";
var data_out_param = {};
data_out_param["storage_name"] = "sds.gistda-air";
// data_out_param["path"] = "/Users/Naiyana/testdata";
data_out["param"] = data_out_param;
job["data_in"] = data_in;
job["data_transform"] = data_transform;
job["data_out"] = data_out;
fs.writeFile(job_path + "/" + job_id + ".json", JSON.stringify(job), function(err) {
if(err) {
return console.log(err);
}
console.log("The file was saved!");
});
}
{
"name": "di-gistda-air-ibitz",
"version": "0.0.1",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"keywords": [
"bigstream"
],
"author": "lsr.bigstream",
"license": "ISC",
"dependencies": {
"async": "2.1.4",
"path": "0.12.7",
"fs": "0.0.1-security",
"dateformat":"2.0.0",
"ftp":"0.3.10",
"moment":"2.18.1",
"hashtable":"2.0.2"
}
}
var path = require('path');
var fs = require('fs');
var async = require('async');
var dateFormat = require('dateformat');
var Client = require('ftp');
function execute_function(context,response){
var job_id = context.jobconfig.job_id;
var transaction_id = context.transaction.id;
var profile = context.jobconfig.data_in.profile;
var param = context.jobconfig.data_in.param;
var memstore = context.task.memstore
var output_type = 'object/gistda-air'
var config = {
host: param.url,
port: param.port,
user: param.user,
password: param.password
};
let result = {
"object_type": param.source,
"data":[]
};
let maxdate;
var c = new Client();
var key = param.path + '-lasttransaction';
c.on('ready', function() {
c.list(param.path, function(err, list) {
if (err) throw err;
memstore.getItem(key,function(err,value) {
if (err) throw err;
var latestDate;
if (!value) {
var latestDateStr = param.init_observed_date + ' ' + param.init_observed_time; //'2016-12-20T10:00:00+04:00';
latestDate = new Date(latestDateStr);
} else {
latestDate = new Date(value);
}
async.eachSeries(
list,
function(element, callback) {
if (typeof element !== 'undefined') {
if (element.type !== 'd') { // filter out directories
var filename = element.name;
var filedate = element.date;
var filetype = element.type;
if ((path.extname(filename) === '.dat' &&
(filename.indexOf("Every_5m") > 0 || (filename.indexOf("MS700") > 0 && filename.indexOf("debug") == -1)))
|| path.extname(filename) === '.jpg') {
var type = 'text';
if (path.extname(filename) === '.jpg')
type = 'image';
if (filedate - latestDate > 0) { // filter out old files
c.get(param.path+"/"+filename, function (err, stream) {
if (err) throw err;
// for text only, not for binary data
// var data = '';
// stream.setEncoding('utf8');
// console.log("downloading .... : " + filename + ", " + dateFormat(filedate, "isoDateTime"));
// stream.on('data', function(chunk) { // donwload each individual chunk as per a downloading file
// if (chunk != '')
// data = data + chunk;
// });
var buf_data = Buffer.from('');
console.log("downloading .... : " + filename + ", " + dateFormat(filedate, "isoDateTime"));
var nb;
stream.on('data', function(chunk) { // donwload each individual chunk as per a downloading file
if (chunk != '') {
var buf_chunk = Buffer.from(chunk);
buf_data = Buffer.concat([buf_data, buf_chunk]);
}
});
stream.on('end', function () { // insert a data file
result.data.push({
"filename": filename,
"station_id": profile.station_id,
"latitude": profile.latitude,
"longitude": profile.longitude,
"type": type,
"observeddatetime": dateFormat(filedate, 'yyyy-mm-dd HH:MM:ss'),
"value" : buf_data // data if text download only
});
console.log(buf_data);
if (typeof maxdate == 'undefined') {
maxdate = filedate;
} else {
if (filedate - maxdate > 0) {
maxdate = filedate;
}
}
memstore.setItem(key,dateFormat(maxdate, 'yyyy-mm-dd HH:MM:ss'),function(err){
if (err) throw err;
callback();
});
});
// stream.pipe(fs.createWriteStream(filename));
});
} else {
async.setImmediate(callback);
//callback(null);
}
} else {
async.setImmediate(callback);
//callback(null);
}
} else
async.setImmediate(callback);
//callback(null);
} else
async.setImmediate(callback);
//callback(null);
},
function(err) {
if (err) {
response.error(err);
} else {
if (result.data.length == 0)
response.reject(); // for no data
else
response.success(result, output_type);
c.end();
}
}
); // async close
}); // memstore close
});
});
c.connect(config);
//response.reject();
}
module.exports = execute_function;
var util = require('util');
var DIPlugin = require('../di-plugin');
function DITask(context){
DIPlugin.call(this,context);
this.name = "tanpibut";
this.output_type = "text";
}
util.inherits(DITask,DIPlugin);
DITask.prototype.perform = require('./perform');
module.exports = DITask;
{
"name": "di-tanpibut",
"version": "0.0.1",
"lockfileVersion": 1
}
var request = require('request');
var moment = require('moment');
var cts = moment();
let result = null;
let loopCount = 0;
var param = null;
var continue_state = null;
function execute_function(context,response){
var job_id = context.jobconfig.job_id;
var transaction_id = context.transaction_id;
param = context.jobconfig.data_in.param;
// var memstore = context.task.memstore;
var memstore = context.job.memstore;
var output_type = "object/tanpibut";
// var data = 'hello world';
cts = moment().hours(0).minutes(0).seconds(0);
//console.log("moment cts: " + cts.format("YYYY-MM-DD,HH:mm:ss"));
result = {
"object_type": 'tanpibut',
"station_id": param.station_id,
"data_type": param.data_type.type,
"data":[]
};
let dtype = param.data_type.type;
let node_id = param.data_type.node_id;
memstore.getItem(`${param.station_id}-${dtype}`, function(err, lts){ //latest timestamp, format: yyyy-MM-dd HH:mm:ss
if (!lts) lts = moment(`${param.init_observed_date} ${param.init_observed_time}`);
else lts = moment(lts).add(1, 'seconds');
if(!param.recover){
let diffdate = cts.diff(lts, 'days') + 1;
// console.log('diff = ' + diffdate);
if(diffdate > param.limit){
lts = moment(cts).add(-(param.limit), 'days');
// console.log(lts.format("YYYY-MM-DD,HH:mm:ss"))
}
}
let url = param.url + `?appkey=${param.appkey}&p=${param.station_id},${node_id},${dtype}`;
getData(url, lts, dtype, (err) => {
if(err) {
console.log(err);
response.error(err);
}
else{
// console.log("result: " + result);
if(param.recover && continue_state){
response.success(result, {"output_type": output_type, "continue": true});
}
else {
response.success(result, {"output_type": output_type});
}
}
});
});
}
function getData(url, lts, dtype, callback) {
loopCount = 0;
continue_state = true;
let req = url + `,${lts.format("YYYY-MM-DD,HH:mm:ss")}`;
console.log(req);
requestData(req).then((data) => {
loopCount++;
if(data.search('result=""') > -1){
result.data.push({"value": data});
}
if(param.recover){
if(loopCount >= param.limit){
result['timestamp'] = lts.hours(23).minutes(55).seconds(0).format("YYYY-MM-DD HH:mm:ss");
callback();
}
else beforeDateCheck(cts, lts);
}
else beforeDateCheck(cts, lts);
function beforeDateCheck(ct, lt){
if (lt.isBefore(ct, 'days')) {
lt.add(1, 'days').hours(0).minutes(0).seconds(0);
req = url + `,${lt.format("YYYY-MM-DD,HH:mm:ss")}`;
console.log(req);
requestData(req).then((val) => {
loopCount++;
if(val.search('result=""') > -1){
result.data.push({"value": val});
}
if(param.recover){
if(loopCount >= param.limit){
result['timestamp'] = lts.hours(23).minutes(55).seconds(0).format("YYYY-MM-DD HH:mm:ss");
callback();
}
else beforeDateCheck(ct, lt);
}
else beforeDateCheck(ct, lt);
}).catch((err) => {
callback(err);
});
}
else {
continue_state = false;
result['timestamp'] = lts.format("YYYY-MM-DD HH:mm:ss");
callback();
}
}
}).catch((err) => {
callback(err);
});
}
function requestData(url) {
return new Promise((resolve, reject) => {
request(url, function (error, resp, body) {
if (!error && resp.statusCode == 200) {
resolve(body);
}else{
return reject(error);
}
})
})
}
module.exports = execute_function;
var util = require('util');
var DOPlugin = require('../do-plugin');
function DOTask(context,request){
DOPlugin.call(this,context,request);
this.name = "http-callback";
}
util.inherits(DOTask,DOPlugin);
DOTask.prototype.perform = require('./perform');
module.exports = DOTask;
var ctx = require('../../../context');
var Utils = ctx.getLib('lib/util/plugin-utils');
function perform_function(context,request,response){
var job_id = context.jobconfig.job_id;
var job_vo = context.jobconfig._vo || '';
var transaction_id = context.transaction.id;
var param = context.jobconfig.data_out.param;
var memstore = context.task.memstore;
var msgsender = context.task.msgsender;
var in_type = request.type;
var data = request.data;
var meta = request.meta;
var prm_session = param.session || meta._sid;
var ev = {
'type' : in_type,
'meta' : meta,
'data' : data
}
prm_session = Utils.vm_execute_text(ev,prm_session)
var topic = "msg.httpcb." + prm_session
var msg = {
'err':null,
'meta':meta,
'data':data
}
msgsender.send(topic,msg)
response.success();
//response.reject();
//response.error("error message")
}
module.exports = perform_function;
......@@ -47,13 +47,20 @@ function send_request(prm,cb)
{ 'cache-control': 'no-cache' }
};
if(prm.method.toLowerCase()=='post' || prm.method.toLowerCase()=='put')
if(['post','put','delete','patch'].includes(prm.method.toLowerCase()))
{
options.method = prm.method.toUpperCase();
if(prm.body_type=='json' && typeof prm.body == 'object'){
options.headers['content-type'] = 'application/json';
options.json = prm.body;
}else if(prm.body_type=='form-data' && typeof prm.body == 'object'){
options.formData={}
Object.keys(prm.body).forEach((el)=>{
if(typeof prm.body[el]=='string'){
options.formData[el]=prm.body[el]
}
})
}else if(prm.body_type=='text' || typeof prm.body == 'string'){
options.headers['content-type'] = 'text/plain';
options.body = prm.body;
......
var util = require('util');
var util = require('util');
var DTPlugin = require('../dt-plugin');
function DTTask(context,request){
DTPlugin.call(this,context,request);
this.name = "agritronics-tpb";
this.output_type = "";
}
util.inherits(DTTask,DTPlugin);
DTTask.prototype.perform = require('./perform');
module.exports = DTTask;
var async = require('async');
function perform_function(context,request,response){
var job_id = context.jobconfig.job_id;
var transaction_id = context.transaction.id;
var param = context.jobconfig.data_transform.param;
var memstore = context.task.memstore
var output_type = "object/tpb"
var data = request.data;
let result = {
"object_type": "tpb",
"station_id": data.station_id,
"latitude": data.latitude,
"longiude": data.logitude,
"altitude": data.altitude,
};
var data_type = 'Rain';
var dataset = data.data;
var counter = 0;
async.each(
dataset,
function(adata, callback) {
if (adata.type == data_type) {
result.type = adata.type;
result.unit = adata.unit;
result.value_type = adata.value_type;
result.values = adata.values;
console.log(result);
counter = 1;
}
callback();
},
function(err) {
if (err) {
response.error(err);
} else if (counter == 0) {
response.reject() ;
}
}
);
response.success(result,output_type);
//response.reject();
//response.error("error message")
}
module.exports = perform_function;
var util = require('util');
var DTPlugin = require('../dt-plugin');
function DTTask(context,request){
DTPlugin.call(this,context,request);
this.name = "agritronics";
this.output_type = "";
}
util.inherits(DTTask,DTPlugin);
DTTask.prototype.perform = require('./perform');
module.exports = DTTask;
{
"name": "dt-agritronics-ibitz",
"version": "0.0.1",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"keywords": [
"bigstream"
],
"author": "lsr.bigstream",
"license": "ISC",
"dependencies": {
"async": "^2.1.2",
"xml2json": "^0.10.0"
}
}
var simpleParser = require('./simple_parser');
var windDirParser = require('./wind_direction_parser');
var imageParser = require('./image_parser');
exports.getParser = function(type){
if(type === "Camera") return imageParser;
else if(type === "Wind Direction Degree") return windDirParser;
else return simpleParser;
}
\ No newline at end of file
var request = require('request').defaults({ encoding: null });;
var async = require('async');
var parser = require('xml2json');
exports.getValues = function(dataSet, cb){
let dataTemplate = {
"type": "",
"unit": "",
"value_type": "",
"image_type": "",
"values": []
};
let values = [];
//console.log(dataSet.length);
let json = parser.toJson(dataSet, {object: true});
dataTemplate.type = json.xhr.IO.Type;
dataTemplate.unit = json.xhr.IO.Unit;
dataTemplate.value_type = json.xhr.IO.ValueType;
dataTemplate.image_type = json.xhr.IO.Name;
if(parseInt(json.xhr.IO.Record) > 0){
if(parseInt(json.xhr.IO.Record) === 1){
getImage(json.xhr.IO.Data.Value).then((data) =>{
dataTemplate.values.push({"observeddatetime": json.xhr.IO.Data.IODateTime, "value": data});
cb(dataTemplate);
}).catch((err) => {
cb(err);
})
}
else{
var idx = 0;
async.whilst(function() { return idx < json.xhr.IO.Data.length;}, function(callback) {
let d = json.xhr.IO.Data[idx];
idx++;
//console.log(d.Value);
getImage(d.Value).then((data) =>{
values.push({"observeddatetime": d.IODateTime, "value": data});
//var bitmap = new Buffer(data, 'base64');
//fs.writeFileSync("./result.jpg", bitmap);
callback();
}).catch((err) => {
callback(err);
});
}, function(err) {
if( err ) {
console.log(err);
cb(err);
}
if(values.length > 0){
dataTemplate.values = values;
cb(dataTemplate);
}
else cb(null);
});
}
}
else cb(null);
}
function getImage(url) {
return new Promise((resolve, reject) => {
request(url, function (error, resp, body) {
if (!error && resp.statusCode == 200) {
resolve("data:" + resp.headers["content-type"] + ";base64," + Buffer.from(body).toString('base64'));
}else{
return reject(error);
}
})
})
}
\ No newline at end of file
var parser = require('xml2json');
var idx = 0;
exports.getValues = function(dataSet, callback){
let dataTemplate = {
"type": "",
"unit": "",
"value_type": "",
"values": []
};
let values = [];
let json = parser.toJson(dataSet, {object: true});
dataTemplate.type = json.xhr.IO.Type;
dataTemplate.unit = json.xhr.IO.Unit;
dataTemplate.value_type = json.xhr.IO.ValueType;
if(parseInt(json.xhr.IO.Record) > 0){
if(parseInt(json.xhr.IO.Record) === 1){
values.push({"observeddatetime": json.xhr.IO.Data.IODateTime, "value": json.xhr.IO.Data.Value});
}
else{
for (var j = 0; j < json.xhr.IO.Data.length; j++) {
let d = json.xhr.IO.Data[j];
values.push({"observeddatetime": d.IODateTime, "value": d.Value});
}
}
}
if(values.length > 0){
dataTemplate.values = values;
callback(dataTemplate);
}
else callback(null);
}
// exports.getValues = function(json, callback){
// let dataTemplate = {
// "type": json.xhr.IO.Type,
// "unit": json.xhr.IO.Unit,
// "value_type": json.xhr.IO.ValueType,
// "values": []
// };
// let values = [];
// if(typeof json.xhr.IO.Data.length === "undefined"){
// values.push({"observeddatetime": json.xhr.IO.Data.IODateTime, "value": json.xhr.IO.Data.Value});
// }
// else{
// for (var i = 0; i < json.xhr.IO.Data.length; i++) {
// let d = json.xhr.IO.Data[i];
// values.push({"observeddatetime": d.IODateTime, "value": d.Value});
// }
// }
// //console.log("values.langth >>>" + values.length);
// if(values.length > 0){
// dataTemplate.values = values;
// callback(dataTemplate);
// }
// else callback(null);
// }
\ No newline at end of file
var parser = require('xml2json');
var idx = 0;
exports.getValues = function(dataSet, callback){
let dataTemplate = {
"type": "",
"unit": "",
"value_type": "",
"values": []
};
let values = [];
let json = parser.toJson(dataSet, {object: true});
dataTemplate.type = json.xhr.IO.Type;
dataTemplate.unit = json.xhr.IO.Unit;
dataTemplate.value_type = json.xhr.IO.ValueType;
if(parseInt(json.xhr.IO.Record) > 0){
if(parseInt(json.xhr.IO.Record) === 1){
values.push({"observeddatetime": json.xhr.IO.Data.IODateTime, "value": json.xhr.IO.Data.Value, "direction": json.xhr.IO.Data.Direction});
}
else{
for (var j = 0; j < json.xhr.IO.Data.length; j++) {
let d = json.xhr.IO.Data[j];
values.push({"observeddatetime": d.IODateTime, "value": d.Value, "direction": d.Direction});
}
}
}
if(values.length > 0){
dataTemplate.values = values;
callback(dataTemplate);
}
else callback(null);
// async.whilst(() => {return idx < dataSet.length;}, (cb) =>{
// let json = parser.toJson(dataSet[idx], {object: true});
// dataTemplate.type = json.xhr.IO.Type;
// dataTemplate.unit = json.xhr.IO.Unit;
// dataTemplate.value_type = json.xhr.IO.ValueType;
// idx++;
// if(typeof json.xhr.IO.Data.length === "undefined"){
// values.push({"observeddatetime": json.xhr.IO.Data.IODateTime, "value": json.xhr.IO.Data.Value, "direction": json.xhr.IO.Data.Direction});
// process.nextTick(function(){
// cb();
// })
// }
// else{
// for (var i = 0; i < json.xhr.IO.Data.length; i++) {
// let d = json.xhr.IO.Data[i];
// values.push({"observeddatetime": d.IODateTime, "value": d.Value, "direction": d.Direction});
// }
// process.nextTick(function(){
// cb();
// })
// }
// }, function(err) {
// if( err ) {
// console.log(err);
// } else {
// //console.log("values.langth >>>" + values.length);
// if(values.length > 0){
// dataTemplate.values = values;
// callback(dataTemplate);
// }
// else callback(null);
// }
// });
}
// exports.getValues = function(json, callback){
// let dataTemplate = {
// "type": json.xhr.IO.Type,
// "unit": json.xhr.IO.Unit,
// "value_type": json.xhr.IO.ValueType,
// "values": []
// };
// let values = [];
// if(typeof json.xhr.IO.Data.length === "undefined"){
// values.push({"observeddatetime": json.xhr.IO.Data.IODateTime, "value": json.xhr.IO.Data.Value});
// }
// else{
// for (var i = 0; i < json.xhr.IO.Data.length; i++) {
// let d = json.xhr.IO.Data[i];
// values.push({"observeddatetime": d.IODateTime, "value": d.Value, "direction": d.Direction});
// }
// }
// //console.log("values.langth >>>" + values.length);
// if(values.length > 0){
// dataTemplate.values = values;
// callback(dataTemplate);
// }
// else callback(null);
// }
\ No newline at end of file
var async = require('async');
var parser = require('xml2json');
//var fs = require('fs');
var agriParser = require('./parser/agri_parser_factory');
function perform_function(context,request,response){
var job_id = context.jobconfig.job_id;
var transaction_id = context.transaction.id;
var param = context.jobconfig.data_transform.param;
var memstore = context.job.memstore
// var memstore = context.task.memstore
var output_type = "object/sds";
var di_data = request.data;
let out = [];
var dataKeySeries = Object.keys(di_data.data);
// for (var k = 0; k < dataKeySeries.length; k++) {
// console.log(di_data.data[dataKeySeries[k]]);
// }
let i = 0;
async.whilst(function() { return i < dataKeySeries.length;}, function(cb) {
let result = {
"object_type": 'iBitz',
"station_id": di_data.station_id,
"latitude": "",
"longitude": "",
"altitude": "",
"data":[]
};
let vals = di_data.data[dataKeySeries[i]];
i++;
let idx = 0;
async.whilst(function() { return idx < vals.length;}, function(callback) {
let dtype = vals[idx].data_types;
//console.log('[DT] di_data length = ' + vals[idx].value.length);
let json = parser.toJson(vals[idx].value, {object: true});
//console.log('Type = ' + json.xhr.IO.Type);
agriParser.getParser(json.xhr.IO.Type).getValues(vals[idx].value, function(values) {
idx++;
if(values !== null){
result.latitude = json.xhr.IO.Latitude;
result.longitude = json.xhr.IO.Longitude;
result.data.push(values);
//console.log(`STAMP : ${di_data.station_id}-${dtype} = `+ values.values[values.values.length-1].observeddatetime);
memstore.setItem(`${di_data.station_id}-${dtype}`, values.values[values.values.length-1].observeddatetime, (err) =>{
if(err){
throw err;
callback(err);
}
else callback();
});
}
else callback();
});
}, function(err) {
if( err ) {
console.log(err);
cb(err);
//response.error(err);
} else {
if(result.data.length > 0){
out.push(result);
}
cb();
}
});
}, function(err) {
if( err ) {
console.log(err);
//response.error(err);
} else {
//fs.writeFileSync("./result.json", JSON.stringify(out));
//console.log(JSON.stringify(out));
if(out.length > 0)
response.success(out, output_type);
else response.reject();
}
});
}
module.exports = perform_function;
var util = require('util');
var DTPlugin = require('../dt-plugin');
function DTTask(context,request){
DTPlugin.call(this,context,request);
this.name = "noop";
this.output_type = "";
}
util.inherits(DTTask,DTPlugin);
DTTask.prototype.perform = require('./perform');
module.exports = DTTask;
var async = require('async');
function perform_function(context,request,response){
var job_id = context.jobconfig.job_id;
var transaction_id = context.transaction.id;
var param = context.jobconfig.data_transform.param;
var memstore = context.task.memstore
var output_type = "object/sds"
var data = request.data;
let result = [];
var nfiles = data.data.length;
var i = 0;
async.whilst(
function() { return i < nfiles; },
function(callback) {
var filename = data.data[i].filename;
var station_id = data.data[i].station_id;
var latitude = data.data[i].latitude;
var longitude = data.data[i].longitude;
var data_type = data.data[i].type;
var filecontent = data.data[i].value;
var observeddatetime = data.data[i].observeddatetime;
i++;
let _result = {
"object_type":"sds",
"station_id" : station_id, // need to change to exact station
"latitude":latitude,
"longitude":longitude,
"data":[]
};
if (data_type == 'text') {
_result = perform_text(_result, filecontent);
result.push(_result);
callback();
} else if (data_type == 'image') {
getImage(filecontent).then((base64) => {
var values = [];
var avalue = {};
avalue["observeddatetime"] = observeddatetime;
avalue["value"] = base64;
values.push(avalue);
_result.data.push({"type":"image", "values":values});
result.push(_result);
callback();
}).catch((err) => {
throw err
});
}
},
function (err, n) {
response.success(result,output_type);
}
);
// while (i < nfiles) {
// var filename = data.data[i].filename;
// var data_type = data.data[i].type;
// var filecontent = data.data[i].value;
// let _result = {
// "object_type":"sds",
// "station_id" : filename, // need to change to exact station
// "latitude":"",
// "longitude":"",
// "altitude":"",
// "data":[]
// };
// if (data_type == 'text')
// _result = perform_text(_result, filecontent);
// else if (data_type == 'image') {
// // _result = perform_image(_result, filecontent);
// getImage(filecontent).then((base64) => {
// _result.data.push({"values":base64});
// console.log("after get image");
// }).catch((err) => {
// throw err
// });
// }
// console.log("will result");
// result.push(_result);
// i++;
// }
// response.success(result,output_type);
// //response.reject();
// //response.error("error message")
}
function perform_text(_result, filecontent) {
var arr = filecontent.toString().split("\r\n");
var arr_type = arr[1].split(",");
var arr_unit = arr[2].split(",");
var arr_value_type = arr[3].split(",");
var ndata = arr_type.length;
var col = 1;
while (col < ndata) {
var row = 4;
let values = [];
while (row < arr.length-1) {
var rdata = arr[row].split(",");
values.push({
"observeddatetime":rdata[0].replace('"','').replace('"',''),
"value":rdata[col]
});
row++;
}
_result.data.push({
"type": arr_type[col].replace('"','').replace('"',''),
"unit": arr_unit[col].replace('"','').replace('"',''),
"value_type" : arr_value_type[col].replace('"','').replace('"',''),
"values":values
});
col++;
}
return _result;
}
function perform_image(_result, filecontent) {
getImage(filecontent).then((base64) => {
_result.data.push({"values":base64});
}).catch((err) => {
throw err
});
return _result;
}
function getImage(filecontent) {
return new Promise((resolve, reject) => {
resolve("data:image/jpeg;base64," + Buffer.from(filecontent).toString('base64'));
});
}
module.exports = perform_function;
var util = require('util');
var util = require('util');
var DTPlugin = require('../dt-plugin');
function DTTask(context,request){
DTPlugin.call(this,context,request);
this.name = "gistda-water";
this.output_type = "";
}
util.inherits(DTTask,DTPlugin);
DTTask.prototype.perform = require('./perform');
module.exports = DTTask;
function perform_function(context,request,response){
var job_id = context.jobconfig.job_id;
var transaction_id = context.transaction.id;
var param = context.jobconfig.data_transform.param;
var memstore = context.task.memstore
var output_type = "object/sds"
var data = request.data;
let result = {
"object_type": "sds",
"station_id": data.DEVID,
"latitude": data.LATI,
"longiude": data.LOGI,
"altitude": data.Z,
"data":[]
};
let VBATT_values = [];
VBATT_values.push({
"observeddatetime": data.TIME,
"value": data.VBATT
})
result.data.push({
"type": "VBATT",
"unit": "",
"value_type" : "",
"values": VBATT_values
});
let LEVEL_values = [];
LEVEL_values.push({
"observeddatetime": data.TIME,
"value": data.LEVEL
})
result.data.push({
"type": "LEVEL",
"unit": "",
"value_type" : "",
"values": LEVEL_values
});
response.success(result,output_type);
//response.reject();
//response.error("error message")
}
module.exports = perform_function;
......@@ -84,13 +84,20 @@ function send_request(prm,cb)
{ 'cache-control': 'no-cache' }
};
if(prm.method.toLowerCase()=='post' || prm.method.toLowerCase()=='put')
if(['post','put','delete','patch'].includes(prm.method.toLowerCase()))
{
options.method = prm.method.toUpperCase();
if(prm.body_type=='json' && typeof prm.body == 'object'){
options.headers['content-type'] = 'application/json';
options.json = prm.body;
}else if(prm.body_type=='form-data' && typeof prm.body == 'object'){
options.formData={}
Object.keys(prm.body).forEach((el)=>{
if(typeof prm.body[el]=='string'){
options.formData[el]=prm.body[el]
}
})
}else if(prm.body_type=='text' || typeof prm.body == 'string'){
options.headers['content-type'] = 'text/plain';
options.body = prm.body;
......
......@@ -3,7 +3,7 @@ var DTPlugin = require('../dt-plugin');
function DTTask(context,request){
DTPlugin.call(this,context,request);
this.name = "tanpibut";
this.name = "pgsql";
this.output_type = "";
}
util.inherits(DTTask,DTPlugin);
......
{
"name": "di-tanpibut",
"version": "0.0.1",
"name": "dt-pgsql",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"keywords": [
"bigstream"
],
"author": "lsr.bigstream",
"author": "",
"license": "ISC",
"dependencies": {}
"dependencies": {
"pg": "^8.5.1"
}
}
var pg = require('pg');
var ctx = require('../../../context');
var Utils = ctx.getLib('lib/util/plugin-utils');
function perform_function(context,request,response){
var job_id = context.jobconfig.job_id;
var transaction_id = context.transaction.id;
var param = context.task.config.param || {};
var memstore = context.task.memstore
var output_type = request.input_type;
var data = request.data;
var meta = request.meta || {};
var req_host = param.host || "localhost";
var req_port = param.port || "3306"
var req_user = param.user || "";
var req_pass = param.password || "";
var req_db = param.database || "";
var req_sql = param.sql || "";
var env = {
'type' : output_type,
'data' : data,
'meta' : meta
}
if(typeof data == 'string' && req_sql == ""){
req_sql = "${data}"
}
req_sql = Utils.vm_execute_text(env,req_sql);
//parsing param from meta
if(typeof meta._param == 'object')
{
var _prm = meta._param;
req_host = (_prm.host)?_prm.host:req_host;
req_port = (_prm.port)?_prm.host:req_port;
req_user = (_prm.user)?_prm.user:req_user;
req_pass = (_prm.password)?_prm.password:req_pass;
req_db = (_prm.database)?_prm.database:req_db;
req_sql = (_prm.sql)?_prm.sql:req_sql;
}
var conf = {
"host" : req_host,
"port" : Number(req_port),
"user" : req_user,
"password" : req_pass,
"database" : req_db
}
response.meta = meta;
pgexcute(conf,req_sql,function(err,result){
if(!err){
response.success(result,output_type);
}else{
response.error("pgsql error");
}
});
}
function pgexcute(conf,sql,callback){
var client = new pg.Client(conf);
client.connect(function(err) {
if(err) {
callback(err);
return console.error('could not connect to postgres', err);
}
client.query(sql, function(err, result) {
if(err) {
callback(err);
client.end();
return console.error('error running query', err);
}
callback(null,result);
client.end();
});
});
}
module.exports = perform_function;
{
"name": "dt-tanpibut",
"version": "0.0.1",
"lockfileVersion": 1,
"requires": true,
"dependencies": {
"async": {
"version": "2.5.0",
"resolved": "https://registry.npmjs.org/async/-/async-2.5.0.tgz",
"integrity": "sha512-e+lJAJeNWuPCNyxZKOBdaJGyLGHugXVQtrAwtuAe2vhxTYxFTKE73p8JuTmdH0qdQZtDvI4dhJwjZc5zsfIsYw==",
"requires": {
"lodash": "4.17.4"
}
},
"bindings": {
"version": "1.3.0",
"resolved": "https://registry.npmjs.org/bindings/-/bindings-1.3.0.tgz",
"integrity": "sha512-DpLh5EzMR2kzvX1KIlVC0VkC3iZtHKTgdtZ0a3pglBZdaQFjt5S9g9xd1lE+YvXyfd6mtCeRnrUfOLYiTMlNSw=="
},
"hoek": {
"version": "4.2.0",
"resolved": "https://registry.npmjs.org/hoek/-/hoek-4.2.0.tgz",
"integrity": "sha512-v0XCLxICi9nPfYrS9RL8HbYnXi9obYAeLbSP00BmnZwCK9+Ih9WOjoZ8YoHCoav2csqn4FOz4Orldsy2dmDwmQ=="
},
"isemail": {
"version": "2.2.1",
"resolved": "https://registry.npmjs.org/isemail/-/isemail-2.2.1.tgz",
"integrity": "sha1-A1PT2aYpUQgMJiwqoKQrjqjp4qY="
},
"items": {
"version": "2.1.1",
"resolved": "https://registry.npmjs.org/items/-/items-2.1.1.tgz",
"integrity": "sha1-i9FtnIOxlSneWuoyGsqtp4NkoZg="
},
"joi": {
"version": "9.2.0",
"resolved": "https://registry.npmjs.org/joi/-/joi-9.2.0.tgz",
"integrity": "sha1-M4WseQGSEwy+Iw6ALsAskhW7/to=",
"requires": {
"hoek": "4.2.0",
"isemail": "2.2.1",
"items": "2.1.1",
"moment": "2.18.1",
"topo": "2.0.2"
}
},
"lodash": {
"version": "4.17.4",
"resolved": "https://registry.npmjs.org/lodash/-/lodash-4.17.4.tgz",
"integrity": "sha1-eCA6TRwyiuHYbcpkYONptX9AVa4="
},
"moment": {
"version": "2.18.1",
"resolved": "https://registry.npmjs.org/moment/-/moment-2.18.1.tgz",
"integrity": "sha1-w2GT3Tzhwu7SrbfIAtu8d6gbHA8="
},
"nan": {
"version": "2.6.2",
"resolved": "https://registry.npmjs.org/nan/-/nan-2.6.2.tgz",
"integrity": "sha1-5P805slf37WuzAjeZZb0NgWn20U="
},
"node-expat": {
"version": "2.3.16",
"resolved": "https://registry.npmjs.org/node-expat/-/node-expat-2.3.16.tgz",
"integrity": "sha512-e3HyQI0lk5CXyYQ4RsDYGiWdY5LJxNMlNCzo4/gwqY8lhYIeTf5VwGirGDa1EPrcZROmOR37wHuFVnoHmOWnOw==",
"requires": {
"bindings": "1.3.0",
"nan": "2.6.2"
}
},
"topo": {
"version": "2.0.2",
"resolved": "https://registry.npmjs.org/topo/-/topo-2.0.2.tgz",
"integrity": "sha1-zVYVdSU5BXwNwEkaYhw7xvvh0YI=",
"requires": {
"hoek": "4.2.0"
}
},
"xml2json": {
"version": "0.11.0",
"resolved": "https://registry.npmjs.org/xml2json/-/xml2json-0.11.0.tgz",
"integrity": "sha1-HVTx2GjbvQSJK4RdfLrZTFKOFuQ=",
"requires": {
"hoek": "4.2.0",
"joi": "9.2.0",
"node-expat": "2.3.16"
}
}
}
}
{
"name": "dt-tanpibut",
"version": "0.0.1",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"keywords": [
"bigstream"
],
"author": "lsr.bigstream",
"license": "ISC",
"dependencies": {
"async": "^2.1.2",
"xml2json": "^0.11.0"
}
}
var simpleParser = require('./simple_parser');
var windDirParser = require('./wind_direction_parser');
var imageParser = require('./image_parser');
exports.getParser = function(type){
if(type === "Camera") return imageParser;
else if(type === "Wind Direction Degree") return windDirParser;
else return simpleParser;
}
\ No newline at end of file
var request = require('request').defaults({ encoding: null });;
var async = require('async');
var parser = require('xml2json');
exports.getValues = function(dataSet, cb){
let dataTemplate = {
"type": "",
"unit": "",
"value_type": "",
"image_type": "",
"values": []
};
let values = [];
//console.log(dataSet.length);
let json = parser.toJson(dataSet, {object: true}).xhr.IO;
dataTemplate.type = json.Type;
dataTemplate.unit = json.Unit;
dataTemplate.value_type = json.ValueType;
dataTemplate.image_type = json.Name;
if(parseInt(json.Record) > 0){
if(parseInt(json.Record) === 1){
getImage(json.Data.Value).then((data) =>{
dataTemplate.values.push({"observeddatetime": json.Data.IODateTime, "value": data});
cb(dataTemplate);
}).catch((err) => {
cb(err);
})
}
else{
var idx = 0;
async.whilst(function() { return idx < json.Data.length;}, function(callback) {
let d = json.Data[idx];
idx++;
//console.log(d.Value);
getImage(d.Value).then((data) =>{
values.push({"observeddatetime": d.IODateTime, "value": data});
//var bitmap = new Buffer(data, 'base64');
//fs.writeFileSync("./result.jpg", bitmap);
callback();
}).catch((err) => {
callback(err);
});
}, function(err) {
if( err ) {
console.log(err);
cb(err);
}
if(values.length > 0){
dataTemplate.values = values;
cb(dataTemplate);
}
else cb(null);
});
}
}
else cb(null);
}
function getImage(url) {
return new Promise((resolve, reject) => {
request(url, function (error, resp, body) {
if (!error && resp.statusCode == 200) {
resolve("data:" + resp.headers["content-type"] + ";base64," + Buffer.from(body).toString('base64'));
}else{
return reject(error);
}
})
})
}
\ No newline at end of file
var parser = require('xml2json');
var idx = 0;
exports.getValues = function(dataSet, callback){
let dataTemplate = {
"type": "",
"unit": "",
"value_type": "",
"latitude": "",
"longitude": "",
"values": []
};
let values = [];
let json = parser.toJson(dataSet, {object: true}).xhr.IO;
dataTemplate.type = json.Type;
dataTemplate.unit = json.Unit;
dataTemplate.value_type = json.ValueType;
dataTemplate.latitude = json.Latitude;
dataTemplate.longitude = json.Longitude;
if(parseInt(json.Record) > 0){
if(parseInt(json.Record) === 1){
values.push({"observeddatetime": json.Data.IODateTime, "value": json.Data.Value});
}
else{
for (var j = 0; j < json.Data.length; j++) {
let d = json.Data[j];
values.push({"observeddatetime": d.IODateTime, "value": d.Value});
}
}
}
if(values.length > 0){
dataTemplate.values = values;
callback(dataTemplate);
}
else callback(null);
}
// exports.getValues = function(json, callback){
// let dataTemplate = {
// "type": json.xhr.IO.Type,
// "unit": json.xhr.IO.Unit,
// "value_type": json.xhr.IO.ValueType,
// "values": []
// };
// let values = [];
// if(typeof json.xhr.IO.Data.length === "undefined"){
// values.push({"observeddatetime": json.xhr.IO.Data.IODateTime, "value": json.xhr.IO.Data.Value});
// }
// else{
// for (var i = 0; i < json.xhr.IO.Data.length; i++) {
// let d = json.xhr.IO.Data[i];
// values.push({"observeddatetime": d.IODateTime, "value": d.Value});
// }
// }
// //console.log("values.langth >>>" + values.length);
// if(values.length > 0){
// dataTemplate.values = values;
// callback(dataTemplate);
// }
// else callback(null);
// }
\ No newline at end of file
var parser = require('xml2json');
var idx = 0;
exports.getValues = function(dataSet, callback){
let dataTemplate = {
"type": "",
"unit": "",
"value_type": "",
"values": []
};
let values = [];
let json = parser.toJson(dataSet, {object: true}).xhr.IO;
dataTemplate.type = json.Type;
dataTemplate.unit = json.Unit;
dataTemplate.value_type = json.ValueType;
if(parseInt(json.Record) > 0){
if(parseInt(json.Record) === 1){
values.push({"observeddatetime": json.Data.IODateTime, "value": json.Data.Value, "direction": json.Data.Direction});
}
else{
for (var j = 0; j < json.Data.length; j++) {
let d = json.Data[j];
values.push({"observeddatetime": d.IODateTime, "value": d.Value, "direction": d.Direction});
}
}
}
if(values.length > 0){
dataTemplate.values = values;
callback(dataTemplate);
}
else callback(null);
// async.whilst(() => {return idx < dataSet.length;}, (cb) =>{
// let json = parser.toJson(dataSet[idx], {object: true});
// dataTemplate.type = json.xhr.IO.Type;
// dataTemplate.unit = json.xhr.IO.Unit;
// dataTemplate.value_type = json.xhr.IO.ValueType;
// idx++;
// if(typeof json.xhr.IO.Data.length === "undefined"){
// values.push({"observeddatetime": json.xhr.IO.Data.IODateTime, "value": json.xhr.IO.Data.Value, "direction": json.xhr.IO.Data.Direction});
// process.nextTick(function(){
// cb();
// })
// }
// else{
// for (var i = 0; i < json.xhr.IO.Data.length; i++) {
// let d = json.xhr.IO.Data[i];
// values.push({"observeddatetime": d.IODateTime, "value": d.Value, "direction": d.Direction});
// }
// process.nextTick(function(){
// cb();
// })
// }
// }, function(err) {
// if( err ) {
// console.log(err);
// } else {
// //console.log("values.langth >>>" + values.length);
// if(values.length > 0){
// dataTemplate.values = values;
// callback(dataTemplate);
// }
// else callback(null);
// }
// });
}
// exports.getValues = function(json, callback){
// let dataTemplate = {
// "type": json.xhr.IO.Type,
// "unit": json.xhr.IO.Unit,
// "value_type": json.xhr.IO.ValueType,
// "values": []
// };
// let values = [];
// if(typeof json.xhr.IO.Data.length === "undefined"){
// values.push({"observeddatetime": json.xhr.IO.Data.IODateTime, "value": json.xhr.IO.Data.Value});
// }
// else{
// for (var i = 0; i < json.xhr.IO.Data.length; i++) {
// let d = json.xhr.IO.Data[i];
// values.push({"observeddatetime": d.IODateTime, "value": d.Value, "direction": d.Direction});
// }
// }
// //console.log("values.langth >>>" + values.length);
// if(values.length > 0){
// dataTemplate.values = values;
// callback(dataTemplate);
// }
// else callback(null);
// }
\ No newline at end of file
var async = require('async');
//var fs = require('fs');
var agriParser = require('./parser/agri_parser_factory');
function perform_function(context,request,response){
var job_id = context.jobconfig.job_id;
var transaction_id = context.transaction.id;
var memstore = context.job.memstore
// var memstore = context.task.memstore
var output_type = "object/sds";
let di_data = request.data;
let dtype = di_data.data_type;
let out = {
"object_type": 'Tanpibut',
"station_id": di_data.station_id,
"data_type": dtype,
"latitude": "",
"longitude": "",
"altitude": "",
"unit": "",
"value_type": "",
"type": "",
"data":[]
};
// for (var k = 0; k < dataKeySeries.length; k++) {
// console.log(di_data.data[dataKeySeries[k]]);
// }
let idx = 0;
async.whilst(function() { return idx < di_data.data.length;}, function(callback) {
agriParser.getParser(context.jobconfig.data_in.param.data_type.name).getValues(di_data.data[idx].value, function(value) {
idx++;
if(value !== null){
out.latitude = value.latitude;
out.longitude = value.longitude;
out.unit = value.unit;
out.value_type = value.value_type;
out.type = value.type;
for(var i=0; i<value.values.length; i++){
out.data.push(value.values[i]);
}
//console.log(`STAMP : ${di_data.station_id}-${dtype} = `+ value.value[value.value.length-1].observeddatetime);
memstore.setItem(`${di_data.station_id}-${dtype}`, value.values[value.values.length-1].observeddatetime, (err) =>{
if(err){
throw err;
callback(err);
}
else callback();
});
}
else callback();
});
}, function(err) {
if( err ) {
console.log(err);
response.error(err);
} else {
//fs.writeFileSync("./result.json", JSON.stringify(out));
if(out.data.length > 0){
// console.log("Result" + JSON.stringify(out));
response.success(out, output_type);
}
else {
if(context.jobconfig.data_in.param.recover){
memstore.setItem(`${di_data.station_id}-${dtype}`, di_data.timestamp, (err) =>{
response.reject();
});
}
else response.reject();
}
}
});
}
module.exports = perform_function;
{
"version":"1.2.5",
"build":"202305211800"
"version":"1.2.6dev",
"build":"202412011200"
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment