Commit 39a52b52 authored by Ignacio's avatar Ignacio
Browse files

show

parent a7513784
......@@ -58,7 +58,7 @@ class KubeJobStreamProgress(Plugin):
self.last_error = 0.0
self.last_timestamp = time.time() * 1000
self.last_completed_items = 0
self.last_tracked_items = 0
self. last_inserted_items = 0
self.max_rep = int(info_plugin['max_replicas'])
self.min_rep = int(info_plugin['min_replicas'])
kubernetes.config.load_kube_config(api.k8s_manifest)
......@@ -92,13 +92,13 @@ class KubeJobStreamProgress(Plugin):
else:
raise ex.BadRequestException("Unknown datasource type...!")
def get_error(self, tracked_items, completed_items, main_q_size, replicas):
def get_error(self, inserted_items, completed_items, main_q_size, replicas):
input_flux = (tracked_items - self.last_tracked_items)/self.collect_period
input_flux = (inserted_items - self.last_inserted_items)/self.collect_period
real_output_flux = (completed_items - self.last_completed_items)/self.collect_period
expected_output_flux = replicas/self.expected_time
error0 =real_output_flux - input_flux
error0 = real_output_flux - input_flux
error1 = expected_output_flux - input_flux
corrector_term0 = (0.5 if main_q_size/self.collect_period > input_flux*2 else 0) #This term exist to make the queue decrease
......@@ -107,7 +107,7 @@ class KubeJobStreamProgress(Plugin):
current_error = error1 if self.error_option else error0
error = current_error - corrector_term
self.last_tracked_items = tracked_items
self.last_inserted_items = inserted_items
self.last_completed_items = completed_items
self.last_error = error
return error
......@@ -181,48 +181,41 @@ class KubeJobStreamProgress(Plugin):
# there is a completed job so the rest don't crash
timestamp = time.time() * 1000
job_result_key = "job:results"
last_item_key = self.rds.scard(job_result_key)
if (not last_item_key):
self.report_job(timestamp)
return
pipe = self.rds.pipeline()
pipe.llen('job')
pipe.llen('job:processing')
pipe.scard('job:results')
main_q_size, num_processing_jobs,items_completed = pipe.execute()
pipe.llen('job:inserted_count')
pipe.llen('job:completed_count')
inserted_items, items_completed = pipe.execute()
replicas = self._get_num_replicas() or self.last_replicas
self.last_replicas = replicas
tracked_items = num_processing_jobs + items_completed
tracked_items += main_q_size
input_flux = (tracked_items - self.last_tracked_items)/self.collect_period
input_flux = (inserted_items - self.last_inserted_items)/self.collect_period
real_output_flux = (items_completed - self.last_completed_items)/self.collect_period
expected_output_flux = replicas/self.expected_time
error = self.get_error(tracked_items, items_completed, main_q_size, replicas) or self.last_error
error = self.get_error(inserted_items, items_completed, main_q_size, replicas) or self.last_error
if (main_q_size == 0 and num_processing_jobs == 0 and
replicas == self.min_rep):
timestamp = self.last_timestamp
error_option = "Expected output" if self.error_option else "Real output"
corrector_term = "No" if self.corrector_term else "Yes"
self.last_timestamp = timestamp
self.LOG.log("========================")
self.LOG.log("Error option: %i" % self.error_option)
self.LOG.log("Correction term: %i" % self.corrector_term)
self.LOG.log("Error option: %i" % error_option)
self.LOG.log("Correction term: %i" % corrector_term)
self.LOG.log("Items Completed: %i" % items_completed)
self.LOG.log("Error: %s" % error)
self.LOG.log("Replicas: %s" % replicas)
self.LOG.log("Queue size: %s" % main_q_size)
self.LOG.log("Pods Processins: %s" % num_processing_jobs)
self.LOG.log("Tracked items : %s" % tracked_items)
self.LOG.log("Inserted items : %s" % inserted_items)
self.LOG.log("Input Flux : %s" % input_flux)
self.LOG.log("Real output Flux : %s" % real_output_flux)
self.LOG.log("Expected output Flux : %s" % expected_output_flux)
self.LOG.log("Collect period ="+str(self.collect_period))
self.LOG.log("Collect period : %s" % str(self.collect_period))
self.LOG.log("========================")
application_progress_error = \
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment