Commit 822c6e2f authored by Kamron Aroonrua's avatar Kamron Aroonrua 💬

update plugins

parent ceabb20d
var Client = require('ftp');
var async = require('async');
var fs = require('fs');
var host ="203.150.19.51";
var port = "21";
var user = "bs";
var pwd = "UF13kczHdCPXpBb";
var main_folder = "GISTDA_SOS_DATA"
var init_observed_date = "2017-10-18";
var init_observed_time = "09:00:00";
var di_plugin = "sftp-filesync";
var data_source = "gistda-air"
var job_path = "/Users/apple/Project@Nectec/i-bitz/jobs/gistda_air_job";
var stationTable = require('hashTable');
var config = {
host: host,
port: port,
user: user,
password: pwd
};
var c = new Client();
var stationTable = new stationTable();
fs.readFile('/Users/apple/Project@Nectec/i-bitz/data_sample/FromKPrasong/Gistda_Air_Station_Profile.json', function(err, data) {
if (err) throw err;
var profile = JSON.parse(data);
var features = profile.features;
for (var i=0; i<features.length; i++) {
var properties = features[i].properties;
stationTable.put(properties.ftp_folder_mapping, {latitude: properties.lat, longitude: properties.long});
}
// var location = stationTable.get('STATION1_KORAT');
});
c.on('ready', function() {
c.list(main_folder, function(err, list) {
if (err) throw err;
async.eachSeries(
list,
function(element, callback) {
// create job profile for .dat
create_job_profile(element.name, element.name, element.name, false);
// prepare the image path
var station_no = element.name;
station_no = station_no.replace("STATION","");
var n = station_no.search(/_/i);
station_no = station_no.substr(0, n);
var image_path_top = element.name + "/PIC" + station_no + "/TOP" + station_no;
var filename_top = element.name + "-PIC" + station_no + "-TOP" + station_no;
create_job_profile(element.name, image_path_top, filename_top, true);
var image_path_bottom = element.name + "/PIC" + station_no + "/BOTTOM" + station_no;
var filename_bottom = element.name + "-PIC" + station_no + "-BOTTOM" + station_no;
create_job_profile(element.name, image_path_bottom, filename_bottom, true);
console.log(element.name + ", " + image_path_top + " vs " + image_path_bottom);
callback();
}
);
});
});
c.connect(config);
var trigger = {};
trigger["type"] = "cron";
trigger["cmd"] = "0 * * * *";
var param = {};
param["source"] = data_source,
param["url"] = host,
param["port"] = port;
param["user"] = user;
param["password"] = pwd;
param["init_observed_date"] = init_observed_date;
param["init_observed_time"] = init_observed_time;
function create_job_profile(station_id, path, filename, is_image) {
var job = {};
var job_id = "sds." + data_source + "-" + filename;
job["job_id"] = job_id;
job["active"] = true;
job["trigger"] = trigger;
var location = stationTable.get(station_id);
var data_in = {};
data_in["type"] = data_source;
var profile = {};
profile["station_id"] = station_id;
profile["latitude"] = location.latitude;
profile["longitude"] = location.longitude;
data_in["profile"] = profile;
param["path"] = main_folder + "/" + path;
data_in["param"] = param;
if (!is_image) {
var data_transform = {};
data_transform["type"] = data_source;
} else {
var data_transform = [];
var transfrom1 = {};
transfrom1["type"] = data_source;
var script = {};
script["script"] = "data=Array.isArray(src.data)?src.data.pop():src.dat";
var transfrom2 = {};
transfrom2["type"] = "transform";
transfrom2["param"] = script;
data_transform.push(transfrom1);
data_transform.push(transfrom2);
}
var data_out = {};
data_out["type"] = "storage";
// data_out["type"] = "dir";
var data_out_param = {};
data_out_param["storage_name"] = "sds.gistda-air";
// data_out_param["path"] = "/Users/Naiyana/testdata";
data_out["param"] = data_out_param;
job["data_in"] = data_in;
job["data_transform"] = data_transform;
job["data_out"] = data_out;
fs.writeFile(job_path + "/" + job_id + ".json", JSON.stringify(job), function(err) {
if(err) {
return console.log(err);
}
console.log("The file was saved!");
});
}
...@@ -16,6 +16,8 @@ ...@@ -16,6 +16,8 @@
"path": "0.12.7", "path": "0.12.7",
"fs": "0.0.1-security", "fs": "0.0.1-security",
"dateformat":"2.0.0", "dateformat":"2.0.0",
"ftp":"0.3.10" "ftp":"0.3.10",
"moment":"2.18.1",
"hashtable":"2.0.2"
} }
} }
var path = require('path'); var path = require('path');
var fs = require('fs'); var fs = require('fs');
var async = require('async'); var async = require('async');
var dateFormat = require('dateformat'); var dateFormat = require('dateformat');
var Client = require('ftp'); var Client = require('ftp');
function execute_function(context,response){ function execute_function(context,response){
var job_id = context.jobconfig.job_id; var job_id = context.jobconfig.job_id;
var transaction_id = context.transaction.id; var transaction_id = context.transaction.id;
var param = context.jobconfig.data_in.param; var profile = context.jobconfig.data_in.profile;
var memstore = context.task.memstore var param = context.jobconfig.data_in.param;
var memstore = context.task.memstore
var output_type = 'object/gistda-air'
var output_type = 'object/gistda-air'
var config = {
host: param.url, var config = {
port: param.port, host: param.url,
user: param.user, port: param.port,
password: param.password user: param.user,
}; password: param.password
};
let result = {
"object_type": param.source, let result = {
"data":[] "object_type": param.source,
}; "data":[]
};
let maxdate;
let maxdate;
var c = new Client();
var c = new Client();
c.on('ready', function() {
c.list(function(err, list) { var key = param.path + '-lasttransaction';
if (err) throw err;
c.on('ready', function() {
memstore.getItem('lasttransaction',function(err,value) {
if (err) throw err; c.list(param.path, function(err, list) {
if (err) throw err;
var latestDate;
if (typeof value == 'undefined') { memstore.getItem(key,function(err,value) {
var latestDateStr = param.init_observed_date + ' ' + param.init_observed_time; //'2016-12-20T10:00:00+04:00'; if (err) throw err;
latestDate = new Date(latestDateStr);
} else { var latestDate;
// var date = value.substring(0, 10); if (!value) {
// var time = value.substring(11,19) var latestDateStr = param.init_observed_date + ' ' + param.init_observed_time; //'2016-12-20T10:00:00+04:00';
// latestDate = new Date(date + ' ' + time); latestDate = new Date(latestDateStr);
latestDate = new Date(value); } else {
} latestDate = new Date(value);
}
console.log(value + " !!! " + latestDate);
async.eachSeries(
async.eachSeries( list,
list, function(element, callback) {
function(element, callback) { if (typeof element !== 'undefined') {
if (typeof element !== 'undefined') { if (element.type !== 'd') { // filter out directories
if (element.type !== 'd') { // filter out directories var filename = element.name;
var filename = element.name; var filedate = element.date;
var filedate = element.date; var filetype = element.type;
var filetype = element.type; // if ((path.extname(filename) === '.dat' || path.extname(filename) === '.jpg') && filename.indexOf("debug") == -1) {
if ((path.extname(filename) === '.dat' &&
if (path.extname(filename) === '.dat' && filename.indexOf("debug") == -1) { (filename.indexOf("Every_5m") > 0 || (filename.indexOf("MS700") > 0 && filename.indexOf("debug") == -1)))
if (filedate - latestDate > 0) { // filter out old files || path.extname(filename) === '.jpg') {
c.get(filename, function (err, stream) {
if (err) throw err; var type = 'text';
var data = ''; if (path.extname(filename) === '.jpg')
stream.setEncoding('utf8'); type = 'image';
console.log("downloading .... : " + filename + ", " + dateFormat(filedate, "isoDateTime"));
stream.on('data', function(chunk) { // donwload each individual chunk as per a downloading file if (filedate - latestDate > 0) { // filter out old files
if (chunk != '') c.get(param.path+"/"+filename, function (err, stream) {
data = data + chunk; if (err) throw err;
}); var data = '';
stream.on('end', function () { // insert a data file stream.setEncoding('utf8');
result.data.push({ console.log("downloading .... : " + filename + ", " + dateFormat(filedate, "isoDateTime"));
"filename": filename, stream.on('data', function(chunk) { // donwload each individual chunk as per a downloading file
"value" : data if (chunk != '')
}); data = data + chunk;
if (typeof maxdate == 'undefined') { });
maxdate = filedate; stream.on('end', function () { // insert a data file
} else { result.data.push({
if (filedate - maxdate > 0) { "filename": filename,
maxdate = filedate; "station_id": profile.station_id,
} "latitude": profile.latitude,
} "longitude": profile.longitude,
memstore.setItem('lasttransaction',dateFormat(maxdate, "isoDateTime"),function(err){ "type": type,
if (err) throw err; "observeddatetime": dateFormat(filedate, 'yyyy-mm-dd HH:MM:ss'),
callback(); "value" : data
}); });
}); if (typeof maxdate == 'undefined') {
// stream.pipe(fs.createWriteStream(filename)); maxdate = filedate;
}); } else {
if (filedate - maxdate > 0) {
} else { maxdate = filedate;
async.setImmediate(callback); }
//callback(null); }
} memstore.setItem(key,dateFormat(maxdate, 'yyyy-mm-dd HH:MM:ss'),function(err){
} else { if (err) throw err;
async.setImmediate(callback); callback();
//callback(null); });
} });
} else // stream.pipe(fs.createWriteStream(filename));
async.setImmediate(callback); });
//callback(null);
} else } else {
async.setImmediate(callback); async.setImmediate(callback);
//callback(null); //callback(null);
}, }
function(err) { } else {
if (err) { async.setImmediate(callback);
response.error(err); //callback(null);
} else { }
if (result.data.length == 0) } else
response.reject(); // for no data async.setImmediate(callback);
else //callback(null);
response.success(result, output_type); } else
c.end(); async.setImmediate(callback);
} //callback(null);
} },
); // async close function(err) {
}); // memstore close if (err) {
}); response.error(err);
}); } else {
if (result.data.length == 0)
c.connect(config); response.reject(); // for no data
else
response.success(result, output_type);
//response.reject(); c.end();
}
} }
); // async close
}); // memstore close
});
module.exports = execute_function; });
c.connect(config);
//response.reject();
}
module.exports = execute_function;
...@@ -10,7 +10,7 @@ function execute_function(context,response){ ...@@ -10,7 +10,7 @@ function execute_function(context,response){
var url = param.url; var url = param.url;
var reject = true; var reject = true;
if(param.reject=='false'){reject=false;} if(param.reject==false){reject=false;}
var encode = 'utf8'; var encode = 'utf8';
if(param.encoding == 'binary'){ if(param.encoding == 'binary'){
......
var util = require('util');
var DIPlugin = require('../di-plugin');
function DITask(context){
DIPlugin.call(this,context);
this.name = "sftp-filesync";
this.output_type = "";
}
util.inherits(DITask,DIPlugin);
DITask.prototype.perform = require('./perform');
module.exports = DITask;
{
"name": "di-sftp-filesync",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"author": "",
"license": "ISC",
"dependencies": {
"ssh2-sftp-client": "^2.0.1"
}
}
var Client = require("ssh2-sftp-client");
var path = require('path');
function perform_function(context,response){
var job_id = context.jobconfig.job_id;
var transaction_id = context.transaction.id;
var param = context.jobconfig.data_in.param;
var memstore = context.job.memstore;
var output_type = 'object'
var prm_host = param.host;
var prm_port = param.port || 22;
var prm_user = param.username || "";
var prm_pass = param.password || "";
var prm_dir = param.dir || "~";
var prm_encoding = param.encoding || "binary";
var prm_continue = (typeof param.continue == 'boolean' && param.continue.toString() == 'false')?false:true;
//filter.ext|filetype
var prm_filter = param.filter || {};
var meta = {};
var last_mod = {'fname':'','tts':0};
var fs_continue = false;
var buff_out = new Buffer(0);
memstore.getItem('lastmodify',function(err,value){
if(value){
last_mod=value;
}
getData();
});
function getData(){
var sftp = new Client();
sftp.connect({
host: prm_host,
port: prm_port,
username: prm_user,
password: prm_pass
}).then(() => {
return sftp.list(prm_dir + '/');
}).then((fList) => {
var f_target = null;
var last_tts = 0;
var sync_list = [];
fList.forEach((file)=>{
if(file.modifyTime > last_mod.tts && file.type == '-' && rulesMatch(prm_filter,file)){
sync_list.push(file);
if(f_target==null || (file.modifyTime < f_target.modifyTime) ){
f_target = file;
last_tts = file.modifyTime;
last_mod.fname = file.name;
}
}
});
if(sync_list.length > 1 && prm_continue){fs_continue = true;}
last_mod.tts = last_tts;
if(f_target){
meta = {
'filename' : f_target.name,
'fileext': path.extname(f_target.name),
'filesize': f_target.size,
'modify_ts' : Math.round(f_target.modifyTime/1000)
}
return sftp.get(prm_dir + '/' + f_target.name,null,null);
}else{
return null;
}
}).then((data) => {
if(data){
data.on('data',(dat)=>{
var nb = Buffer.concat([buff_out,dat]);
buff_out = nb;
})
data.on('end',()=>{
sftp.end()
memstore.setItem('lastmodify',last_mod,function(err){
var result=(prm_encoding=='binary')?buff_out:buff_out.toString('utf8');
response.success(result, {"meta":meta,"continue": fs_continue});
});
});
}else{
sftp.end();
response.reject();
}
}).catch((err) => {
sftp.end();
response.error(err);
console.log(err, 'catch error');
});
}
// memstore.setItem('lasttransaction',transaction_id,function(err){
// response.success(data);
// });
// memstore.getItem('lasttransaction2',function(err,value){
// console.log('key');
// console.log(value);
// response.success(value);
// });
//response.success(data,output_type);
//response.reject();
//response.error("error message")
}
function rulesMatch(r,fd)
{
var ret = true;
var fname = fd.name;
if(r.ext){
var extlist = (Array.isArray(r.ext))?r.ext:r.ext.split(',');
if(extlist.indexOf(path.extname(fname)) < 0){
ret = false;
}
}
if(r.filetype){
var ftlist = (Array.isArray(r.filetype))?r.filetype:r.filetype.split(',');
if(ftlist.indexOf(path.extname(fname)) < 0){
ret = false;
}
}
return ret;
}
module.exports = perform_function;
...@@ -31,6 +31,15 @@ function perform_function(context,request,response){ ...@@ -31,6 +31,15 @@ function perform_function(context,request,response){
"_ts" : Math.round((new Date).getTime() / 1000) "_ts" : Math.round((new Date).getTime() / 1000)
} }
if(meta && typeof meta == 'object')
{
Object.keys(meta).forEach((item)=>{
if(!item.startsWith('_')){
dc_meta[item] = meta[item];
}
});
}
var idx = 0; var idx = 0;
async.whilst( async.whilst(
function() { return idx < data.length; }, function() { return idx < data.length; },
...@@ -61,6 +70,7 @@ function perform_function(context,request,response){ ...@@ -61,6 +70,7 @@ function perform_function(context,request,response){
if(!err){ if(!err){
response.success(); response.success();
}else{ }else{
console.log(err);
response.error("storage error"); response.error("storage error");
} }
} }
......
var async = require('async');
function perform_function(context,request,response){ function perform_function(context,request,response){
var job_id = context.jobconfig.job_id; var job_id = context.jobconfig.job_id;
var transaction_id = context.transaction.id; var transaction_id = context.transaction.id;
...@@ -12,53 +14,132 @@ function perform_function(context,request,response){ ...@@ -12,53 +14,132 @@ function perform_function(context,request,response){
var nfiles = data.data.length; var nfiles = data.data.length;
var i = 0; var i = 0;
while (i < nfiles) { async.whilst(
var filename = data.data[i].filename; function() { return i < nfiles; },
var filecontent = data.data[i].value; function(callback) {
var arr = filecontent.toString().split("\r\n"); var filename = data.data[i].filename;
var arr_type = arr[1].split(","); var station_id = data.data[i].station_id;
var arr_unit = arr[2].split(","); var latitude = data.data[i].latitude;
var arr_value_type = arr[3].split(","); var longitude = data.data[i].longitude;
var ndata = arr_type.length; var data_type = data.data[i].type;
var col = 1; var filecontent = data.data[i].value;
var observeddatetime = data.data[i].observeddatetime;
let _result = { i++;
"object_type":"sds",
"station_id" : filename, // need to change to exact station let _result = {
"latitude":"", "object_type":"sds",
"longitude";"", "station_id" : station_id, // need to change to exact station
"altitude":"", "latitude":latitude,
"data":[] "longitude":longitude,
}; "data":[]
};
while (col < ndata) {
var row = 4; if (data_type == 'text') {
let values = []; _result = perform_text(_result, filecontent);
while (row < arr.length-1) { result.push(_result);
var rdata = arr[row].split(","); callback();
values.push({ } else if (data_type == 'image') {
"observeddatetime":rdata[0].replace('"','').replace('"',''), getImage(filecontent).then((base64) => {
"value":rdata[col] var values = [];
}); var avalue = {};
row++; avalue["observeddatetime"] = observeddatetime;
} avalue["value"] = base64;
_result.data.push({ values.push(avalue);
"type": arr_type[col].replace('"','').replace('"',''), _result.data.push({"type":"image", "values":values});
"unit": arr_unit[col].replace('"','').replace('"',''), result.push(_result);
"value_type" : arr_value_type[col].replace('"','').replace('"',''), callback();
"values":values }).catch((err) => {
}); throw err
});
col++; }
} },
result.push(_result); function (err, n) {
i++; response.success(result,output_type);
} }
);
response.success(result,output_type);
//response.reject(); // while (i < nfiles) {
//response.error("error message") // var filename = data.data[i].filename;
// var data_type = data.data[i].type;
// var filecontent = data.data[i].value;
// let _result = {
// "object_type":"sds",
// "station_id" : filename, // need to change to exact station
// "latitude":"",
// "longitude":"",
// "altitude":"",
// "data":[]
// };
// if (data_type == 'text')
// _result = perform_text(_result, filecontent);
// else if (data_type == 'image') {
// // _result = perform_image(_result, filecontent);
// getImage(filecontent).then((base64) => {
// _result.data.push({"values":base64});
// console.log("after get image");
// }).catch((err) => {
// throw err
// });
// }
// console.log("will result");
// result.push(_result);
// i++;
// }
// response.success(result,output_type);
// //response.reject();
// //response.error("error message")
}
function perform_text(_result, filecontent) {
var arr = filecontent.toString().split("\r\n");
var arr_type = arr[1].split(",");
var arr_unit = arr[2].split(",");
var arr_value_type = arr[3].split(",");
var ndata = arr_type.length;
var col = 1;
while (col < ndata) {
var row = 4;
let values = [];
while (row < arr.length-1) {
var rdata = arr[row].split(",");
values.push({
"observeddatetime":rdata[0].replace('"','').replace('"',''),
"value":rdata[col]
});
row++;
}
_result.data.push({
"type": arr_type[col].replace('"','').replace('"',''),
"unit": arr_unit[col].replace('"','').replace('"',''),
"value_type" : arr_value_type[col].replace('"','').replace('"',''),
"values":values
});
col++;
}
return _result;
}
function perform_image(_result, filecontent) {
getImage(filecontent).then((base64) => {
_result.data.push({"values":base64});
}).catch((err) => {
throw err
});
return _result;
}
function getImage(filecontent) {
return new Promise((resolve, reject) => {
resolve("data:image;base64," + new Buffer(filecontent).toString('base64'));
});
} }
module.exports = perform_function; module.exports = perform_function;
var ctx = require('../context'); var ctx = require('../context');
var amqp_cfg = ctx.config.amqp; var amqp_cfg = ctx.config.amqp;
var AMQP_URL = 'amqp://bigmaster.igridproject.info';
var QueueReceiver = ctx.getLib('lib/amqp/queuereceiver'); var QueueReceiver = ctx.getLib('lib/amqp/queuereceiver');
var server = new QueueReceiver({ var server = new QueueReceiver({
url : amqp_cfg.url, url : AMQP_URL,
name : 'bs_jobs_queue'
}); });
server.set_execute_function(function(data,callback){ server.set_execute_function(function(data,callback){
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment