Commit 9c7969f5 authored by Ignacio's avatar Ignacio
Browse files

updating to real ouput flux

parent 72db3c6c
......@@ -38,6 +38,7 @@ class KubeJobStreamProgress(Plugin):
def __init__(self, app_id, info_plugin,
retries=10, last_replicas=None):
collect_period = int(info_plugin.get('collect_period',2))
self.corrector_term = bool(info_plugin.get("corrector_term", True))
Plugin.__init__(self, app_id, info_plugin,
collect_period, retries=retries)
self.validate(info_plugin)
......@@ -57,9 +58,9 @@ class KubeJobStreamProgress(Plugin):
self.last_error = 0.0
self.last_timestamp = time.time() * 1000
self.last_completed_items = 0
self.last_tracked_items = 0
self.max_rep = int(info_plugin['max_replicas'])
self.min_rep = int(info_plugin['min_replicas'])
self.tracked_items = 0
kubernetes.config.load_kube_config(api.k8s_manifest)
self.b_v1 = kubernetes.client.BatchV1Api()
self.datasource = self.setup_datasource(info_plugin)
......@@ -91,38 +92,17 @@ class KubeJobStreamProgress(Plugin):
else:
raise ex.BadRequestException("Unknown datasource type...!")
def get_time_to_complete(self, num_replicas, queue_length, items_completed):
def get_error(self, tracked_items, completed_items, main_q_size):
items_input_flux = (tracked_items - self.last_tracked_items)/self.collect_period
items_output_flux = (completed_items - self.last_completed_items)/self.collect_period#replicas/self.expected_time
# processed = items_completed - self.last_completed_items
# time_to_complete = (float(processed)/num_replicas) * queue_length
try:
if (self.last_time_to_complete != -1):
time_to_complete = self.last_time_to_complete
processed = items_completed - self.last_completed_items
self.last_completed_items = items_completed
if (processed == 0):
processing_time = self.expected_time
else:
processing_time = processed/self.collect_period
time_to_complete = (queue_length /(processing_time))
corrector_term = (0.5 if main_q_size/self.collect_period > items_input_flux*1.5 else 0) #This term exist to make the queue decraese
corrector_term = corrector_term if self.corrector_term else 0
self.last_time_to_complete = time_to_complete
return time_to_complete
error = items_output_flux - items_input_flux - corrector_term
except Exception:
self.LOG.log(traceback.format_exc())
def get_error(self, tracked_items, n_processing_jobs,
replicas, main_q_size):
items_flux = (tracked_items - self.tracked_items)/self.collect_period
items_flux = items_flux
error = replicas/self.expected_time - items_flux - (0.5 if main_q_size/self.collect_period > (replicas)*2 else 0)
self.tracked_items = tracked_items
self.last_tracked_items = tracked_items
self.last_completed_items = completed_items
self.last_error = error
return error
......@@ -179,29 +159,16 @@ class KubeJobStreamProgress(Plugin):
if (not last_item_key):
self.report_job(timestamp)
return
last_item_key = str(last_item_key[0])[2:-1]
item_start_time_key = "job:" + last_item_key + ":start_time"
item_end_time_key = "job:" + last_item_key + ":end_time"
self.LOG.log(item_start_time_key)
self.LOG.log(item_end_time_key)
last_item_start = float(self.rds.get(item_start_time_key))
last_item_end = float(self.rds.get(item_end_time_key))
main_q_size = self.rds.llen('job')
replicas = self._get_num_replicas() or self.last_replicas
self.last_replicas = replicas
time_to_complete = self.get_time_to_complete(replicas, main_q_size, items_completed)
self.last_time_to_complete = time_to_complete
self.LOG.log(time_to_complete)
num_processing_jobs = self.rds.llen('job:processing')
num_processing_jobs = min(num_processing_jobs, replicas)
tracked_items = num_processing_jobs + items_completed
tracked_items += main_q_size
flux = (tracked_items - self.tracked_items)/self.collect_period
error = self.get_error(tracked_items, num_processing_jobs,
replicas, main_q_size) or self.last_error
error = self.get_error(tracked_items, items_completed, main_q_size) or self.last_error
if (main_q_size == 0 and num_processing_jobs == 0 and
replicas == self.min_rep):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment