Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Sign in
Toggle navigation
N
node-bigstream
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
3
Merge Requests
3
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
bs
node-bigstream
Commits
49185e0e
Commit
49185e0e
authored
May 18, 2017
by
project
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
--no commit message
--no commit message
parent
217af7a0
Hide whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
130 additions
and
36 deletions
+130
-36
job-manager.js
coreservice/lib/job-manager.js
+15
-0
scheduler.js
coreservice/scheduler.js
+1
-0
ws-jobs.js
coreservice/ws/v1/ws-jobs.js
+18
-0
cronlist.js
lib/mems/cronlist.js
+22
-3
trigger-registry.js
lib/mems/trigger-registry.js
+49
-0
test.js
test/test.js
+25
-33
No files found.
coreservice/lib/job-manager.js
View file @
49185e0e
...
...
@@ -2,6 +2,7 @@ var ctx = require('../../context');
var
cfg
=
ctx
.
config
;
var
JobRegistry
=
ctx
.
getLib
(
'lib/mems/job-registry'
);
var
TriggerRegistry
=
ctx
.
getLib
(
'lib/mems/trigger-registry'
);
var
JUtils
=
ctx
.
getLib
(
'lib/job/jobutils'
);
module
.
exports
.
create
=
function
(
cfg
)
...
...
@@ -15,6 +16,7 @@ function JobManager (cfg)
this
.
conn
=
cfg
.
conn
;
this
.
mem
=
this
.
conn
.
getMemstore
();
this
.
job_registry
=
JobRegistry
.
create
({
'redis'
:
this
.
mem
});
this
.
trigger_registry
=
TriggerRegistry
.
create
({
'redis'
:
this
.
mem
});
}
JobManager
.
prototype
.
listJob
=
function
(
prm
,
cb
)
...
...
@@ -43,5 +45,18 @@ JobManager.prototype.getJob = function (prm,cb)
self
.
job_registry
.
getJob
(
prm
.
jid
,
function
(
err
,
jobcfg
){
cb
(
err
,
jobcfg
)
})
}
JobManager
.
prototype
.
setJob
=
function
(
prm
,
cb
)
{
var
self
=
this
;
var
job
=
prm
.
job
;
if
(
JUtils
.
validate
(
job
)){
self
.
job_registry
.
setJob
(
job
.
job_id
,
job
);
self
.
trigger_registry
.
setByJob
(
job
);
cb
(
null
);
}
else
{
cb
(
'Invalid job config'
);
}
}
coreservice/scheduler.js
View file @
49185e0e
...
...
@@ -41,6 +41,7 @@ SchedulerService.prototype.reload = function ()
self
.
clean
();
self
.
crons
.
update
(
function
(
err
){
var
cl
=
self
.
crons
.
list
;
console
.
log
(
cl
);
for
(
var
i
=
0
;
i
<
cl
.
length
;
i
++
)
{
var
c
=
cl
[
i
];
...
...
coreservice/ws/v1/ws-jobs.js
View file @
49185e0e
...
...
@@ -34,4 +34,22 @@ router.get('/:jid',function (req, res) {
})
});
router
.
post
(
'/'
,
function
(
req
,
res
)
{
var
reqHelper
=
request
.
create
(
req
);
var
respHelper
=
response
.
create
(
res
);
var
jm
=
req
.
context
.
jobManager
;
var
json_job
=
req
.
body
;
jm
.
setJob
({
'job'
:
json_job
},
function
(
err
,
res
){
if
(
err
)
{
respHelper
.
response400
(
err
);
}
else
{
respHelper
.
response201
();
}
});
});
module
.
exports
=
router
;
lib/mems/cronlist.js
View file @
49185e0e
var
redis
=
require
(
'redis'
);
const
PREFIX
=
'bs:scheduler:cronlist'
;
const
KEYS
=
'bs:regis:triggers'
;
module
.
exports
.
create
=
function
(
cfg
)
{
...
...
@@ -54,12 +55,30 @@ CronList.prototype.clean = function()
CronList
.
prototype
.
update
=
function
(
cb
)
{
var
self
=
this
;
this
.
mem
.
get
(
PREFIX
,
function
(
err
,
result
)
{
if
(
!
err
&&
result
){
self
.
list
=
JSON
.
parse
(
result
);
// this.mem.get(PREFIX, function (err, result) {
// if(!err && result){
// self.list = JSON.parse(result);
// }
// cb(err);
// });
self
.
list
=
[];
self
.
mem
.
hgetall
(
KEYS
,
function
(
err
,
res
){
if
(
!
err
&&
Array
.
isArray
(
res
)){
var
ks
=
Object
.
keys
(
res
);
for
(
int
i
=
0
;
i
<
ks
.
length
;
i
++
)
{
var
k
=
ks
[
i
];
var
trigger
=
JSON
.
parse
(
res
[
k
]);
if
(
trigger
.
type
==
'cron'
)
{
var
cl
=
self
.
mkCron
(
trigger
.
id
,
trigger
.
cmd
,
trigger
.
job_id
);
self
.
list
.
push
(
cl
);
}
}
}
cb
(
err
);
});
}
CronList
.
prototype
.
commit
=
function
(
cb
)
...
...
lib/mems/trigger-registry.js
0 → 100644
View file @
49185e0e
var
redis
=
require
(
'redis'
);
const
KEYS
=
'bs:regis:triggers'
;
module
.
exports
.
create
=
function
(
cfg
)
{
return
new
TR
(
cfg
);
}
var
TR
=
function
TriggerRegistry
(
cfg
)
{
this
.
config
=
cfg
;
if
(
cfg
.
conn
){
this
.
mem
=
redis
.
createClient
(
cfg
.
conn
);
}
else
if
(
cfg
.
redis
){
this
.
mem
=
cfg
.
redis
;
}
else
{
this
.
mem
=
null
;
}
}
TR
.
prototype
.
setTrigger
=
function
(
name
,
trigger
,
cb
)
{
var
self
=
this
;
var
strTrigger
=
JSON
.
stringify
(
trigger
);
self
.
mem
.
hset
(
KEYS
,
name
,
strTrigger
);
if
(
typeof
cb
==
'function'
){
cb
();
}
}
TR
.
prototype
.
setByJob
=
function
(
job
,
cb
)
{
var
self
=
this
;
var
id
=
'def.'
+
job
.
job_id
;
var
trigger
=
job
.
trigger
;
trigger
.
id
=
id
;
trigger
.
job_id
=
job
.
job_id
;
self
.
setTrigger
(
id
,
trigger
,
cb
);
}
TR
.
prototype
.
clear
=
function
(
cb
)
{
self
.
mem
.
del
(
KEYS
);
if
(
typeof
cb
==
'function'
){
cb
();
}
}
test/test.js
View file @
49185e0e
...
...
@@ -125,23 +125,23 @@ const crypto = require("crypto");
var
redis
=
require
(
'redis'
);
// var handle = {'mem' : redis.createClient('redis://bigmaster.igridproject.info:6379/1')}
// var input_data = {};
//
var job_config = {
//
"job_id" : "example",
//
"active" : true,
//
"trigger" : {
//
"type": "cron",
//
"cmd": "29,59 * * * * *"
//
},
//
"data_in" : {
//
"type": "example"
//
},
//
"data_transform" : {
//
"type": "noop"
//
},
//
"data_out" : {
//
"type": "console"
//
}
//
}
var
job_config
=
{
"job_id"
:
"example"
,
"active"
:
true
,
"trigger"
:
{
"type"
:
"cron"
,
"cmd"
:
"29,59 * * * * *"
},
"data_in"
:
{
"type"
:
"example"
},
"data_transform"
:
{
"type"
:
"noop"
},
"data_out"
:
{
"type"
:
"console"
}
}
//
// var ag = {
// "job_id" : "agritronics-gistda-01",
...
...
@@ -219,24 +219,16 @@ var redis = require('redis');
// });
//var client = redis.createClient('redis://bigmaster.igridproject.info:6379/1');
var
client
=
redis
.
createClient
(
'redis://localhost:9736/1'
);
client
.
keys
(
'bs:regis:jobs:*'
,
function
(
err
,
keys
)
{
if
(
err
)
return
console
.
log
(
err
);
var
arr
=
[];
for
(
var
i
=
0
,
len
=
keys
.
length
;
i
<
len
;
i
++
)
{
//console.log(keys[i]);
arr
.
push
(
keys
[
i
].
split
(
':'
)[
3
]);
client
.
get
(
keys
[
i
],
function
(
err
,
data
){
var
obj_data
=
JSON
.
parse
(
data
);
//arr.push(obj_data);
});
}
var
client
=
redis
.
createClient
(
'redis://lab1.igridproject.info:6379/1'
);
//var client = redis.createClient('redis://localhost:9736/1');
console
.
log
(
arr
);
client
.
hgetall
(
'bs:regis:triggers'
,
function
(
err
,
rep
)
{
Object
.
keys
(
rep
).
forEach
(
function
(
a
,
b
){
console
.
log
(
b
);
})
//console.log(Object.keys(rep));
});
//client.hset('bs:regis:triggers','job01',JSON.stringify(job_config));
// var CronList = ctx.getLib('lib/mems/cronlist');
//
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment