Commit 0da621ee authored by Kamron Aroonrua's avatar Kamron Aroonrua 💬

do-mqtt plugin

parent fa04387b
package-lock.json
node_modules
\ No newline at end of file
...@@ -27,6 +27,7 @@ ...@@ -27,6 +27,7 @@
"minimatch": "^3.0.4", "minimatch": "^3.0.4",
"minimist": "^1.2.0", "minimist": "^1.2.0",
"moment": "^2.17.1", "moment": "^2.17.1",
"mqtt": "^3.0.0",
"node-gyp": "^3.6.2", "node-gyp": "^3.6.2",
"node-persist": "^2.0.7", "node-persist": "^2.0.7",
"node-schedule": "^1.2.0", "node-schedule": "^1.2.0",
......
var util = require('util');
var DOPlugin = require('../do-plugin');
function DOTask(context,request){
DOPlugin.call(this,context,request);
this.name = "bsspeak";
}
util.inherits(DOTask,DOPlugin);
DOTask.prototype.perform = require('./perform');
module.exports = DOTask;
var ctx = require('../../../context');
var Utils = ctx.getLib('lib/util/plugin-utils');
var mqtt = require('mqtt')
var async = require('async');
function perform_function(context,request,response){
var job_id = context.jobconfig.job_id;
var transaction_id = context.transaction.id;
var param = context.jobconfig.data_out.param;
var memstore = context.task.memstore;
var in_type = request.type;
var data = (Array.isArray(request.data))?request.data:[request.data];
var meta = request.meta;
var prm_url = param.url || "mqtt://127.0.0.1";
var prm_topic = param.topic;
var client = mqtt.connect(prm_url);
if(!prm_topic){
prm_topic = "/bsspeak/job/" + job_id;
}
client.on('connect', function () {
var idx = 0;
async.whilst(
function() { return idx < data.length; },
function(callback) {
var ev = {
'type' : in_type,
'meta' : meta,
'data' : data[idx]
}
var topic=Utils.vm_execute_text(ev,prm_topic);
client.publish(topic,data[idx],function(err){
idx++;
callback(err);
});
},
function (err) {
if(!err){
response.success();
}else{
console.log(err);
response.error("publish error");
}
client.end();
}
);
});
//response.success();
//response.reject();
//response.error("error message")
}
module.exports = perform_function;
var util = require('util');
var DOPlugin = require('../do-plugin');
function DOTask(context,request){
DOPlugin.call(this,context,request);
this.name = "mqtt";
}
util.inherits(DOTask,DOPlugin);
DOTask.prototype.perform = require('./perform');
module.exports = DOTask;
var ctx = require('../../../context');
var Utils = ctx.getLib('lib/util/plugin-utils');
var mqtt = require('mqtt')
var async = require('async');
function perform_function(context,request,response){
var job_id = context.jobconfig.job_id;
var transaction_id = context.transaction.id;
var param = context.jobconfig.data_out.param;
var memstore = context.task.memstore;
var in_type = request.type;
var data = (Array.isArray(request.data))?request.data:[request.data];
var meta = request.meta;
var prm_url = param.url || "mqtt://127.0.0.1";
var prm_topic = param.topic;
var client = mqtt.connect(prm_url);
client.on('connect', function () {
var idx = 0;
async.whilst(
function() { return idx < data.length; },
function(callback) {
var ev = {
'type' : in_type,
'meta' : meta,
'data' : data[idx]
}
var topic=Utils.vm_execute_text(ev,prm_topic);
client.publish(topic,data[idx],function(err){
idx++;
callback(err);
});
},
function (err) {
if(!err){
response.success();
}else{
console.log(err);
response.error("publish error");
}
client.end();
}
);
});
//response.success();
//response.reject();
//response.error("error message")
}
module.exports = perform_function;
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment