Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Sign in
Toggle navigation
N
node-bigstream
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
3
Merge Requests
3
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
bs
node-bigstream
Commits
212f3e42
Commit
212f3e42
authored
Mar 22, 2017
by
project
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
--no commit message
--no commit message
parent
0a5f9e97
Show whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
131 additions
and
20 deletions
+131
-20
jobtask.js
jobexecutor/lib/jobtask.js
+50
-11
test.js
test/test.js
+81
-9
No files found.
jobexecutor/lib/jobtask.js
View file @
212f3e42
...
@@ -7,6 +7,7 @@ var ctx = require('../../context');
...
@@ -7,6 +7,7 @@ var ctx = require('../../context');
var
memstore
=
ctx
.
getLib
(
'jobexecutor/lib/memstore'
);
var
memstore
=
ctx
.
getLib
(
'jobexecutor/lib/memstore'
);
var
bsdata
=
ctx
.
getLib
(
'lib/model/bsdata'
);
var
bsdata
=
ctx
.
getLib
(
'lib/model/bsdata'
);
module
.
exports
=
JobTask
;
function
JobTask
(
prm
)
function
JobTask
(
prm
)
{
{
EventEmitter
.
call
(
this
);
EventEmitter
.
call
(
this
);
...
@@ -32,7 +33,7 @@ JobTask.prototype.run = function ()
...
@@ -32,7 +33,7 @@ JobTask.prototype.run = function ()
var
ctx_transaction
=
{
var
ctx_transaction
=
{
"id"
:
tran
I
d
"id"
:
tran
saction_i
d
}
}
var
jobMem
=
new
memstore
({
'job_id'
:
job_id
,
'cat'
:
'global'
,
'mem'
:
self
.
mem
})
var
jobMem
=
new
memstore
({
'job_id'
:
job_id
,
'cat'
:
'global'
,
'mem'
:
self
.
mem
})
...
@@ -46,18 +47,58 @@ JobTask.prototype.run = function ()
...
@@ -46,18 +47,58 @@ JobTask.prototype.run = function ()
"input_data"
:
input_data
,
"input_data"
:
input_data
,
"job"
:
ctx_job
"job"
:
ctx_job
}
}
var
task_di
=
function
(
callback
)
{
perform_di
({
'context'
:
context
,
'handle'
:
self
}
,
function
(
err
,
resp
){
if
(
resp
.
status
==
'success'
){
callback
(
null
,
resp
);
}
else
{
callback
(
resp
);
}
});
}
var
task_dt
=
function
(
request
,
callback
)
{
var
dt_request
=
{
'input_type'
:
request
.
type
,
'data'
:
request
.
data
}
perform_dt
({
'context'
:
context
,
'request'
:
dt_request
,
'handle'
:
self
},
function
(
err
,
dt_resp
){
if
(
dt_resp
.
status
==
'success'
){
callback
(
null
,
dt_resp
);
}
else
{
callback
(
dt_resp
);
}
});
}
var
task_do
=
function
(
request
,
callback
)
{
var
do_request
=
{
'input_type'
:
request
.
type
,
'data'
:
request
.
data
}
perform_do
({
'context'
:
context
,
'request'
:
do_request
,
'handle'
:
self
},
function
(
err
,
do_resp
){
if
(
do_resp
.
status
==
'success'
){
callback
(
null
,
do_resp
);
}
else
{
callback
(
do_resp
);
}
});
}
async
.
waterfall
([
task_di
,
task_dt
,
task_do
],
function
(
err
,
resp
)
{
if
(
!
err
){
console
.
log
(
'***** JOB SUCCESSFULLY DONE *****'
);
}
else
{
console
.
log
(
'***** JOB UNSUCCESSFULLY DONE *****'
);
}
});
}
}
function
perform_di
(
prm
,
cb
)
function
perform_di
(
prm
,
cb
)
{
{
var
di_context
=
prm
.
context
;
var
di_context
=
prm
.
context
;
var
job
I
d
=
di_context
.
jobconfig
.
job_id
;
var
job
_i
d
=
di_context
.
jobconfig
.
job_id
;
var
di_cfg
=
di_context
.
jobconfig
.
data_in
;
var
di_cfg
=
di_context
.
jobconfig
.
data_in
;
var
DITask
=
getPlugins
(
'di'
,
di_cfg
.
type
);
var
DITask
=
getPlugins
(
'di'
,
di_cfg
.
type
);
var
mempref
=
"ms."
+
jobId
+
'.di'
;
var
diMem
=
new
memstore
({
'job_id'
:
job_id
,
'cat'
:
'di'
,
'mem'
:
prm
.
handle
.
mem
})
var
diMem
=
new
memstore
(
mempref
,
storage
);
di_context
.
task
=
{
di_context
.
task
=
{
"memstore"
:
diMem
"memstore"
:
diMem
}
}
...
@@ -73,12 +114,11 @@ function perform_dt(prm,cb)
...
@@ -73,12 +114,11 @@ function perform_dt(prm,cb)
{
{
var
dt_context
=
prm
.
context
var
dt_context
=
prm
.
context
var
job
I
d
=
dt_context
.
jobconfig
.
job_id
;
var
job
_i
d
=
dt_context
.
jobconfig
.
job_id
;
var
dt_cfg
=
dt_context
.
jobconfig
.
data_transform
;
var
dt_cfg
=
dt_context
.
jobconfig
.
data_transform
;
var
DITask
=
getPlugins
(
'dt'
,
dt_cfg
.
type
);
var
DITask
=
getPlugins
(
'dt'
,
dt_cfg
.
type
);
var
mempref
=
"ms."
+
jobId
+
'.dt'
;
var
dtMem
=
new
memstore
({
'job_id'
:
job_id
,
'cat'
:
'dt'
,
'mem'
:
prm
.
handle
.
mem
})
var
dtMem
=
new
memstore
(
mempref
,
storage
);
dt_context
.
task
=
{
dt_context
.
task
=
{
"memstore"
:
dtMem
"memstore"
:
dtMem
}
}
...
@@ -95,12 +135,11 @@ function perform_do(prm,cb)
...
@@ -95,12 +135,11 @@ function perform_do(prm,cb)
{
{
var
do_context
=
prm
.
context
var
do_context
=
prm
.
context
var
job
I
d
=
do_context
.
jobconfig
.
job_id
;
var
job
_i
d
=
do_context
.
jobconfig
.
job_id
;
var
do_cfg
=
do_context
.
jobconfig
.
data_out
;
var
do_cfg
=
do_context
.
jobconfig
.
data_out
;
var
DOTask
=
getPlugins
(
'do'
,
do_cfg
.
type
);
var
DOTask
=
getPlugins
(
'do'
,
do_cfg
.
type
);
var
mempref
=
"ms."
+
jobId
+
'.do'
;
var
doMem
=
new
memstore
({
'job_id'
:
job_id
,
'cat'
:
'do'
,
'mem'
:
prm
.
handle
.
mem
})
var
doMem
=
new
memstore
(
mempref
,
storage
);
do_context
.
task
=
{
do_context
.
task
=
{
"memstore"
:
doMem
"memstore"
:
doMem
}
}
...
@@ -114,7 +153,7 @@ function perform_do(prm,cb)
...
@@ -114,7 +153,7 @@ function perform_do(prm,cb)
function
getPlugins
(
type
,
name
)
function
getPlugins
(
type
,
name
)
{
{
var
path
=
'../plugins/'
+
type
+
'/'
+
type
+
'-'
+
name
;
var
path
=
'../
../
plugins/'
+
type
+
'/'
+
type
+
'-'
+
name
;
return
require
(
path
);
return
require
(
path
);
}
}
...
...
test/test.js
View file @
212f3e42
...
@@ -110,13 +110,85 @@ const crypto = require("crypto");
...
@@ -110,13 +110,85 @@ const crypto = require("crypto");
// console.log(msg);
// console.log(msg);
// });
// });
var
memstore
=
ctx
.
getLib
(
'jobexecutor/lib/memstore'
);
// var memstore = ctx.getLib('jobexecutor/lib/memstore');
//
var
ms
=
new
memstore
({
'job_id'
:
'job01'
,
'cat'
:
'global'
,
'conn'
:
'redis://:@bigmaster.igridproject.info:6379/1'
})
// var ms = new memstore({'job_id':'job01','cat':'global','conn':'redis://:@bigmaster.igridproject.info:6379/1'})
//
var
txt
=
"kamron
\n
aroonrua"
// var txt = "kamron\naroonrua"
//
// ms.setItem('test1',{'t':txt});
// ms.getItem('test1',function(err,val){
// console.log(val.t);
// });
ms
.
setItem
(
'test1'
,{
't'
:
txt
});
var
redis
=
require
(
'redis'
);
ms
.
getItem
(
'test1'
,
function
(
err
,
val
){
var
handle
=
{
'mem'
:
redis
.
createClient
(
'redis://:@bigmaster.igridproject.info:6379/1'
)}
console
.
log
(
val
.
t
);
var
input_data
=
{};
});
var
job_config
=
{
"job_id"
:
"example"
,
"active"
:
true
,
"trigger"
:
{
"type"
:
"cron"
,
"cmd"
:
"29,59 * * * * *"
},
"data_in"
:
{
"type"
:
"example"
},
"data_transform"
:
{
"type"
:
"noop"
},
"data_out"
:
{
"type"
:
"console"
}
}
var
ag
=
{
"job_id"
:
"002"
,
"active"
:
true
,
"trigger"
:
{
"type"
:
"cron"
,
"cmd"
:
"15,45 * * * *"
},
"data_in"
:
{
"type"
:
"agritronics"
,
"profile"
:
{
"station_id"
:
"GISDA-02"
,
"latitude"
:
""
,
"longitude"
:
""
},
"param"
:
{
"url"
:
"http://agritronics.nstda.or.th/ws/get.php"
,
"appkey"
:
"0c5a295bd8c07a081f4f0061eee6665c38"
,
"station_id"
:
"GISTDA-02"
,
"data_types"
:
[
{
"type"
:
"1"
,
"node_id"
:
"4096"
},
{
"type"
:
"2"
,
"node_id"
:
"4096"
},
{
"type"
:
"4"
,
"node_id"
:
"4096"
},
{
"type"
:
"5"
,
"node_id"
:
"4096"
},
{
"type"
:
"6"
,
"node_id"
:
"4096"
},
{
"type"
:
"7"
,
"node_id"
:
"4096"
},
{
"type"
:
"8"
,
"node_id"
:
"4096"
},
{
"type"
:
"10"
,
"node_id"
:
"4096"
},
{
"type"
:
"2021"
,
"node_id"
:
"7328"
},
{
"type"
:
"2022"
,
"node_id"
:
"7328"
}
],
"init_observed_date"
:
"2017-03-22"
,
"init_observed_time"
:
"12:00:00"
}
},
"data_transform"
:
{
"type"
:
"agritronics"
},
"data_out"
:
{
"type"
:
"console"
}
}
var
JobTask
=
ctx
.
getLib
(
'jobexecutor/lib/jobtask'
);
var
job
=
new
JobTask
({
'handle'
:
handle
,
'job_config'
:
ag
,
'input_data'
:
input_data
});
job
.
run
();
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment