Commit 57c1289d authored by liliars's avatar liliars
Browse files

adding plugin code and readme file

parent dfcfd813
# adaptive-kubejobs
# Adaptive Kubejobs
Controller plugin based on cluster utilization.
\ No newline at end of file
Adaptive Kubejobs is a plugin for [Asperathos](https://github.com/ufcg-lsd/asperathos) to control cluster scaling based on its current resource utilisation.
## How does it work?
When a job is running, the controller will observe the total cluster utilisation and, if the current utilisation has surpassed the maximum defined by the user, it will decrease job replicas until the utilisation returns to an acceptable value. While the cluster utilisation is lower than the maximum, the controller will use the schedule strategy defined by the job submission.
Also, if a new job is submitted with a different "max_ram" or "max_cpu", the controller will use the minimum between the new value and the one being considered at the time. This way, the thresholds defined for both jobs end up being respected at some level.
## Installing Adaptive Kubejobs
Send a ```POST``` request to `<your_asperathos_url:port>/plugins` with the following JSON as body:
"plugin_source": "",
"install_source": "git",
"plugin_module": "adaptive_kubejobs",
"component": "controller",
"plugin_name": "adaptive_kubejobs"
You can also create a json file named `adaptive-kubejobs.json`, for instance, and use the `curl` command as follows:
$ curl -H "Content-Type: application/json" --data @adaptive-kubejobs.json <your_asperathos_url:port>/plugins
## Submitting a job with Adaptive Kubejobs
When submitting a new job, set the `control_plugin` parameter in your json submission file to `"adaptive_kubejobs"`. Besides that, you should configure the maximum resource utilization you want your cluster to reach when executing your job. In order to do that, add the `max_cpu` and `max_ram` parameters inside the `control_parameters` block.
Your json submission file should have these entries like the example below:
{ "control_plugin": "adaptive_kubejobs",
"control_parameters": {
"actuator": "k8s_replicas",
"check_interval": 10,
"trigger_down": 1,
"trigger_up": 1,
"min_rep": 1,
"max_rep": 10,
"actuation_size": 1,
"metric_source": "redis",
"max_ram": 0.7,
"max_cpu": 0.5
\ No newline at end of file
import threading
import time
from sqlitedict import SqliteDict
from kubernetes import config, client
from kubernetes.client import CustomObjectsApi
from controller.plugins.controller.base import Controller
from controller.utils.logger import Log
from controller.service.api import k8s_manifest
from controller.plugins.controller.kubejobs.alarm import KubeJobs
from controller.utils.logger import ScalingLog
API_LOG = Log("APIv10", "APIv10.log")
# NOTE(review): this block was recovered from a web scrape of a commit page.
# All leading indentation was stripped and several statements were lost, so
# the code below is NOT valid Python as-is. Comments document only what the
# visible lines establish; anything inferred about the lost lines is hedged.
class AdaptiveKubejobs():
# Asperathos controller plugin ("adaptive_kubejobs"): while a job runs it
# compares total cluster CPU/RAM utilisation against user-supplied
# max_cpu/max_ram thresholds and scales the job's replicas down toward
# min_rep whenever a threshold is exceeded (see start_application_scaling).
def __init__(self, application_id, parameters):
# :param application_id: id of the job being controlled; also appended to
#     `parameters` under the "app_id" key below.
# :param parameters: job submission dict; the visible code reads
#     parameters['control_parameters'] for 'min_rep' (and, in
#     _set_max_parameters, 'max_ram'/'max_cpu').
# NOTE(review): the `try:` that this `except` belongs to was lost in the
# scrape — presumably a kube-config load (config.load_kube_config() or the
# in-cluster variant, given the error message). TODO confirm upstream.
except Exception:
raise Exception("Couldn't load kube config")
# Persistent key/value store shared across job submissions; holds the
# smallest max_ram/max_cpu seen so far (maintained by _set_max_parameters).
self.db = SqliteDict('./adaptive_kj.sqlite', autocommit=True)
self.logger = ScalingLog(
"diff.controller.log", "controller.log", application_id)
self.application_id = application_id
parameters.update({"app_id": application_id})
# read scaling parameters
# NOTE(review): the right-hand side of this assignment was lost —
# presumably parameters.get('control_parameters').get('check_interval'),
# matching the README's 'check_interval' field. TODO confirm.
self.check_interval = \
# We use a lock here to prevent race conditions when stopping the
# controller
self.running = True
self.running_lock = threading.RLock()
# The alarm here is responsible for deciding whether to scale up or
# down, or even do nothing
self.alarm = KubeJobs(parameters)
self.k8s_api = client.CoreV1Api()
self.custom_objects_api = CustomObjectsApi()
# Floor for scale-down decisions in start_application_scaling.
self.min_replicas = parameters.get('control_parameters').get('min_rep')
def _set_max_parameters(self, parameters):
# Reconcile the requested max_ram/max_cpu with the values already stored
# in the SQLite-backed db: keep the MINIMUM of the stored and requested
# value, so thresholds from concurrent/earlier jobs are still respected.
current_ram = self.db.get('max_ram')
current_cpu = self.db.get('max_cpu')
requested_ram = parameters.get('control_parameters').get('max_ram')
requested_cpu = parameters.get('control_parameters').get('max_cpu')
# No stored value yet, or the new request is stricter: persist and use it.
if current_ram is None or current_ram > requested_ram:
self.db['max_ram'] = requested_ram
self.max_ram = requested_ram
# NOTE(review): an `else:` line was almost certainly lost before this
# assignment (otherwise it would clobber the branch above). TODO confirm.
self.max_ram = current_ram
if current_cpu is None or current_cpu > requested_cpu:
self.db['max_cpu'] = requested_cpu
self.max_cpu = requested_cpu
# NOTE(review): same lost `else:` as the RAM case above — verify.
self.max_cpu = current_cpu
def start_application_scaling(self):
# Main control loop: each iteration samples cluster utilisation and, if a
# threshold is exceeded, scales replicas down by 1 (never below
# min_replicas). Loops until stop_application_scaling() clears the flag.
run = True
self.logger.log("Start to control resources")
while run:
last_replicas = self.alarm.actuator.get_number_of_replicas()
# NOTE(review): the argument(s) of this log call were lost —
# presumably `self.application_id`.
self.logger.log("Monitoring application: %s" %
current_cpu, current_ram = self.get_cluster_resources()
if current_cpu > self.max_cpu or current_ram > self.max_ram:
self.logger.log('Utilisation surpassed threshold, scaling down')
# Decrease by one replica per iteration, clamped at min_replicas.
next_replicas = max(self.min_replicas, last_replicas-1)
# NOTE(review): arguments lost — presumably
# (last_replicas, next_replicas) — plus the actuation call itself.
self.logger.log("Scaling from %d to %d" %
# Wait some time
# NOTE(review): the statements around this dict fragment were lost;
# it looks like part of a parameters dict (possibly passed to
# _set_max_parameters or to the alarm) — TODO recover from upstream.
'control_parameters': {
'max_ram': self.max_ram,
'max_cpu': self.max_cpu
# Re-read the stop flag under the lock so stop_application_scaling()
# from another thread terminates the loop cleanly.
with self.running_lock:
run = self.running
def stop_application_scaling(self):
# Signal the control loop to exit after its current iteration.
with self.running_lock:
self.running = False
def status(self):
# Delegate status reporting to the KubeJobs alarm.
return self.alarm.status()
def get_cluster_resources(self):
# Return (cpu_fraction, ram_fraction): cluster-wide utilisation divided
# by cluster-wide capacity, counting only nodes that can receive pods.
cluster_ram_total = 0
cluster_cpu_total = 0
cluster_ram_utilisation = 0
cluster_cpu_utilisation = 0
# NOTE(review): the middle of this call was lost — presumably
# .list_cluster_custom_object('metrics.k8s.io', 'v1beta1', 'nodes'),
# i.e. the metrics-server node metrics API. TODO confirm.
raw_cluster_utilisation = self.custom_objects_api\
'v1beta1', 'nodes')['items']
for n in self.k8s_api.list_node().items:
# Exclude nodes tainted NoSchedule (e.g. control-plane nodes) from
# both capacity and utilisation totals.
can_receive_pods = True
if n.spec.taints is not None:
can_receive_pods = [x for x in n.spec.taints \
if x.effect == 'NoSchedule']
can_receive_pods = len(can_receive_pods) == 0
if can_receive_pods:
cluster_cpu_total += int(n.status.capacity['cpu'])
# [:-2] strips the memory unit suffix (e.g. "Ki") — assumes every
# node reports the same unit; TODO confirm.
cluster_ram_total += int(n.status.capacity['memory'][:-2])
# NOTE(review): RHS lost — presumably n.metadata.name.
node_name =
# Match this node's usage entry from the metrics API by name.
node_utilisation = [node for node in raw_cluster_utilisation\
if node.get('metadata').get('name') == node_name][0]
node_utilisation = node_utilisation.get('usage')
# Same suffix-stripping as above for usage values; CPU usage is
# reported in nanocores ("...n"), hence the 1e9 divisor.
cluster_ram_utilisation += float(node_utilisation.get('memory')[:-2])
cluster_cpu_utilisation += float(node_utilisation.get('cpu')[:-1]) / 1000000000
cluster_cpu_percentage = cluster_cpu_utilisation / cluster_cpu_total
cluster_ram_percentage = cluster_ram_utilisation / cluster_ram_total
return cluster_cpu_percentage, cluster_ram_percentage
PLUGIN = AdaptiveKubejobs
from setuptools import setup, find_packages
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment