Commit d1f60932 authored by Ignacio's avatar Ignacio
Browse files

adding lease count

parent 84a3584f
......@@ -161,6 +161,14 @@ class KubeJobStreamProgress(Plugin):
}
return queue_manifest
def get_lease_expired_count_manifest(self, lease_expired_count , timestamp):
lease_expired_count_manifest = {'name': 'lease_expired_count',
'value': lease_expired_count,
'timestamp': timestamp,
'dimensions': self.dimensions
}
return lease_expired_count
def get_application_progress_error_manifest(self, error, timestamp):
application_progress_error = {'name': 'application_progress_error',
'value': error,
......@@ -187,10 +195,19 @@ class KubeJobStreamProgress(Plugin):
pipe.get('job:completed_count')
pipe.llen('job:processing')
pipe.llen('job')
inserted_items, items_completed, num_processing_jobs, main_q_size = pipe.execute()
pipe.get('lease_expired')
results = pipe.execute()
inserted_items = results[0]
items_completed = results[1]
num_processing_jobs = results[2]
main_q_size = results[3]
lease_expired = results[4]
inserted_items = float(inserted_items) if inserted_items != None else 0
items_completed = float(items_completed) if items_completed != None else 0
lease_expired = float(lease_expired) if lease_expired != None else 0
replicas = self._get_num_replicas() or self.last_replicas
self.last_replicas = replicas
......@@ -218,10 +235,12 @@ class KubeJobStreamProgress(Plugin):
self.LOG.log("Queue size: %s" % main_q_size)
self.LOG.log("Pods Processins: %s" % num_processing_jobs)
self.LOG.log("Items Completed: %i" % items_completed)
self.LOG.log("Lease expired: %i" % lease_expired)
self.LOG.log("Input Flux : %s" % input_flux)
self.LOG.log("Real output Flux : %s" % real_output_flux)
self.LOG.log("Expected output Flux : %s" % expected_output_flux)
self.LOG.log("Collect period : %s" % str(self.collect_period))
self.LOG.log("========================")
application_progress_error = \
......@@ -249,13 +268,17 @@ class KubeJobStreamProgress(Plugin):
queue = self.get_queue_size_manifest(main_q_size,timestamp)
lease_expired_count = \
self.get_lease_expired_count_manifest(lease_expired, timestamp)
self.publish_persistent_measurement(application_progress_error,
time_progress_error,
parallelism,
input_flux,
real_output_flux,
expected_output_flux,
queue)
queue,
lease_expired_count)
self.report_job(timestamp)
except Exception:
self.LOG.log(traceback.format_exc())
......@@ -267,7 +290,8 @@ class KubeJobStreamProgress(Plugin):
input_flux,
real_output_flux,
expected_output_flux,
queue):
queue,
lease_expired_count):
self.datasource.send_metrics([application_progress_error])
self.datasource.send_metrics([time_progress_error])
......@@ -276,6 +300,7 @@ class KubeJobStreamProgress(Plugin):
self.datasource.send_metrics([real_output_flux])
self.datasource.send_metrics([expected_output_flux])
self.datasource.send_metrics([queue])
self.datasource.send_metrics([lease_expired_count])
def report_job(self,timestamp):
if self.report_flag:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment