deploy cluster service to kubernetes via the API server
import base64
from datetime import datetime, timedelta
import getpass
import logging
import os
import random
import re
import socket
import string
import subprocess as sp
import sys
import urllib3

from pkg_resources import resource_filename, Requirement
from netaddr import valid_ipv4

from kubernetes import client as k8sclient
from import stream
from kubernetes.client import api_client
from kubernetes.client.configuration import Configuration
from import ApiException
from kubernetes.config import kube_config
from kubernetes.utils import create_from_yaml

import yaml

from koris.ssl import read_cert
from koris.ssl import discovery_hash as ssl_discovery_hash
from koris.util.util import retry
from koris.util.logger import Logger
from koris import MASTER_LISTENER_NAME

if getattr(sys, 'frozen', False):
    MANIFESTSPATH = os.path.join(
        sys._MEIPASS,  # pylint: disable=no-member, protected-access
    MANIFESTSPATH = resource_filename(Requirement.parse("koris"),

LOGGER = Logger(__name__)

                "--key /etc/kubernetes/pki/etcd/server.key "
                "--cacert /etc/kubernetes/pki/etcd/ca.crt "
                "--cert /etc/kubernetes/pki/etcd/server.crt "
                "{} --endpoints=https://{}:2379 -w json")

def _get_node_addr(addresses, addr_type):
    Parse the address of the node

        addresses (object) - instance of addresses returned from k8s API
        addr_type (str) - the address type
    return [i.address for i in addresses if i.type == addr_type][0]

def rand_string(num):
    """ generate a random string of len num """
    return ''.join([
        random.choice(string.ascii_letters.lower() + string.digits)
        for n in range(num)])
def get_token_description():
    """create a description for the token"""
    description = "Bootstrap token generated by 'koris add' from {} on {}"
    return description.format('%s@%s' % (getpass.getuser(),
                                         socket.gethostname()),
def parse_etcd_response(resp):
    """Takes a response from etcdctl and parses it for its member info.

    The response is to be expected in JSON format as obtained by
    ``etcdctl member list -w json``. Right now, the IDs in the JSON response
    are in uint64 format and will be transformed into hex with this function.

    Args:
        resp (str): A JSON response from etcdctl.

    Returns:
        A dict containing member information.

    Raises:
        ValueError if state could not be extracted.
    """
    if not resp or resp is None:
        raise ValueError("etcdtl response is empty")

    if not"master-\\d+", resp):
        LOGGER.debug(resp)
        raise ValueError("can't find 'master' in etcdtl response")

    # Reconstructing the response so we get a dict where the key is the
    # member name and and value is a dict with the other info.
    out = {}
    resp_yaml = yaml.load(resp)
    for mem in resp_yaml['members']:
        if 'name' in mem:
            out[mem['name']] = {k: v for k, v in mem.items() if k != "name"}
            # ID is uint64, but we need it in hex
            out[mem['name']]['ID'] = hex(out[mem['name']]['ID'])[2:]
    return out
class K8SConfigurator:  # pylint: disable=no-member
    """apply plugins and post install setup"""
def apply_plugins(self, plugins):
        """apply all plugins in the list"""
def get_bootstrap_token(self):
        """Generate a Bootstrap token

        Returns:
            A string of the form ``<token id>.<token secret>``.
        """
        tid = rand_string(6)
        token_secret = rand_string(16)
        data = {'description': get_token_description(),
                'token-id': tid,
                'token-secret': token_secret,
                'expiration': datetime.strftime(
           + timedelta(hours=2),
                    "%Y-%m-%dT%H:%M:%SZ"),
                'usage-bootstrap-authentication': 'true',
                'usage-bootstrap-signing': 'true',
                'auth-extra-groups': 'system:bootstrappers:kubeadm:default-node-token',
                }
        for k, val in data.items():
            data[k] = base64.b64encode(val.encode()).decode()
        sec = k8sclient.V1Secret(data=data)
        sec.metadata = k8sclient.V1ObjectMeta(
            **{'name': 'bootstrap-token-%s' % tid, 'namespace': 'kube-system'})
        sec.type = ''
        self.api.create_namespaced_secret(namespace="kube-system", body=sec)
        return ".".join((tid, token_secret))
@property def host(self): """Retrieve the host or loadbalancer info""" return @property def ca_info(self): """Return a dict with the read ca and the discovery hash""" return {"ca_cert": self.ca_cert, "discovery_hash": self.discovery_hash} @property def ca_cert(self): """Returns the API servers CA. Returns: The CA encoded as base64. """ return read_cert(self.api.api_client.configuration.ssl_ca_cert) @property def discovery_hash(self): """Calculate and return a discovery_hash. Based on the cluster CA. Returns: A discovery hash encoded in Hex. """ return ssl_discovery_hash(self.ca_cert) @property def is_ready(self): """Check if the API server is already available. Returns: True if it's reachable. """ logging.getLogger("urllib3").setLevel(logging.ERROR) try: k8sclient.apis.core_api.CoreApi().get_api_versions() logging.getLogger("urllib3").setLevel(logging.WARNING) return True except urllib3.exceptions.MaxRetryError: logging.getLogger("urllib3").setLevel(logging.WARNING) return False
def get_random_master(self):
        """Returns a name and IP of a random master server in the cluster.

        Returns:
            Tuple of name and IP of a master.
        """
        nodes = self.api.list_node(pretty=True)
        nodes = [node for node in nodes.items
                 if '' in node.metadata.labels]
        addresses = nodes[0].status.addresses
        # master_ip and master_name are the hostname and IP of an existing
        # master, where an etcd instance is already running.
        master_ip = _get_node_addr(addresses, "InternalIP")
        master_name = _get_node_addr(addresses, "Hostname")
        return master_name, master_ip
@retry(ValueError)
    def etcd_cluster_status(self):
        """Checks the current etcd cluster state.

        This function calls etcdctl inside a pod in order to obtain the
        current state of the etcd cluster before a new member can be added to
        it. Right now, etcdctl offers no convenient way to format the output
        so the URLs from the masters can be extracted, which is why jq is used
        here.

        Args:
            podname (str): The name of the pod where the etcdctl command
                should be sent from. Needs to be inside the kube-system
                namespace.
            master_ip (str)

        Returns:
            The status of the etcd as a string
            (e.g.master-1=,master-2=
        """
        name, master_ip = self.get_random_master()
        exec_command = ['/bin/sh', '-c', ETCDCTL_BASE.format(
            "member list", master_ip)]
        response = stream(self.api.connect_get_namespaced_pod_exec,
                          "etcd-%s" % name,
                          'kube-system',
                          command=exec_command,
                          stderr=True, stdin=False,
                          stdout=True, tty=False)
        if not response or not"master-\\d+", response):
            raise ValueError("Could not extract current etcd cluster state!")
        # respone should be something like
        # {'members': [{'ID': 9007573287841766007, 'name': 'master-7-am',
        #               'peerURLs': [''],
        #               'clientURLs': ['']}]}
        response = yaml.load(response)
        etcd_cluster = ",".join(("=".join((m['name'], m['peerURLs'][0]))
                                 for m in response['members'] if 'name' in m))
        LOGGER.debug("Current etcd cluster state is: %s", etcd_cluster)
        return etcd_cluster
def add_all_masters_to_loadbalancer(self, cluster_name, n_masters,
                                         lb_inst):
        """Adds all master nodes to the LoadBalancer listener.

        If the number of members in the master listener pool of the
        LoadBalancer is less than expected number of masters this function
        will add them to the pool as soon as they have node status "Ready".

        Args:
            cluster_name (string): the name of the cluster
            n_master (int): Number of desired master nodes.
            lb_inst (:class:`.cloud.openstack.LoadBalancer`): A configured
                LoadBalancer instance.
        """
        cond = {'Ready': 'True'}
        master_listener = lb_inst.master_listener
        listener_name = '-'.join((MASTER_LISTENER_NAME, cluster_name))
        if not master_listener:
            LOGGER.error(f"No {listener_name} found, aborting")
            sys.exit(1)
        try:
            listener_name = master_listener['name']
            mem = master_listener['pool']['members']  # noqa
            # pylint: disable=unused-variable
            pool_id = master_listener['pool']['id']
        except KeyError as exc:
            LOGGER.error(f"Unable to extract info of {listener_name}: {exc}")
            sys.exit(1)
        while len(lb_inst.master_listener['pool']['members']) < n_masters:
            for item in self.api.list_node(pretty=True).items:
                if cond in [{c.type: c.status}
                            for c in item.status.conditions]:
                    if 'master' in
                        addr_to_add = item.status.addresses[0].address
                        addr_present = [x['address'] for x in
                                        lb_inst.master_listener['pool']['members']]
                        if addr_to_add not in addr_present:
                            LOGGER.debug("Adding %s to pool '%s' (%s) ...",
                                         addr_to_add, listener_name, pool_id)
                            lb_inst.add_member(pool_id, addr_to_add)
def apply_addons(self, koris_config, apply_func=create_from_yaml):
        """apply all addons to the cluster

        Args:
            koris_config (dict): koris configuration loaded as dict
        """
        for addon in get_addons(koris_config):
  "Applying add-on [%s]",
            addon.apply(self.client, apply_func=apply_func)
@property def nginx_ingress_ports(self): """ get the ingress-nginx service ports as dictionary """ ingress = self.api.list_namespaced_service( 'ingress-nginx', label_selector="", limit=1) return { i for i in ingress.items[0].spec.ports}
def validate_context(self, conn):
        """Validate that server that we are talking to via K8S API is also the
        cloud context we are using.

        This retrieves the project ID of the Kubernetes LoadBalancer, then
        checks if it finds the same ID in any LoadBalancer of the currently
        sourced OpenStack project. In case the IP is not a Floating IP but
        only a Virtual IP, both IPs are simply compared.

        Args:
            conn (obj): OpenStack connection object.

        Return:
            bool
        """
        raw_ip ="https://").split(":")[0]
        lb_ip =
        if lb_ip:
            # We have a Floating IP
            for item in conn.load_balancer.load_balancers():
                if item.project_id == lb_ip.project_id:
                    return True
        else:
            # We have a Virtual IP
            for item in conn.load_balancer.load_balancers():
                if item.vip_address == raw_ip:
                    return True
        return False
class K8SScaler:  # pylint: disable=no-member
    """ A Mixin to modify the cluster size """
def add_node(self):
        """add a node to the cluster"""
def add_master(self):
        """add a master to the cluster"""
def drain_node(self, nodename, ignore_not_found=True):
        """Drains a node of pods.

        We're using ``kubectl drain`` instead of the eviction API, since it's
        quicker and we don't have to get all the Pods of the Node first.
        Will check if the node exists first.

        Args:
            nodename (str): Name of the node to drain
            ignore_not_found (bool): If set to False, will raise a ValueError
                if the node doesn't exist.

        Raises:
            RuntimeError if ``kubectl drain`` fails.
        """
        if self.node_status(nodename) is None:
            msg = f"Node {nodename} doesn't exist"
            if ignore_not_found:
      "Skipping node eviction, %s", msg)
                return
            raise ValueError(msg)
        # kubectl drain needs to block
        cmd = ["kubectl", "drain", nodename, "--ignore-daemonsets"]
        try:
            proc =, check=True, encoding="utf-8",
                              stdout=sp.PIPE, stderr=sp.PIPE)
        except sp.CalledProcessError as exc:
            raise RuntimeError("error calling '%s':"
                               "%s" % " ".join(cmd), exc)
        LOGGER.debug("STDOUT: %s (Exit code %s)", proc.stdout, proc.returncode)
def delete_node(self, nodename, grace_period=0, ignore_not_found=True):
        """Delete a node in Kubernetes.

        Args:
            nodename (str): The name of the node to delete.
            grace_period (int): Duration in seconds before the node should be
                delete. Defaults to 0, which means immediately.
            ignore_not_found (bool): If set to False, will raise a ValueError
                if node doesn't exist.

        Raises:
            :class:`` in case the API call
                fails.
        """
        if self.node_status(nodename) is None:
            msg = f"Node {nodename} doesn't exist"
            if ignore_not_found:
      "Skipping node eviction, %s", msg)
                return
            raise ValueError(msg)
        resp = self.api.delete_node(nodename,
                                     grace_period_seconds=grace_period,
                                     pretty=True)
        LOGGER.debug(resp)
        LOGGER.success("Kubernetes node '%s' has been deleted successfully",
                       nodename)
def node_status(self, nodename):
        """Returns the status of a Node.

        Args:
            nodename (str): The name of the node to check.

        Returns:
            The status of the node as string or None if an error was
            encountered.
        """
        resp = None
        try:
            resp = self.api.read_node_status(
                nodename, pretty=True)
            LOGGER.debug("API Response: %s", resp)
        except ApiException as exc:
            LOGGER.debug("API exception: %s", exc)
            return None
        # Grab dat string
        status = [x for x in resp.status.conditions if x.type == 'Ready']
        return status[0].status
@retry(ValueError)
    def etcd_members(self, podname, master_ip):
        """Retrieves a dictionary with information about the etcd cluster.

        This function uses ``etcdctl member list`` to retrieve information
        about the etcd cluster, then parses that response into a dictionary
        where the keys are the names of the members and the corresponding
        values hold the rest of the information such as ID, clientURLs and
        peerURLs.

        Returns:
            A dictionary with information about the etcd cluster.

        Raises:
            ValueError if master_ip is not valid.
        """
        if not valid_ipv4(master_ip):
            raise ValueError(f"Invalid IP: {master_ip}")
        exec_command = ['/bin/sh', '-c', ETCDCTL_BASE.format(
            "member list", master_ip)]
        response = stream(self.api.connect_get_namespaced_pod_exec,
                          podname,
                          'kube-system',
                          command=exec_command,
                          stderr=True, stdin=False,
                          stdout=True, tty=False)
        return parse_etcd_response(response)
@retry(ValueError)
    def remove_from_etcd(self, name, ignore_not
[docs]class K8S(K8SConfigurator, K8SScaler): # pylint: disable=too-many-locals """Class allowing various interactions with a Kubernets cluster. """ def __init__(self, config, manifest_path=None): """ A class to configure k8s after boot Args: config (str): File path for the kubernetes configuration file manfiest_path (str): Path for kubernetes manifests to be applied """ self.config = config if not manifest_path: manifest_path = MANIFESTSPATH self.manifest_path = manifest_path kube_config.load_kube_config(config_file=config) config = Configuration() self.api = k8sclient.CoreV1Api() self.client = api_client.ApiClient(configuration=config) @property def nginx_ingress_ports(self): """ get the ingress-nginx service ports as dictionary """ ingress = self.api.list_namespaced_service( 'ingress-nginx', label_selector="", limit=1) return { i for i in ingress.items[0].spec.ports}
[docs]def get_addons(config): """ A prototype for loading addons. There are optional addons, and non-optional addons. Currently, non-optional addons include only the metrics-server. Args: config (dict): parse yaml with an optional section, list of addons """ for item in config.get('addons', {}): yield KorisAddon(item) for item in ['metrics-server', 'nginx-ingress', 'ext-cloud-openstack']: yield KorisAddon(item)
[docs]class KorisAddon: # pylint: disable=too-few-public-methods """ Naive Addon class. Applies a kubernetes collection of resources from yml. Args: name (str): the name of the plugin manifest_path (str): the path where kubernetes resources are saved. """ def __init__(self, name, manifest_path=MANIFESTSPATH): = name self.file = os.path.join(manifest_path, name + ".yml")
[docs] def apply(self, k8s_client, apply_func=create_from_yaml): """ Apply a plugin to the cluster. Currently we use the Python client to apply a plugin. This might be limited, so we keep the possibilty to use a kubectl shell wrapper by making this an optional argument. Args: k8s_client: A Kubernet API client apply_func: A callable that can apply a plugin to the cluster """ apply_func(k8s_client, self.file, verbose=False)
[docs]def add_ingress_listeners(nginx_ingress_ports, lbinst, lb_masters): """ Reconfigure the Openstack LoadBalancer - add an HTTP and HTTPS listener for nginx ingress controller Args: lbinst (:class:`.cloud.openstack.LoadBalancer`): A configured LoadBalancer instance. members (list): list containining memebr information """ # [{"name": "foo", "address": "", "protocol_port": "6443"}, # {"name": "bar", "address": "", "protocol_port": "6443"}, # ] for key, port in {'Ingress-HTTP': 80, 'Ingress-HTTPS': 443}.items(): protocol = key.split("-")[-1] name = '-'.join((key, lbinst.config['cluster-name'])) listener = lbinst.add_listener( name=name, protocol=protocol, protocol_port=port) pool = lbinst.add_pool(, protocol=protocol, name=name) updated_masters = lb_masters.copy() for master in updated_masters: master['protocol_port'] = nginx_ingress_ports[protocol].node_port master["monitor_port"] = nginx_ingress_ports[protocol].node_port if not lbinst.bulk_update_members(updated_masters, pool['id']): LOGGER.debug("Bulk update failed, falling back to serial update") for master in lb_masters: lbinst.add_member(, master['address'], protocol_port=nginx_ingress_ports[protocol].node_port) # noqa