Commit 09e4572b authored by Kamron Aroonrua's avatar Kamron Aroonrua 💬

new update

parent 9ac434ca
......@@ -25,18 +25,39 @@ function JobTask (prm)
this.transaction_id = prm.transaction_id;
this.job_timeout = prm.opt.job_timeout || 60000;
this.flag_continue = false;
//0=>IDLE,1=>RUNNING,2=>DONE
this.state = 0;
this.stats = {
'start_time':0,
'end_time':0,
'di':null,
'dt':null,
'do':null
}
};
util.inherits(JobTask, EventEmitter);
JobTask.prototype.stop = function (status)
{
var self=this;
if(this.state==1){
this.state = 2;
this.emit('done', status);
if(this.flag_continue && status.status != "error"){
//console.log(status);
console.log('FLAG :: Continue JOB Transaction >>');
repeat_job(this);
}
this.stats.end_time = (new Date).getTime();
var dmsg = {
'job_id':self.jobcfg.job_id,
'transaction_id':self.transaction_id
'stats':self.stats,
'result':status
}
this.emit('done', dmsg);
}
}
......@@ -44,11 +65,15 @@ JobTask.prototype.run = function ()
{
var self=this;
var transaction_id = this.transaction_id || genTransactionId();
this.transaction_id = transaction_id;
var input_meta = this.input_meta;
var obj_input_data = getInputData(this.input_data);
var job_tr_config = this.jobcfg;
var job_id = job_tr_config.job_id;
this.stats.start_time = (new Date).getTime();
self.state = 1;
var ctx_transaction = {
......@@ -72,14 +97,23 @@ JobTask.prototype.run = function ()
dm_i.on('error', function(err) {
console.log('[DI] plugins error');
console.log(err);
callback(err)
self.stats.di = {'status':'error','data':'plugins error'};
callback({'status':'error','data':'plugins error'});
});
dm_i.run(function() {
perform_di({'context':context,'handle':self} ,function(err,resp){
if(resp){console.log('[DI STATUS]\t\t: ' + resp.status);}
if(resp){
console.log('[DI STATUS]\t\t: ' + resp.status);
self.stats.di = resp;
}
if(resp.status == 'success'){
self.flag_continue = resp.flag.continue;
callback(null,resp);
}else if(resp.status == 'reject'){
self.flag_continue = resp.flag.continue;
callback(resp);
}else{
callback(resp);
}
......@@ -94,7 +128,8 @@ JobTask.prototype.run = function ()
dm_t.on('error', function(err) {
console.log('[DT] plugins error');
console.log(err);
callback(err)
self.stats.dt = {'status':'error','data':'plugins error'};
callback({'status':'error','data':'plugins error'});
});
dm_t.run(function() {
......@@ -110,7 +145,9 @@ JobTask.prototype.run = function ()
perform_dt({'cfg':cur_cfg,'name':dt_name,'context':context,'request':dt_request,'handle':self},function(err,dt_resp){
if(dt_resp){console.log('[DT:' + dt_name + ' STATUS]\t\t: ' + dt_resp.status);}
if(dt_resp){
console.log('[DT:' + dt_name + ' STATUS]\t\t: ' + dt_resp.status);
}
idx++;
if(dt_resp.status == 'success'){
......@@ -143,7 +180,7 @@ JobTask.prototype.run = function ()
dm_o.on('error', function(err) {
console.log('[DO] plugins error');
console.log(err);
callback(err)
callback({'status':'error','data':'plugins error'});
});
dm_o.run(function() {
......@@ -182,8 +219,24 @@ JobTask.prototype.run = function ()
}
function repeat_job(self)
{
var jobcaller = self.jobcaller
var cmd = {
'object_type':'job_execute',
'source' : 'worker',
'jobId' : self.jobcfg.job_id,
'option' : {'exe_level':'secondary'},
'input_meta' : self.input_meta,
'input_data' : self.input_data
}
jobcaller.send(cmd);
}
function perform_di(prm,cb)
{
var di_context = prm.context;
var job_id = di_context.jobconfig.job_id;
......
var async = require('async');
var path = require('path');
var fs = require('fs')
const PLUGINS_DIR = __dirname + '/../../plugins';
module.exports.create = function ()
{
return new PluginManager();
}
function PluginManager()
{
}
PluginManager.prototype.list = function (type,opt)
{
var result = [];
if(['di','dt','do'].indexOf(type) < 0 ){return result;}
var ns_all = [''].concat(list_ns(type));
ns_all.forEach((ns)=>{
result = result.concat(list_plugin_ns(type,ns));
});
return result;
}
PluginManager.prototype.npm_install_all = function (cb)
{
var self=this;
var di_list = this.list('di');
var dt_list = this.list('dt');
var do_list = this.list('do');
console.log('PM :: installing bigstream plugin...');
npm_all(di_list,()=>{
npm_all(dt_list,()=>{
npm_all(do_list,()=>{
console.log('PM :: Finish');
if(typeof cb == 'function'){cb()}
});
});
});
function npm_all(list,callback)
{
async.eachSeries(list,function (plugin,callb){
self.npm_install_plugin(plugin.type,plugin.name,function(){
callb();
});
},function(err){
callback(err);
});
}
}
const os = require('os');
var spawn = require('child_process').spawn;
PluginManager.prototype.npm_install_plugin = function (type,name,cb)
{
var p_path = __dirname + '/../../';
var p_type = type;
var p_name = name;
if(typeof type == 'object')
{
p_path = p_path + type.path;
p_type = type.type;
p_name = type.name;
cb = name;
}else{
p_path = p_path + this.getPath(type,name);
}
if(typeof cb != 'function'){cb=function(){}}
var pack = p_path + '/package.json';
var cmd = 'npm';
if (os.platform() === 'win32') {
cmd = 'npm.cmd';
}
fs.access(pack,(err)=>{
if(err){
console.log('PM :: install plugin module ' + type + ':' + name + '\t\t[SKIP]');
cb();
}else{
console.log('PM :: installing plugin ' + type + ':' + name);
var npm = spawn(cmd,['install'],{'cwd':p_path});
// npm.stdout.on('data', (data) => {
// console.log(`${data}`);
// });
npm.stderr.on('data', (data) => {
console.log(`${data}`);
});
npm.on('close', (code) => {
if(code){
console.log('PM :: install plugin module ' + type + ':' + name + '\t\t[FAILED]');
}else{
console.log('PM :: install plugin module ' + type + ':' + name + '\t\t[OK]');
}
cb();
});
}
});
}
PluginManager.prototype.getPath = function(type,name)
{
var ns='';
var n=name;
var tok = name.split('.');
if(tok.length>1){
ns = tok[0] + '/';
n = tok.slice(1).join('.');
}
return 'plugins/' + type + '/' + ns + type + '-' + n;
}
function list_ns (type)
{
var result = [];
var p_dir = path.join(PLUGINS_DIR,type);
var files = fs.readdirSync(p_dir);
files.forEach((item)=>{
if(fs.statSync(path.join(p_dir, item)).isDirectory() && item.startsWith(type + '-')==false)
{
result.push(item);
}
});
return result;
}
function list_plugin_ns (type,ns)
{
var result = [];
var p_dir = path.join(PLUGINS_DIR,type);
if(ns && ns!='')
{
p_dir = path.join(p_dir,ns);
}
var files = fs.readdirSync(p_dir);
files.forEach((item)=>{
var pref = type + '-'
if(fs.statSync(path.join(p_dir, item)).isDirectory() && item.startsWith(pref)){
var nspath = (ns && ns!='')?ns+'/':'';
var n = (ns && ns!='')?ns+'.'+item.substr(pref.length):item.substr(pref.length);
var res = {
'type':type,
'name':n,
'ns':(ns)?ns:'',
'path': 'plugins/' + type + '/' + nspath + item
}
result.push(res);
}
});
return result;
}
......@@ -35,6 +35,7 @@ function DIResponse(handle){
this.status = null;
this.data = null;
this.meta = null;
this.continue = false;
this.output_type = '';
}
......@@ -47,13 +48,19 @@ DIResponse.prototype.success = function(data,type){
}else if(typeof type == 'object' && type){
this.output_type=(type.output_type)?type.output_type:this.output_type;
this.meta=(type.meta)?type.meta:this.meta;
this.continue=(type.continue)?true:false;
}
this.data = (data)?data:this.data ;
this.status = 'success';
process.nextTick(function(){
handle.emit('done',{'status':'success','meta':self.meta,'data':self.data,'type':self.output_type});
handle.emit('done', { 'status':'success',
'meta':self.meta,
'data':self.data,
'type':self.output_type,
'flag':{'continue':self.continue}
});
});
}
......@@ -64,14 +71,19 @@ DIResponse.prototype.error = function(err){
this.data = err;
this.status = 'error';
process.nextTick(function(){
handle.emit('done',{'status':'error','data':err});
handle.emit('done',{'status':'error',
'data':err
});
});
}
DIResponse.prototype.reject = function(){
var self=this;
var handle = this.handle;
this.status = 'reject';
process.nextTick(function(){
handle.emit('done',{'status':'reject'});
handle.emit('done',{ 'status':'reject',
'flag':{'continue':self.continue}
});
});
}
var util = require('util');
var DOPlugin = require('../do-plugin');
function DOTask(context,request){
DOPlugin.call(this,context,request);
this.name = "pgsql";
}
util.inherits(DOTask,DOPlugin);
DOTask.prototype.perform = require('./perform');
module.exports = DOTask;
{
"name": "do-pgsql",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"author": "",
"license": "ISC",
"dependencies": {
"pg": "^7.0.2"
}
}
var pg = require('pg');
var ctx = require('../../../context');
var Utils = ctx.getLib('lib/util/plugin-utils');
function perform_function(context,request,response){
var job_id = context.jobconfig.job_id;
var transaction_id = context.transaction.id;
var param = context.jobconfig.data_out.param;
var memstore = context.task.memstore;
var in_type = request.type;
var data = (Array.isArray(request.data))?request.data:[request.data];
var meta = request.meta;
//parameter
var prm_host = param.host;
var prm_port = param.port;
var prm_db = param.database;
var prm_user = param.username;
var prm_password = param.password;
var prm_sql = param.sql;
var cnf = {}
if(prm_host){cnf.host=prm_host}
if(prm_port){cnf.port=prm_port}
if(prm_db){cnf.database=prm_db}
if(prm_user){cnf.user=prm_user}
if(prm_password){cnf.password=prm_password}
var sql='';
data.forEach((dat)=>{
var ev = {
'type' : in_type,
'meta' : meta,
'data' : dat
}
sql+=Utils.vm_execute_text(ev,prm_sql) + ';';
});
pgexcute(cnf,sql,function(err){
if(!err){
response.success();
}else{
response.error("pgsql error");
}
});
//response.success();
//response.reject();
//response.error("error message")
}
function pgexcute(conf,sql,callback){
var client = new pg.Client(conf);
client.connect(function(err) {
if(err) {
callback(err);
return console.error('could not connect to postgres', err);
}
client.query(sql, function(err, result) {
if(err) {
callback(err);
return console.error('error running query', err);
}
callback(null,result);
client.end();
});
});
}
function pg_query(conf,sql,callback){
pg.connect(conf, function(err, client, done) {
var handleError = function(err) {
if(!err) return false;
if(client){
done(client);
}
console.error('pg error ', err);
callback(err);
return true;
};
if(handleError(err)) return;
client.query(sql,function(err, result) {
if(handleError(err)) return;
done();
callback(null,result.rows);
});
});
}
module.exports = perform_function;
#!/usr/bin/env node
var ctx = require('../context');
var repl = require('repl');
var PluginManager = ctx.getLib('lib/plugin/plugin-manager');
var pm = PluginManager.create();
pm.npm_install_all();
......@@ -16,12 +16,13 @@ module.exports.create = function(prm)
function BSSEngine(prm)
{
if(typeof prm == 'string'){
prm = {'file':prm};
prm = {'file':prm,'context':null};
}
// this.repos_dir = prm.repos_dir;
// this.name = prm.name;
this.file = prm.file;
this.name = prm.name;
this.context = (prm.context)?prm.context:null;
this.concurrent = 0;
}
......@@ -102,7 +103,12 @@ BSSEngine.prototype.cmd_write = function(prm,cb)
'resource_id' : obj_id.toString(),
'storage_name' : self.name
}
dataevent.newdata({'resourceId':obj_id.toString(),'storageId':self.name});
//dataevent.newdata({'resourceId':obj_id.toString(),'storageId':self.name});
if(self.context){
newdata_event(self.context,{'resourceId':obj_id.toString(),'storageId':self.name});
}
cb(null,resp);
}else {
cb("write error");
......@@ -110,6 +116,25 @@ BSSEngine.prototype.cmd_write = function(prm,cb)
});
}
function newdata_event(ctx,prm)
{
var objId = prm.resourceId;
var storageId = prm.storageId;
var hostname = ctx.cfg.storage.api_hostname;
var obj_api_url = hostname + '/v1/object'
var key = 'storage.' + storageId + '.dataevent.newdata';
var objMsg = {
'event' : 'newdata',
'resourceId' : objId,
'resource_id' : objId,
'resource_location' : obj_api_url + '/' + storageId + '.' + objId
}
var evp = ctx.evp;
evp.send(key,objMsg);
}
function parseData(dat)
{
if(!dat.value){return null}
......
......@@ -4,7 +4,9 @@ module.exports = BSSPool;
function BSSPool(prm)
{
this.repos_dir = prm.repos_dir
this.context = prm.context;
this.pool = [];
this.size = 32;
}
BSSPool.prototype.get = function(name,cb)
......@@ -15,11 +17,13 @@ BSSPool.prototype.get = function(name,cb)
var bss = this.search(name);
if(bss){
self.clean(function(err){
process.nextTick(function() {
cb(null,bss.engine);
});
});
}else{
bss = BSSEngine.create({'file' : filepath,'name' : bssname});
bss = BSSEngine.create({'context':self.context,'file' : filepath,'name' : bssname});
bss.open(function(err){
if(!err){
self.pool.push({
......@@ -27,23 +31,49 @@ BSSPool.prototype.get = function(name,cb)
'engine':bss
});
}
self.clean(function(err){
cb(err,bss);
});
});
}
}
BSSPool.prototype.clean = function(cb)
{
if(this.size<2){this.size=2}
if(this.pool.length>this.size)
{
var garb = this.pool.shift();
garb.engine.close(cb);
console.log('SS :: release storage >> ' + garb.name);
}else{
cb();
}
}
BSSPool.prototype.search = function(name)
{
var ret=null;
for(var i=0;i<this.pool.length;i++)
{
var bssI = this.pool[i]
var newpool=[];
this.pool.forEach((bssI)=>{
if(bssI.name == name){
ret = bssI;
break;
}
}else{
newpool.push(bssI)
}
});
if(ret){newpool.push(ret)}
this.pool = newpool;
// for(var i=0;i<this.pool.length;i++)
// {
// var bssI = this.pool[i];
// if(bssI.name == name){
// ret = bssI;
// break;
// }
// }
return ret;
}
......
......@@ -7,8 +7,9 @@ module.exports.create = function(cfg){
function Db(cfg)
{
this.repos_dir = cfg.repos_dir;
this.context = cfg.context;
this.bsspool = new BSSPool({'repos_dir':this.repos_dir});
this.bsspool = new BSSPool({'repos_dir':this.repos_dir,'context':this.context});
}
Db.prototype.request = function(req,cb)
......
var path = require('path');
var fs = fs || require('fs')
var fs = require('fs')
var bss_walk_sync = function(dir, filelist,cat) {
files = fs.readdirSync(dir);
......
......@@ -7,8 +7,8 @@ var express = require('express');
var app = express();
var bodyParser = require('body-parser');
var EventPub = ctx.getLib('lib/amqp/event-pub');
var cfg = ctx.config;
var storage_cfg = cfg.storage;
module.exports.create = function(cfg)
{
......@@ -16,11 +16,18 @@ module.exports.create = function(cfg)
return ss;
}
var SS = function StorageService(cfg)
var SS = function StorageService(p_cfg)
{
this.config = cfg;
storage_cfg = cfg.storage;
this.db = Db.create({'repos_dir':storage_cfg.repository});
this.config = p_cfg;
var storage_cfg = p_cfg.storage;
var amqp_cfg = p_cfg.amqp;
this.context = {
'cfg':p_cfg,
'evp':new EventPub({'url':amqp_cfg.url,'name':'bs_storage'})
}
this.db = Db.create({'repos_dir':storage_cfg.repository,'context':this.context});
this.worker_pool = WorkerPool.create({'size':2});
}
......
#!/usr/bin/env node
var ctx = require('../context');
var repl = require('repl');
var PluginManager = ctx.getLib('lib/plugin/plugin-manager');
var pm = PluginManager.create();
global.pm = pm;
console.log("Entering interactive mode.");
repl.start({
prompt: "bigstream> ",
input: process.stdin,
output: process.stdout
});
......@@ -242,26 +242,25 @@ var job_config = {
// var dat2 = new Buffer(10);
// console.log(dat2);
// console.log(hash(dat2));
var path = require('path');
var fs = fs || require('fs')
var walkSync = function(dir, filelist,cat) {
files = fs.readdirSync(dir);
filelist = filelist || [];
cat = cat || '';
files.forEach(function(file) {
if (fs.statSync(path.join(dir, file)).isDirectory()) {
var base_cat = cat + file + '.'
filelist = walkSync(path.join(dir, file), filelist,base_cat);
}
else {
if(path.extname(file) == '.bss'){
var storage = cat + path.basename(file,'.bss');
filelist.push(storage);
}
}
});
return filelist;
};
// var Git = require("nodegit");
//
// Git.Clone("https://github.com/igridproject/bigstream-docker.git", "../bigstream-docker").then(function(repository) {
// // Work with the repository object here.
// console.log(repository);
// });
// var exec = require('child_process').exec;
// exec('npm install',{'cwd':'../plugins/dt/dt-agritronics'}, function(error, stdout, stderr) {
// console.log('stdout: ' + stdout);
// console.log('stderr: ' + stderr);
// if (error !== null) {
// console.log('exec error: ' + error);
// }
// });
var PluginManager = ctx.getLib('lib/plugin/plugin-manager');
console.log(walkSync('D:/testfile/BSDATA'));
var pm = PluginManager.create();
//pm.npm_install_plugin('dt','agritronics')
console.log(pm.list('di'));
......@@ -6,20 +6,19 @@ var amqp = require('amqplib/callback_api');
amqp.connect('amqp://bigmaster.igridproject.info', function(err, conn) {
amqp.connect('amqp://lab1.igridproject.info', function(err, conn) {
conn.createChannel(function(err, ch) {
var ex = 'bs_job_cmd';
var ex = 'bs_storage';
ch.assertExchange(ex, 'topic', {durable: false});
ch.assertQueue('', {exclusive: true}, function(err, q) {
console.log(' [*] Waiting for logs. To exit press CTRL+C');
ch.bindQueue(q.queue, ex, 'cmd.#');
ch.bindQueue(q.queue, ex, '#');
ch.consume(q.queue, function(msg) {
console.log(JSON.stringify(msg));
console.log(msg.fields.routingKey + '\n');
console.log(msg.content.toString() + '\n');
console.log('----------------------------------');
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment