Commit 5019c9cb authored by Ignacio

making multiple errors available

parent d92fe175
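In short, the commit computes two candidate error signals in `get_error` and selects one through the new `error_option` flag, while the queue corrector term can now be disabled through the `corrector_term` option. Below is a minimal standalone sketch of that selection logic, reusing the names from the diff; it is an illustration, not the plugin code itself.

```python
def select_error(input_flux, real_output_flux, expected_output_flux,
                 main_q_size, collect_period, error_option, corrector_term):
    # Sketch of the logic added to get_error(); names mirror the diff.
    error0 = real_output_flux - input_flux        # error against the measured output flux
    error1 = expected_output_flux - input_flux    # error against the expected output flux
    # Term that pushes the queue to shrink; zeroed out when corrector_term is set.
    corrector_term0 = 0.5 if main_q_size / collect_period > input_flux * 2 else 0
    corrector = 0 if corrector_term else corrector_term0
    current_error = error1 if error_option else error0
    return current_error - corrector
```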
@@ -38,7 +38,8 @@ class KubeJobStreamProgress(Plugin):
def __init__(self, app_id, info_plugin,
retries=10, last_replicas=None):
collect_period = int(info_plugin.get('collect_period',2))
- self.corrector_term = bool(info_plugin.get("corrector_term", True))
+ self.corrector_term = int(info_plugin.get("corrector_term", 0))
+ self.error_option = int(info_plugin.get("error_option", 0))
Plugin.__init__(self, app_id, info_plugin,
collect_period, retries=retries)
self.validate(info_plugin)
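Both new options are read from `info_plugin` in `__init__`. A hypothetical configuration fragment follows; only the three keys come from the diff, the surrounding dict structure is assumed.

```python
# Hypothetical monitoring configuration passed to the plugin as info_plugin.
info_plugin = {
    'collect_period': 2,   # seconds between collections (default 2)
    'error_option': 1,     # 0 -> error from the real output flux, nonzero -> from the expected output flux
    'corrector_term': 0,   # 0 -> keep the queue corrector term, nonzero -> disable it
}
```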
@@ -92,13 +93,19 @@ class KubeJobStreamProgress(Plugin):
raise ex.BadRequestException("Unknown datasource type...!")
def get_error(self, tracked_items, completed_items, main_q_size):
- items_input_flux = (tracked_items - self.last_tracked_items)/self.collect_period
- items_output_flux = (completed_items - self.last_completed_items)/self.collect_period#replicas/self.expected_time
- corrector_term = (0.5 if main_q_size/self.collect_period > items_input_flux*1.5 else 0) #This term exist to make the queue decraese
- corrector_term = corrector_term if self.corrector_term else 0
+ input_flux = (tracked_items - self.last_tracked_items)/self.collect_period
+ real_output_flux = (completed_items - self.last_completed_items)/self.collect_period
+ expected_output_flux = replicas/self.expected_time
- error = items_output_flux - items_input_flux - corrector_term
+ error0 = real_output_flux - input_flux
+ error1 = expected_output_flux - input_flux
+ corrector_term0 = (0.5 if main_q_size/self.collect_period > input_flux*2 else 0) # This term exists to make the queue decrease
+ corrector_term = 0 if self.corrector_term else corrector_term0
+ current_error = error1 if self.error_option else error0
+ error = current_error - corrector_term
self.last_tracked_items = tracked_items
self.last_completed_items = completed_items
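To make the two error signals concrete, here is a small worked example with made-up numbers; it only reuses the formulas from the hunk above.

```python
# Made-up sample values for one collection interval.
collect_period = 2                                  # seconds
tracked_items, last_tracked_items = 110, 100        # 10 new items observed
completed_items, last_completed_items = 106, 100    # 6 items completed
replicas, expected_time = 4, 2
main_q_size = 30

input_flux = (tracked_items - last_tracked_items) / collect_period            # 5.0
real_output_flux = (completed_items - last_completed_items) / collect_period  # 3.0
expected_output_flux = replicas / expected_time                               # 2.0

error0 = real_output_flux - input_flux      # -2.0: measured throughput lags the input
error1 = expected_output_flux - input_flux  # -3.0: expected throughput lags even more
corrector = 0.5 if main_q_size / collect_period > input_flux * 2 else 0       # 15 > 10 -> 0.5
```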
@@ -122,13 +129,29 @@ class KubeJobStreamProgress(Plugin):
return expected_time_manifest
- def get_real_time_manifest(self, real_time, timestamp):
- real_time_manifest = {'name': 'job_progress',
- 'value': real_time,
+ def get_real_output_manifest(self, output_flux, timestamp):
+ real_output_manifest = {'name': 'real_output_flux',
+ 'value': output_flux,
'timestamp': timestamp,
'dimensions': self.dimensions
}
+ return real_output_manifest
+ def get_expected_output_manifest(self, output_flux, timestamp):
+ expected_output_manifest = {'name': 'expected_output_flux',
+ 'value': output_flux,
+ 'timestamp': timestamp,
+ 'dimensions': self.dimensions
+ }
+ return expected_output_manifest
+ def get_input_manifest(self, input_flux, timestamp):
+ input_manifest = {'name': 'input_flux',
+ 'value': input_flux,
+ 'timestamp': timestamp,
+ 'dimensions': self.dimensions
+ }
- return real_time_manifest
+ return input_manifest
def get_application_progress_error_manifest(self, error, timestamp):
application_progress_error = {'name': 'application_progress_error',
@@ -140,7 +163,7 @@ class KubeJobStreamProgress(Plugin):
def get_detailed_report(self):
if not self.report_flag:
- return self.datasource.get_measurements()
+ return self.datasource.get_stream_measurements()
return {'message': 'Job is still running...'}
def _publish_measurement(self, items_completed):
@@ -167,7 +190,8 @@ class KubeJobStreamProgress(Plugin):
tracked_items = num_processing_jobs + items_completed
tracked_items += main_q_size
input_flux = (tracked_items - self.last_tracked_items)/self.collect_period
- output_flux = (items_completed - self.last_completed_items)/self.collect_period
+ real_output_flux = (items_completed - self.last_completed_items)/self.collect_period
+ expected_output_flux = replicas/self.expected_time
error = self.get_error(tracked_items, items_completed, main_q_size) or self.last_error
@@ -178,14 +202,18 @@ class KubeJobStreamProgress(Plugin):
self.last_timestamp = timestamp
self.LOG.log("========================")
self.LOG.log("Error option: %i" % self.error_option)
self.LOG.log("Correction term: %i" % self.corrector_term)
self.LOG.log("Items Completed: %i" % items_completed)
self.LOG.log("Error: %s" % error)
self.LOG.log("Replicas: %s" % replicas)
self.LOG.log("Queue size: %s" % main_q_size)
self.LOG.log("Pods Processins: %s" % num_processing_jobs)
self.LOG.log("Tracked items : %s" % (tracked_items))
self.LOG.log("Input Flux : %s" % (input_flux))
self.LOG.log("Output Flux : %s" % (output_flux))
self.LOG.log("Tracked items : %s" % tracked_items)
self.LOG.log("Input Flux : %s" % input_flux)
self.LOG.log("Real output Flux : %s" % real_output_flux)
self.LOG.log("Expected output Flux : %s" % expected_output_flux)
self.LOG.log("Corrector term : %s" % bool(self.corrector_term))
self.LOG.log("========================")
application_progress_error = \
@@ -195,18 +223,29 @@ class KubeJobStreamProgress(Plugin):
str(application_progress_error))
if self.enable_detailed_report:
- job_progress_error = \
- self.get_real_time_manifest(1,
+ input_flux = \
+ self.get_input_manifest(input_flux,
timestamp)
+ expected_output_flux = \
+ self.get_expected_output_manifest(expected_output_flux,
+ timestamp)
+ real_output_flux = \
+ self.get_real_output_manifest(real_output_flux,
+ timestamp)
time_progress_error = \
self.get_expected_time_manifest(self.expected_time,
timestamp)
parallelism = \
self.get_parallelism_manifest(replicas, timestamp)
self.publish_persistent_measurement(application_progress_error,
- job_progress_error,
time_progress_error,
- parallelism)
+ parallelism,
+ input_flux,
+ real_output_flux,
+ expected_output_flux)
self.report_job(timestamp)
except Exception:
self.LOG.log(traceback.format_exc())
@@ -215,11 +254,18 @@ class KubeJobStreamProgress(Plugin):
def publish_persistent_measurement(self, application_progress_error,
job_progress_error,
time_progress_error,
- parallelism):
+ parallelism,
+ input_flux,
+ real_output_flux,
+ expected_output_flux):
self.datasource.send_metrics([application_progress_error])
self.datasource.send_metrics([job_progress_error])
self.datasource.send_metrics([time_progress_error])
self.datasource.send_metrics([parallelism])
+ self.datasource.send_metrics([input_flux])
+ self.datasource.send_metrics([real_output_flux])
+ self.datasource.send_metrics([expected_output_flux])
def report_job(self,timestamp):
if self.report_flag: