# Source code for koris.deploy.k8s

"""
deploy cluster service to kubernetes via the API server
"""
import base64
from datetime import datetime, timedelta, timezone
import getpass
import logging
import os
import random
import re
import secrets
import socket
import string
import subprocess as sp
import sys
import time
from urllib.parse import urlparse
import urllib3

from pkg_resources import resource_filename, Requirement
from netaddr import valid_ipv4

from kubernetes import client as k8sclient
from kubernetes.stream import stream
from kubernetes.client import api_client
from kubernetes.client.configuration import Configuration
from kubernetes.client.rest import ApiException
from kubernetes.config import kube_config
from kubernetes.utils import create_from_yaml

import yaml

from koris.ssl import read_cert
from koris.ssl import discovery_hash as ssl_discovery_hash
from koris.util.util import retry
from koris.util.logger import Logger
from koris import MASTER_LISTENER_NAME

# Location of the bundled Kubernetes manifests. When running as a frozen
# executable (sys.frozen is set, e.g. by PyInstaller, which also provides
# sys._MEIPASS), the manifests live inside the unpacked bundle; otherwise
# they are resolved from the installed "koris" distribution.
MANIFESTSPATH = (
    os.path.join(
        sys._MEIPASS,  # pylint: disable=no-member, protected-access
        'deploy/manifests')
    if getattr(sys, 'frozen', False)
    else resource_filename(Requirement.parse("koris"),
                           'koris/deploy/manifests')
)

LOGGER = Logger(__name__)


# Template for running etcdctl (API v3) with the etcd server certificates.
# The first placeholder is the etcdctl sub-command (e.g. "member list"),
# the second is the IP of the etcd endpoint; output is requested as JSON.
ETCDCTL_BASE = (
    "ETCDCTL_API=3 etcdctl "
    "--key /etc/kubernetes/pki/etcd/server.key "
    "--cacert /etc/kubernetes/pki/etcd/ca.crt "
    "--cert /etc/kubernetes/pki/etcd/server.crt "
    "{} --endpoints=https://{}:2379 -w json"
)


def _get_node_addr(addresses, addr_type):
    """
    Parse the address of the node

    Args:
        addresses (object) - instance of addresses returned from k8s API
        addr_type (str) - the address type
    """
    return [i.address for i in addresses if i.type == addr_type][0]


def rand_string(num):
    """Generate a random string of length *num* from ``[a-z0-9]``.

    The result is used for bootstrap token ids and secrets, so it is drawn
    from the cryptographically secure :mod:`secrets` generator rather than
    the predictable :mod:`random` module. The alphabet is the 36 distinct
    lowercase letters and digits, each equally likely.

    Args:
        num (int): the length of the string to generate.

    Returns:
        str: the random string (empty when *num* is 0).
    """
    alphabet = string.ascii_lowercase + string.digits
    return ''.join(secrets.choice(alphabet) for _ in range(num))


def get_token_description():
    """Build the human readable description attached to a bootstrap token.

    It records who created the token (``user@host`` of the machine running
    koris) and when (``datetime.now()``, i.e. local time).
    """
    user, host = getpass.getuser(), socket.gethostname()
    created = datetime.now()
    return (f"Bootstrap token generated by 'koris add' "
            f"from {user}@{host} on {created}")


def parse_etcd_response(resp):
    """Takes a response from etcdctl and parses it for its member info.

    The response is expected to be the output of
    ``etcdctl member list -w json``. It is parsed with a YAML loader, which
    accepts JSON as well as single-quoted flow mappings. The example in
    ``K8SConfigurator.etcd_cluster_status`` shows single quotes; presumably
    the exec stream can re-render the JSON that way.

    The member IDs arrive as uint64 integers and are converted to hex
    strings (without the ``0x`` prefix). Members without a ``name`` are
    skipped.

    Args:
        resp (str): A response from etcdctl.

    Returns:
        A dict mapping member name to a dict of the remaining member fields
        (e.g. ``ID``, ``peerURLs``, ``clientURLs``).

    Raises:
        ValueError if the response is empty or does not mention any
        ``master-<n>`` member.
    """
    if not resp:
        raise ValueError("etcdtl response is empty")
    if not re.search("master-\\d+", resp):
        LOGGER.debug(resp)
        raise ValueError("can't find 'master' in etcdtl response")

    # safe_load: yaml.load without an explicit Loader is unsafe and is
    # rejected outright by PyYAML >= 6.
    parsed = yaml.safe_load(resp)

    # Re-key the member list by member name.
    out = {}
    for mem in parsed['members']:
        if 'name' not in mem:
            continue
        info = {k: v for k, v in mem.items() if k != "name"}
        # ID is uint64, but callers need it in hex
        info['ID'] = format(info['ID'], 'x')
        out[mem['name']] = info
    return out


class K8SConfigurator:  # pylint: disable=no-member
    """apply plugins and post install setup

    Mixin used by :class:`K8S`; it relies on the concrete class to provide
    ``self.api`` (used as a ``CoreV1Api``) and ``self.client`` (an
    ``ApiClient``).
    """

    def apply_plugins(self, plugins):
        """apply all plugins in the list"""
        # NOTE(review): there is no implementation beyond this docstring, so
        # calling this is currently a no-op that returns None.

    def get_bootstrap_token(self):
        """Generate a Bootstrap token and register it with the cluster.

        The token is stored as the secret ``bootstrap-token-<token id>`` of
        type ``bootstrap.kubernetes.io/token`` in the ``kube-system``
        namespace and expires two hours after creation.

        Returns:
            A string of the form ``<token id>.<token secret>``.
        """
        tid = rand_string(6)
        token_secret = rand_string(16)
        # The timestamp carries a literal "Z" (UTC) suffix, so it must be
        # computed in UTC. Using local time shifted the expiry by the host's
        # UTC offset, so west of UTC the token was already expired.
        expiration = datetime.now(timezone.utc) + timedelta(hours=2)
        data = {
            'description': get_token_description(),
            'token-id': tid,
            'token-secret': token_secret,
            'expiration': expiration.strftime("%Y-%m-%dT%H:%M:%SZ"),
            'usage-bootstrap-authentication': 'true',
            'usage-bootstrap-signing': 'true',
            'auth-extra-groups':
                'system:bootstrappers:kubeadm:default-node-token',
        }
        # Secret data values are sent base64-encoded.
        data = {key: base64.b64encode(val.encode()).decode()
                for key, val in data.items()}
        sec = k8sclient.V1Secret(data=data)
        sec.metadata = k8sclient.V1ObjectMeta(
            name='bootstrap-token-%s' % tid, namespace='kube-system')
        sec.type = 'bootstrap.kubernetes.io/token'
        self.api.create_namespaced_secret(namespace="kube-system", body=sec)
        return ".".join((tid, token_secret))

    @property
    def host(self):
        """Retrieve the host or loadbalancer info"""
        return self.api.api_client.configuration.host

    @property
    def ca_info(self):
        """Return a dict with the read ca and the discovery hash"""
        return {"ca_cert": self.ca_cert, "discovery_hash": self.discovery_hash}

    @property
    def ca_cert(self):
        """Returns the API servers CA.

        Returns:
            The CA encoded as base64.
        """
        return read_cert(self.api.api_client.configuration.ssl_ca_cert)

    @property
    def discovery_hash(self):
        """Calculate and return a discovery_hash.

        Based on the cluster CA.

        Returns:
            A discovery hash encoded in Hex.
        """
        return ssl_discovery_hash(self.ca_cert)

    @property
    def is_ready(self):
        """Check if the API server is already available.

        urllib3 logging is lowered to ERROR while probing and reset to
        WARNING afterwards, even if an unexpected exception propagates.

        Returns:
            True if it's reachable, False on ``MaxRetryError``.
        """
        urllib3_logger = logging.getLogger("urllib3")
        urllib3_logger.setLevel(logging.ERROR)
        try:
            k8sclient.apis.core_api.CoreApi().get_api_versions()
        except urllib3.exceptions.MaxRetryError:
            return False
        finally:
            urllib3_logger.setLevel(logging.WARNING)
        return True

    def get_random_master(self):
        """Returns a name and IP of a master server in the cluster.

        Despite the name, this picks the first node (in API order) that
        carries the ``node-role.kubernetes.io/master`` label.

        Returns:
            Tuple of name and IP of a master.

        Raises:
            IndexError if no master node (or no matching address) is found.
        """
        nodes = self.api.list_node(pretty=True)
        masters = [node for node in nodes.items
                   if 'node-role.kubernetes.io/master'
                   in (node.metadata.labels or {})]
        addresses = masters[0].status.addresses
        # master_ip and master_name are the IP and hostname of an existing
        # master, where an etcd instance is already running.
        master_ip = _get_node_addr(addresses, "InternalIP")
        master_name = _get_node_addr(addresses, "Hostname")
        return master_name, master_ip

    @retry(ValueError)
    def etcd_cluster_status(self):
        """Checks the current etcd cluster state.

        Runs ``etcdctl member list`` inside the ``etcd-<master>`` pod (in
        ``kube-system``) of a master chosen by :meth:`get_random_master`.
        This obtains the current member list before a new member is added.

        Returns:
            The status of the etcd as a string
            (e.g. master-1=https://192.168.1.102:2380,master-2=...).

        Raises:
            ValueError if no master member can be found in the response
            (the method is wrapped in ``@retry(ValueError)``).
        """
        name, master_ip = self.get_random_master()
        exec_command = ['/bin/sh', '-c', ETCDCTL_BASE.format(
            "member list", master_ip)]
        response = stream(self.api.connect_get_namespaced_pod_exec,
                          "etcd-%s" % name, 'kube-system',
                          command=exec_command,
                          stderr=True, stdin=False,
                          stdout=True, tty=False)
        if not response or not re.search("master-\\d+", response):
            LOGGER.info(response)
            raise ValueError("Could not extract current etcd cluster state!")
        # response should be something like
        # {'members': [{'ID': 9007573287841766007, 'name': 'master-7-am',
        #  'peerURLs': ['https://10.32.192.11:2380'],
        #  'clientURLs': ['https://10.32.192.11:2379']}]}
        # safe_load: yaml.load without a Loader is unsafe and rejected by
        # PyYAML >= 6.
        members = yaml.safe_load(response)['members']
        etcd_cluster = ",".join("=".join((m['name'], m['peerURLs'][0]))
                                for m in members if 'name' in m)
        LOGGER.debug("Current etcd cluster state is: %s", etcd_cluster)
        return etcd_cluster

    def add_all_masters_to_loadbalancer(self, cluster_name, n_masters,
                                        lb_inst, poll_interval=5):
        """Adds all master nodes to the LoadBalancer listener.

        While the master listener pool of the LoadBalancer has fewer members
        than *n_masters*, every node whose name contains ``master`` and whose
        ``Ready`` condition is ``True`` is added to the pool (by its first
        reported address). The method blocks until the pool is complete,
        pausing *poll_interval* seconds between polls.

        Args:
            cluster_name (string): the name of the cluster
            n_masters (int): Number of desired master nodes.
            lb_inst (:class:`.cloud.openstack.LoadBalancer`):
                A configured LoadBalancer instance.
            poll_interval (float): seconds to wait between polling rounds.

        Exits the process with status 1 if the master listener is missing or
        lacks name/pool information.
        """
        cond = {'Ready': 'True'}
        master_listener = lb_inst.master_listener
        listener_name = '-'.join((MASTER_LISTENER_NAME, cluster_name))
        if not master_listener:
            LOGGER.error(f"No {listener_name} found, aborting")
            sys.exit(1)

        try:
            listener_name = master_listener['name']
            pool = master_listener['pool']
            # Fail early with a clear message if there is no member list.
            if 'members' not in pool:
                raise KeyError('members')
            pool_id = pool['id']
        except KeyError as exc:
            LOGGER.error(f"Unable to extract info of {listener_name}: {exc}")
            sys.exit(1)

        while True:
            members = lb_inst.master_listener['pool']['members']
            if len(members) >= n_masters:
                break
            present = {member['address'] for member in members}
            for item in self.api.list_node(pretty=True).items:
                if 'master' not in item.metadata.name:
                    continue
                # Freshly registered nodes may not report conditions yet.
                conditions = item.status.conditions or []
                if cond not in [{c.type: c.status} for c in conditions]:
                    continue
                addr_to_add = item.status.addresses[0].address
                if addr_to_add in present:
                    continue
                LOGGER.debug("Adding %s to pool '%s' (%s) ...",
                             addr_to_add, listener_name, pool_id)
                lb_inst.add_member(pool_id, addr_to_add)
                present.add(addr_to_add)
            # Avoid hammering the Kubernetes and OpenStack APIs in a tight loop.
            time.sleep(poll_interval)

    def apply_addons(self, koris_config, apply_func=create_from_yaml):
        """apply all addons to the cluster

        Args:
            koris_config (dict): koris configuration loaded as dict
            apply_func (callable): applies one manifest file with the client
        """
        for addon in get_addons(koris_config):
            LOGGER.info("Applying add-on [%s]", addon.name)
            addon.apply(self.client, apply_func=apply_func)

    @property
    def nginx_ingress_ports(self):
        """
        get the ingress-nginx service ports as dictionary
        """
        ingress = self.api.list_namespaced_service(
            'ingress-nginx',
            label_selector="app.kubernetes.io/name=ingress-nginx",
            limit=1)
        return {i.name.upper(): i for i in ingress.items[0].spec.ports}

    def validate_context(self, conn):
        """Validate that server that we are talking to via K8S API
        is also the cloud context we are using.

        The host is extracted from the API server URL. If OpenStack knows it
        as a Floating IP, the context is valid when any LoadBalancer of the
        current project shares that IP's project ID. Otherwise the host is
        treated as a Virtual IP and compared with each LoadBalancer's VIP.

        Args:
            conn (obj): OpenStack connection object.

        Return:
            bool
        """
        # urlparse drops scheme and port properly; str.strip("https://")
        # strips a *set of characters* and mangled hostnames.
        raw_ip = urlparse(self.host).hostname or self.host.split(":")[0]
        lb_ip = conn.network.find_ip(raw_ip)
        load_balancers = conn.load_balancer.load_balancers()
        if lb_ip:
            # We have a Floating IP
            return any(lb.project_id == lb_ip.project_id
                       for lb in load_balancers)
        # We have a Virtual IP
        return any(lb.vip_address == raw_ip for lb in load_balancers)


class K8SScaler:  # pylint: disable=no-member
    """
    A Mixin to modify the cluster size

    Expects the concrete class to provide ``self.api`` (used as a
    ``CoreV1Api``) and ``get_random_master`` (from K8SConfigurator).
    """

    def add_node(self):
        """add a node to the cluster"""
        # NOTE(review): no implementation beyond the docstring; no-op.

    def add_master(self):
        """add a master to the cluster"""
        # NOTE(review): no implementation beyond the docstring; no-op.

    def drain_node(self, nodename, ignore_not_found=True):
        """Drains a node of pods.

        We're using ``kubectl drain`` instead of the eviction API, since it's
        quicker and we don't have to get all the Pods of the Node first.

        Will check if the node exists first.

        Args:
            nodename (str): Name of the node to drain
            ignore_not_found (bool): If set to False, will raise a ValueError
                if the node doesn't exist.

        Raises:
            ValueError if the node doesn't exist and ignore_not_found is False.
            RuntimeError if ``kubectl drain`` fails.
        """
        if self.node_status(nodename) is None:
            msg = f"Node {nodename} doesn't exist"
            if ignore_not_found:
                LOGGER.info("Skipping node eviction, %s", msg)
                return
            raise ValueError(msg)

        # kubectl drain needs to block
        cmd = ["kubectl", "drain", nodename, "--ignore-daemonsets"]
        try:
            proc = sp.run(cmd, check=True, encoding="utf-8",
                          stdout=sp.PIPE, stderr=sp.PIPE)
        except sp.CalledProcessError as exc:
            # Previously the format string had two placeholders but only one
            # argument, so a TypeError was raised instead of RuntimeError.
            stderr = (exc.stderr or "").strip()
            raise RuntimeError("error calling '%s': %s %s" % (
                " ".join(cmd), exc, stderr)) from exc
        LOGGER.debug("STDOUT: %s (Exit code %s)", proc.stdout,
                     proc.returncode)

    # pylint: disable=too-many-function-args
    def delete_node(self, nodename, grace_period=0, ignore_not_found=True):
        """Delete a node in Kubernetes.

        Args:
            nodename (str): The name of the node to delete.
            grace_period (int): Duration in seconds before the node should be
                deleted. Defaults to 0, which means immediately.
            ignore_not_found (bool): If set to False, will raise a ValueError
                if node doesn't exist.

        Raises:
            ValueError if the node doesn't exist and ignore_not_found is False.
            :class:`kubernetes.client.rest.ApiException` in case the API call
            fails.
        """
        if self.node_status(nodename) is None:
            msg = f"Node {nodename} doesn't exist"
            if ignore_not_found:
                LOGGER.info("Skipping node deletion, %s", msg)
                return
            raise ValueError(msg)

        resp = self.api.delete_node(nodename,
                                    grace_period_seconds=grace_period,
                                    pretty=True)
        LOGGER.debug(resp)
        LOGGER.success("Kubernetes node '%s' has been deleted successfully",
                       nodename)

    def node_status(self, nodename):
        """Returns the status of a Node.

        Args:
            nodename (str): The name of the node to check.

        Returns:
            The status of the node's ``Ready`` condition as string
            (e.g. ``'True'``), or None if the API call failed or the node
            reports no ``Ready`` condition.
        """
        try:
            resp = self.api.read_node_status(nodename, pretty=True)
        except ApiException as exc:
            LOGGER.debug("API exception: %s", exc)
            return None
        LOGGER.debug("API Response: %s", resp)

        ready = [cond.status for cond in (resp.status.conditions or [])
                 if cond.type == 'Ready']
        return ready[0] if ready else None

    @retry(ValueError)
    def etcd_members(self, podname, master_ip):
        """Retrieves a dictionary with information about the etcd cluster.

        This function uses ``etcdctl member list`` to retrieve information
        about the etcd cluster, then parses that response into a dictionary
        where the keys are the names of the members and the corresponding
        values hold the rest of the information such as ID, clientURLs and
        peerURLs.

        Args:
            podname (str): Pod in ``kube-system`` to exec etcdctl in.
            master_ip (str): IPv4 address of the etcd endpoint.

        Returns:
            A dictionary with information about the etcd cluster.

        Raises:
            ValueError if master_ip is not valid or the response cannot be
            parsed (the method is wrapped in ``@retry(ValueError)``).
        """
        if not valid_ipv4(master_ip):
            raise ValueError(f"Invalid IP: {master_ip}")

        exec_command = ['/bin/sh', '-c', ETCDCTL_BASE.format(
            "member list", master_ip)]
        response = stream(self.api.connect_get_namespaced_pod_exec,
                          podname, 'kube-system',
                          command=exec_command,
                          stderr=True, stdin=False,
                          stdout=True, tty=False)
        return parse_etcd_response(response)

    @retry(ValueError)
    def remove_from_etcd(self, name, ignore_not_found=True):
        """Removes a member from etcd.

        The queries are run via ``etcdctl`` inside the ``etcd-<master>`` pod
        of a master chosen by ``get_random_master``.

        Args:
            name (str): The name of the member to remove.
            ignore_not_found (bool): If set to False, will raise a ValueError
                if member is not part of etcd cluster. NOTE(review): the
                method is wrapped in ``@retry(ValueError)``, so that error is
                presumably retried before surfacing.
        """
        master, master_ip = self.get_random_master()
        podname = "etcd-%s" % master

        etcd_members = self.etcd_members(podname, master_ip)
        LOGGER.debug(etcd_members)

        try:
            etcd_id = etcd_members[name]['ID']
        except KeyError:
            msg = f"'{name}' not part of etcd cluster"
            if ignore_not_found:
                LOGGER.info("Skipping removing %s from etcd: %s", name, msg)
                return
            raise ValueError(msg)

        exec_command = ['/bin/sh', '-c', ETCDCTL_BASE.format(
            "member remove %s" % etcd_id, master_ip)]
        response = stream(self.api.connect_get_namespaced_pod_exec,
                          podname, 'kube-system',
                          command=exec_command,
                          stderr=True, stdin=False,
                          stdout=True, tty=False)
        LOGGER.debug("%s", response)
        LOGGER.debug("Removed '%s' from etcd", name)


class K8S(K8SConfigurator, K8SScaler):  # pylint: disable=too-many-locals
    """Class allowing various interactions with a Kubernetes cluster.

    ``nginx_ingress_ports`` and the other helpers are inherited from
    :class:`K8SConfigurator` and :class:`K8SScaler`.
    """

    def __init__(self, config, manifest_path=None):
        """
        A class to configure k8s after boot

        Loads the given kubeconfig, then creates ``self.api`` (a
        ``CoreV1Api``) and ``self.client`` (an ``ApiClient``).

        Args:
            config (str): File path for the kubernetes configuration file
            manifest_path (str): Path for kubernetes manifests to be applied;
                defaults to MANIFESTSPATH when falsy.
        """
        self.config = config
        self.manifest_path = manifest_path or MANIFESTSPATH
        kube_config.load_kube_config(config_file=config)
        # Separate name so the ``config`` file path argument isn't shadowed.
        client_config = Configuration()
        self.api = k8sclient.CoreV1Api()
        self.client = api_client.ApiClient(configuration=client_config)


def get_addons(config):
    """
    Yield the add-ons to apply to the cluster.

    Optional add-ons listed under the ``addons`` key of *config* are yielded
    first, followed by the non-optional ones: metrics-server,
    nginx-ingress and ext-cloud-openstack.

    Args:
        config (dict): parsed koris configuration; the ``addons`` section is
            optional and holds a list of add-on names.

    Yields:
        :class:`KorisAddon` instances.
    """
    # An empty "addons:" entry in YAML loads as None; treat it as missing.
    for item in config.get('addons') or ():
        yield KorisAddon(item)
    for item in ('metrics-server', 'nginx-ingress', 'ext-cloud-openstack'):
        yield KorisAddon(item)


class KorisAddon:  # pylint: disable=too-few-public-methods
    """
    Naive add-on wrapper: a named collection of Kubernetes resources stored
    in a single ``<name>.yml`` manifest.

    Args:
        name (str): the name of the plugin
        manifest_path (str): directory holding the add-on manifests
    """

    def __init__(self, name, manifest_path=MANIFESTSPATH):
        manifest_file = name + ".yml"
        self.name = name
        self.file = os.path.join(manifest_path, manifest_file)

    def apply(self, k8s_client, apply_func=create_from_yaml):
        """
        Apply this add-on's manifest file to the cluster.

        By default the Python client's ``create_from_yaml`` is used; any
        callable with the same call shape (e.g. a ``kubectl`` wrapper) can
        be supplied instead.

        Args:
            k8s_client: A Kubernetes API client
            apply_func: callable invoked as
                ``apply_func(k8s_client, path, verbose=False)``
        """
        apply_func(k8s_client, self.file, verbose=False)


def add_ingress_listeners(nginx_ingress_ports, lbinst, lb_masters):
    """
    Reconfigure the OpenStack LoadBalancer for the nginx ingress controller.

    Adds an HTTP listener (port 80) and an HTTPS listener (port 443), each
    with its own pool. The pool members are the masters, pointed at the
    matching ingress NodePort. If the bulk member update fails, members are
    added one by one.

    Args:
        nginx_ingress_ports (dict): service ports keyed by upper-case port
            name (``"HTTP"``, ``"HTTPS"``), as built by the
            ``nginx_ingress_ports`` property; each value must expose
            ``node_port``.
        lbinst (:class:`.cloud.openstack.LoadBalancer`):
            A configured LoadBalancer instance.
        lb_masters (list): member dicts, for example::

            [{"name": "foo", "address": "10.0.0.38", "protocol_port": "6443"},
             {"name": "bar", "address": "10.0.0.29", "protocol_port": "6443"}]

            Neither the list nor its dicts are modified.
    """
    for key, port in {'Ingress-HTTP': 80, 'Ingress-HTTPS': 443}.items():
        protocol = key.split("-")[-1]
        name = '-'.join((key, lbinst.config['cluster-name']))
        node_port = nginx_ingress_ports[protocol].node_port
        listener = lbinst.add_listener(
            name=name, protocol=protocol, protocol_port=port)
        pool = lbinst.add_pool(listener.id, protocol=protocol, name=name)
        # Build fresh dicts: a shallow list copy would still share (and
        # therefore mutate) the caller's member dicts.
        updated_masters = [
            dict(master, protocol_port=node_port, monitor_port=node_port)
            for master in lb_masters
        ]
        if not lbinst.bulk_update_members(updated_masters, pool['id']):
            LOGGER.debug("Bulk update failed, falling back to serial update")
            for master in lb_masters:
                lbinst.add_member(pool['id'], master['address'],
                                  protocol_port=node_port)