Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Sign in
Toggle navigation
N
node-bigstream
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
3
Merge Requests
3
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
bs
node-bigstream
Commits
1dd67702
Commit
1dd67702
authored
May 03, 2017
by
project
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
--no commit message
--no commit message
parent
7d3fd30a
Hide whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
56 additions
and
12 deletions
+56
-12
scheduler.js
coreservice/scheduler.js
+29
-2
message.js
lib/model/example/message.js
+6
-0
test.js
test/test.js
+0
-9
test_jobexecute.js
test/test_jobexecute.js
+1
-1
test_trigger_ctl.js
test/test_trigger_ctl.js
+20
-0
No files found.
coreservice/scheduler.js
View file @
1dd67702
...
@@ -7,6 +7,7 @@ var amqp_cfg = ctx.config.amqp;
...
@@ -7,6 +7,7 @@ var amqp_cfg = ctx.config.amqp;
var
ConnCtx
=
ctx
.
getLib
(
'lib/conn/connection-context'
);
var
ConnCtx
=
ctx
.
getLib
(
'lib/conn/connection-context'
);
var
CronList
=
ctx
.
getLib
(
'lib/mems/cronlist'
);
var
CronList
=
ctx
.
getLib
(
'lib/mems/cronlist'
);
var
QueueCaller
=
ctx
.
getLib
(
'lib/amqp/queuecaller'
);
var
QueueCaller
=
ctx
.
getLib
(
'lib/amqp/queuecaller'
);
var
EvenSub
=
ctx
.
getLib
(
'lib/amqp/event-sub'
);
module
.
exports
.
create
=
function
(
cfg
)
module
.
exports
.
create
=
function
(
cfg
)
{
{
...
@@ -20,6 +21,7 @@ function SchedulerService(cfg)
...
@@ -20,6 +21,7 @@ function SchedulerService(cfg)
this
.
conn
=
ConnCtx
.
create
(
this
.
config
);
this
.
conn
=
ConnCtx
.
create
(
this
.
config
);
this
.
mem
=
this
.
conn
.
getMemstore
();
this
.
mem
=
this
.
conn
.
getMemstore
();
this
.
jobcaller
=
new
QueueCaller
({
'url'
:
amqp_cfg
.
url
,
'name'
:
'bs_jobs_queue'
});
this
.
jobcaller
=
new
QueueCaller
({
'url'
:
amqp_cfg
.
url
,
'name'
:
'bs_jobs_queue'
});
this
.
evs
=
new
EvenSub
({
'url'
:
amqp_cfg
.
url
,
'name'
:
'bs_trigger_cmd'
});
this
.
crons
=
CronList
.
create
({
'redis'
:
this
.
mem
});
this
.
crons
=
CronList
.
create
({
'redis'
:
this
.
mem
});
this
.
engine
=
[];
this
.
engine
=
[];
...
@@ -29,6 +31,7 @@ SchedulerService.prototype.start = function ()
...
@@ -29,6 +31,7 @@ SchedulerService.prototype.start = function ()
{
{
console
.
log
(
'SCHEDULER:Starting
\
t
\
t[OK]'
);
console
.
log
(
'SCHEDULER:Starting
\
t
\
t[OK]'
);
this
.
reload
();
this
.
reload
();
this
.
_start_controller
();
}
}
SchedulerService
.
prototype
.
reload
=
function
()
SchedulerService
.
prototype
.
reload
=
function
()
...
@@ -40,11 +43,14 @@ SchedulerService.prototype.reload = function ()
...
@@ -40,11 +43,14 @@ SchedulerService.prototype.reload = function ()
var
cl
=
self
.
crons
.
list
;
var
cl
=
self
.
crons
.
list
;
for
(
var
i
=
0
;
i
<
cl
.
length
;
i
++
)
for
(
var
i
=
0
;
i
<
cl
.
length
;
i
++
)
{
{
var
c
=
cl
[
i
];
var
s
=
schedule
.
scheduleJob
(
cl
[
i
].
cmd
,
function
(
y
){
var
s
=
schedule
.
scheduleJob
(
cl
[
i
].
cmd
,
function
(
y
){
self
.
_callJob
(
y
);
self
.
_callJob
(
y
);
}.
bind
(
null
,
cl
[
i
]));
}.
bind
(
null
,
cl
[
i
]));
self
.
engine
.
push
(
s
);
self
.
engine
.
push
({
'c'
:
c
,
's'
:
s
});
}
}
console
.
log
(
'SCHEDULER:Register '
+
String
(
i
)
+
' jobs
\
t[OK]'
);
});
});
}
}
...
@@ -54,7 +60,7 @@ SchedulerService.prototype.clean = function ()
...
@@ -54,7 +60,7 @@ SchedulerService.prototype.clean = function ()
var
arrEngine
=
this
.
engine
;
var
arrEngine
=
this
.
engine
;
for
(
var
i
=
0
;
i
<
arrEngine
.
length
;
i
++
)
for
(
var
i
=
0
;
i
<
arrEngine
.
length
;
i
++
)
{
{
arrEngine
[
i
].
cancel
();
arrEngine
[
i
].
s
.
cancel
();
}
}
this
.
engine
=
[];
this
.
engine
=
[];
}
}
...
@@ -76,5 +82,26 @@ SchedulerService.prototype._callJob = function(cron)
...
@@ -76,5 +82,26 @@ SchedulerService.prototype._callJob = function(cron)
}
}
this
.
jobcaller
.
send
(
cmd
);
this
.
jobcaller
.
send
(
cmd
);
}
SchedulerService
.
prototype
.
_start_controller
=
function
()
{
var
self
=
this
;
var
topic
=
'ctl.trigger.#'
;
self
.
evs
.
sub
(
topic
,
function
(
err
,
msg
){
if
(
!
msg
){
return
;}
var
ctl
=
msg
.
data
;
if
(
ctl
.
trigger_type
!=
'cron'
&&
ctl
.
trigger_type
!=
'all'
)
{
return
;
}
if
(
ctl
.
cmd
==
'reload'
)
{
console
.
log
(
'SCHEDULER:CMD Reload
\
t
\
t[OK]'
);
self
.
reload
();
}
});
}
}
lib/model/example/message.js
View file @
1dd67702
...
@@ -54,3 +54,9 @@ var cron = {
...
@@ -54,3 +54,9 @@ var cron = {
'cron'
:
'*/10 * * * * *'
,
'cron'
:
'*/10 * * * * *'
,
'jobid'
:
'job01'
'jobid'
:
'job01'
}
}
var
trigger_cmd
=
{
'trigger_type'
:
'cron'
,
'cmd'
:
'reload'
,
'param'
:{}
}
test/test.js
View file @
1dd67702
...
@@ -242,12 +242,3 @@ var client = redis.createClient('redis://bigmaster.igridproject.info:6379/1');
...
@@ -242,12 +242,3 @@ var client = redis.createClient('redis://bigmaster.igridproject.info:6379/1');
// crons.update(function(err){
// crons.update(function(err){
// console.log(crons.list);
// console.log(crons.list);
// });
// });
var
schedule
=
require
(
'node-schedule'
);
var
cron
=
'*/10 * * * * *'
;
var
x
=
'Tada!'
;
var
j
=
schedule
.
scheduleJob
(
cron
,
function
(
y
){
console
.
log
(
y
);
}.
bind
(
null
,
x
));
x
=
'Changing Data'
;
console
.
log
(
x
);
test/test_jobexecute.js
View file @
1dd67702
...
@@ -12,7 +12,7 @@ var qc = new QueueCaller({'url':amp,'name':'bs_jobs_queue'});
...
@@ -12,7 +12,7 @@ var qc = new QueueCaller({'url':amp,'name':'bs_jobs_queue'});
var
cmd
=
{
var
cmd
=
{
'object_type'
:
'job_execute'
,
'object_type'
:
'job_execute'
,
'source'
:
'http_listener'
,
'source'
:
'http_listener'
,
'jobId'
:
'job0
1
'
,
'jobId'
:
'job0
3
'
,
'option'
:
{},
'option'
:
{},
'input_data'
:
{
'input_data'
:
{
'type'
:
'bsdata'
,
'type'
:
'bsdata'
,
...
...
test/test_trigger_ctl.js
0 → 100644
View file @
1dd67702
var
ctx
=
require
(
'../context'
);
var
amqp_cfg
=
ctx
.
config
.
amqp
;
var
amp
=
'amqp://lab1.igridproject.info'
;
var
EvenPub
=
ctx
.
getLib
(
'lib/amqp/event-pub'
);
var
evp
=
new
EvenPub
({
'url'
:
amp
,
'name'
:
'bs_trigger_cmd'
});
var
topic
=
'ctl.trigger.all.reload'
;
var
msg
=
{
'trigger_type'
:
'all'
,
'cmd'
:
'reload'
,
'prm'
:
{}
}
evp
.
send
(
topic
,
msg
);
setTimeout
(
function
(){
evp
.
close
();
},
500
);
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment