Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
asperathos-custom
stream-kubejobs
Commits
04a4365e
Commit
04a4365e
authored
Aug 19, 2020
by
Diego Gama
💻
Browse files
Changes the plugin to run as normal so this is the default option when installing
parent
5d4d4c6a
Changes
1
Hide whitespace changes
Inline
Side-by-side
stream-kubejobs/__init__.py
View file @
04a4365e
...
...
@@ -32,7 +32,6 @@ import kubernetes
LOG_FILE
=
"progress.log"
LOG_NAME
=
"kubejobs-progress"
THRESHOLD
=
1
class
KubeJobStreamProgress
(
Plugin
):
...
...
@@ -94,6 +93,7 @@ class KubeJobStreamProgress(Plugin):
raise
ex
.
BadRequestException
(
"Unknown datasource type...!"
)
def
get_error
(
self
,
inserted_items
,
completed_items
,
main_q_size
,
replicas
):
input_flux
=
(
inserted_items
-
self
.
last_inserted_items
)
/
self
.
collect_period
real_output_flux
=
(
completed_items
-
self
.
last_completed_items
)
/
self
.
collect_period
expected_output_flux
=
replicas
/
self
.
expected_time
...
...
@@ -101,16 +101,8 @@ class KubeJobStreamProgress(Plugin):
error0
=
real_output_flux
-
input_flux
error1
=
expected_output_flux
-
input_flux
# if expected_output_flux > input_flux:
# current_error = error1
# else:
# current_error = error0
if
abs
(
expected_output_flux
-
real_output_flux
)
>=
THRESHOLD
:
current_error
=
error1
else
:
current_error
=
error0
current_error
=
error1
if
self
.
error_option
else
error0
corrector_term_check
=
True
if
main_q_size
>
input_flux
*
5
else
False
#This term exist to make the queue decrease
error
=
current_error
...
...
@@ -156,14 +148,6 @@ class KubeJobStreamProgress(Plugin):
}
return
expected_output_manifest
def
get_current_output_manifest
(
self
,
output_flux
,
timestamp
):
current_output_manifest
=
{
'name'
:
'current_output_flux'
,
'value'
:
output_flux
,
'timestamp'
:
timestamp
,
'dimensions'
:
self
.
dimensions
}
return
current_output_manifest
def
get_input_manifest
(
self
,
input_flux
,
timestamp
):
input_manifest
=
{
'name'
:
'input_flux'
,
'value'
:
input_flux
,
...
...
@@ -198,7 +182,7 @@ class KubeJobStreamProgress(Plugin):
def
get_detailed_report
(
self
):
if
not
self
.
report_flag
:
return
self
.
datasource
.
get_
hybrid_
stream_measurements
()
return
self
.
datasource
.
get_stream_measurements
()
return
{
'message'
:
'Job is still running...'
}
def
_publish_measurement
(
self
):
...
...
@@ -227,6 +211,7 @@ class KubeJobStreamProgress(Plugin):
items_completed
=
float
(
items_completed
)
if
items_completed
!=
None
else
0
lease_expired
=
int
(
lease_expired
)
if
lease_expired
!=
None
else
0
replicas
=
self
.
_get_num_replicas
()
or
self
.
last_replicas
self
.
last_replicas
=
replicas
...
...
@@ -241,20 +226,7 @@ class KubeJobStreamProgress(Plugin):
timestamp
=
self
.
last_timestamp
expected_output_flux
=
replicas
/
self
.
expected_time
current_output_flux
=
None
# if expected_output_flux > input_flux:
# current_output_flux = expected_output_flux
# else:
# current_output_flux = real_output_flux
if
abs
(
expected_output_flux
-
real_output_flux
)
>=
THRESHOLD
:
current_output_flux
=
expected_output_flux
else
:
current_output_flux
=
real_output_flux
error_option
=
"Hybrid"
error_option
=
"Expected output"
if
self
.
error_option
else
"Real output"
corrector_term
=
"Yes"
if
self
.
corrector_term
else
"No"
self
.
last_timestamp
=
timestamp
self
.
LOG
.
log
(
"========================"
)
...
...
@@ -264,13 +236,12 @@ class KubeJobStreamProgress(Plugin):
self
.
LOG
.
log
(
"Replicas: %s"
%
replicas
)
self
.
LOG
.
log
(
"Inserted items : %i"
%
inserted_items
)
self
.
LOG
.
log
(
"Queue size: %s"
%
main_q_size
)
self
.
LOG
.
log
(
"Pods Processin
g
: %s"
%
num_processing_jobs
)
self
.
LOG
.
log
(
"Items
c
ompleted: %i"
%
items_completed
)
self
.
LOG
.
log
(
"Pods Processin
s
: %s"
%
num_processing_jobs
)
self
.
LOG
.
log
(
"Items
C
ompleted: %i"
%
items_completed
)
self
.
LOG
.
log
(
"Lease expired: %i"
%
lease_expired
)
self
.
LOG
.
log
(
"Input
f
lux : %s"
%
input_flux
)
self
.
LOG
.
log
(
"Input
F
lux : %s"
%
input_flux
)
self
.
LOG
.
log
(
"Real output Flux : %s"
%
real_output_flux
)
self
.
LOG
.
log
(
"Expected output flux : %s"
%
expected_output_flux
)
self
.
LOG
.
log
(
"Current output flux: %s"
%
current_output_flux
)
self
.
LOG
.
log
(
"Expected output Flux : %s"
%
expected_output_flux
)
self
.
LOG
.
log
(
"Collect period : %s"
%
str
(
self
.
collect_period
))
self
.
LOG
.
log
(
"========================"
)
...
...
@@ -291,9 +262,6 @@ class KubeJobStreamProgress(Plugin):
real_output_flux
=
\
self
.
get_real_output_manifest
(
real_output_flux
,
timestamp
)
current_output_flux
=
\
self
.
get_current_output_manifest
(
current_output_flux
,
timestamp
)
time_progress_error
=
\
self
.
get_expected_time_manifest
(
self
.
expected_time
,
...
...
@@ -312,7 +280,6 @@ class KubeJobStreamProgress(Plugin):
input_flux
,
real_output_flux
,
expected_output_flux
,
current_output_flux
,
queue
,
lease_expired_count
)
self
.
report_job
(
timestamp
)
...
...
@@ -326,7 +293,6 @@ class KubeJobStreamProgress(Plugin):
input_flux
,
real_output_flux
,
expected_output_flux
,
current_output_flux
,
queue
,
lease_expired_count
):
...
...
@@ -336,7 +302,6 @@ class KubeJobStreamProgress(Plugin):
self
.
datasource
.
send_metrics
([
input_flux
])
self
.
datasource
.
send_metrics
([
real_output_flux
])
self
.
datasource
.
send_metrics
([
expected_output_flux
])
self
.
datasource
.
send_metrics
([
current_output_flux
])
self
.
datasource
.
send_metrics
([
queue
])
self
.
datasource
.
send_metrics
([
lease_expired_count
])
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment