Commit ce6ca7e4 authored by Ignacio's avatar Ignacio
Browse files

making stuff atomically

parent a171c9e1
......@@ -174,11 +174,12 @@ class KubeJobStreamProgress(Plugin):
return self.datasource.get_stream_measurements()
return {'message': 'Job is still running...'}
def _publish_measurement(self, items_completed):
def _publish_measurement(self):
if self.report_flag:
try:
# doing this here because i need to check if
# there is a completed job so the rest don't crash
timestamp = time.time() * 1000
job_result_key = "job:results"
last_item_key = self.rds.lrange(job_result_key, -1, -1)
......@@ -190,11 +191,16 @@ class KubeJobStreamProgress(Plugin):
self.report_job(timestamp)
return
main_q_size = self.rds.llen('job')
pipe = self.rds.pipeline()
pipe.llen('job')
pipe.llen('job:processing')
pipe.llen('job:results')
main_q_size, num_processing_jobs,items_completed = pipe.execute()
replicas = self._get_num_replicas() or self.last_replicas
self.last_replicas = replicas
num_processing_jobs = self.rds.llen('job:processing')
num_processing_jobs = min(num_processing_jobs, replicas)
tracked_items = num_processing_jobs + items_completed
tracked_items += main_q_size
input_flux = (tracked_items - self.last_tracked_items)/self.collect_period
......@@ -311,8 +317,7 @@ class KubeJobStreamProgress(Plugin):
def monitoring_application(self):
try:
items_completed = self.rds.llen('job:results')
self._publish_measurement(items_completed=items_completed)
self._publish_measurement()
return items_completed
except Exception as ex:
self.LOG.log(ex.message)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment