From 1838a07b577d238102bd7af6a417ae909fb3fb34 Mon Sep 17 00:00:00 2001 From: Henning Jacobs Date: Thu, 21 Mar 2019 17:23:38 +0100 Subject: [PATCH] use pykube-ng to query Kubernetes API --- kube_ops_view/kubernetes.py | 73 +++++++++++++++++-------------------- 1 file changed, 34 insertions(+), 39 deletions(-) diff --git a/kube_ops_view/kubernetes.py b/kube_ops_view/kubernetes.py index ab85ed8..e55ca5b 100644 --- a/kube_ops_view/kubernetes.py +++ b/kube_ops_view/kubernetes.py @@ -1,10 +1,12 @@ import datetime import logging import time -from urllib.parse import urljoin import requests +from pykube import Pod, Node +from pykube.objects import APIObject, NamespacedAPIObject + from .utils import get_short_error_message logger = logging.getLogger(__name__) @@ -12,6 +14,22 @@ logger = logging.getLogger(__name__) session = requests.Session() +# https://github.com/kubernetes/community/blob/master/contributors/design-proposals/instrumentation/resource-metrics-api.md +class NodeMetrics(APIObject): + + version = 'metrics.k8s.io/v1beta1' + endpoint = 'nodes' + kind = 'NodeMetrics' + + +# https://github.com/kubernetes/community/blob/master/contributors/design-proposals/instrumentation/resource-metrics-api.md +class PodMetrics(NamespacedAPIObject): + + version = 'metrics.k8s.io/v1beta1' + endpoint = 'pods' + kind = 'PodMetrics' + + def map_node_status(status: dict): return { 'addresses': status.get('addresses'), @@ -48,15 +66,6 @@ def map_container(cont: dict, pod: dict): return obj -def request(cluster, path, **kwargs): - if 'timeout' not in kwargs: - # sane default timeout - kwargs['timeout'] = (5, 15) - if cluster.cert_file and cluster.key_file: - kwargs['cert'] = (cluster.cert_file, cluster.key_file) - return session.get(urljoin(cluster.api_server_url, path), auth=cluster.auth, verify=cluster.ssl_ca_cert, **kwargs) - - def parse_time(s: str): return datetime.datetime.strptime(s, '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=datetime.timezone.utc).timestamp() @@ -64,19 +73,15 @@ def parse_time(s: str): def query_kubernetes_cluster(cluster): cluster_id = cluster.id api_server_url = cluster.api_server_url - response = request(cluster, '/api/v1/nodes') - response.raise_for_status() nodes = {} pods_by_namespace_name = {} unassigned_pods = {} - for node in response.json()['items']: - obj = map_node(node) + for node in Node.objects(cluster.client): + obj = map_node(node.obj) nodes[obj['name']] = obj - response = request(cluster, '/api/v1/pods') - response.raise_for_status() now = time.time() - for pod in response.json()['items']: - obj = map_pod(pod) + for pod in Pod.objects(cluster.client): + obj = map_pod(pod.obj) if 'deletionTimestamp' in pod['metadata']: obj['deleted'] = parse_time(pod['metadata']['deletionTimestamp']) for cont in pod['spec']['containers']: @@ -101,30 +106,20 @@ def query_kubernetes_cluster(cluster): unassigned_pods[pod_key] = obj try: - response = request(cluster, '/apis/metrics.k8s.io/v1beta1/nodes') - response.raise_for_status() - data = response.json() - if not data.get('items'): - logger.info('Heapster node metrics not available (yet)') - else: - for metrics in data['items']: - nodes[metrics['metadata']['name']]['usage'] = metrics['usage'] + for node_metrics in NodeMetrics.objects(cluster.client): + key = node_metrics.name + nodes[key]['usage'] = node_metrics.obj.get('usage', {}) except Exception as e: logger.warning('Failed to query node metrics {}: {}'.format(cluster.id, get_short_error_message(e))) try: - response = request(cluster, '/apis/metrics.k8s.io/v1beta1/pods') - response.raise_for_status() - data = response.json() - if not data.get('items'): - logger.info('Heapster pod metrics not available (yet)') - else: - for metrics in data['items']: - pod = pods_by_namespace_name.get((metrics['metadata']['namespace'], metrics['metadata']['name'])) - if pod: - for container in pod['containers']: - for container_metrics in metrics['containers']: - if container['name'] == container_metrics['name']: - container['resources']['usage'] = container_metrics['usage'] + for pod_metrics in PodMetrics.objects(cluster.client): + key = (pod_metrics.namespace, pod_metrics.name) + pod = pods_by_namespace_name.get(key) + if pod: + for container in pod['containers']: + for container_metrics in pod_metrics.obj.get('containers', []): + if container['name'] == container_metrics['name']: + container['resources']['usage'] = container_metrics['usage'] except Exception as e: logger.warning('Failed to query pod metrics for cluster {}: {}'.format(cluster.id, get_short_error_message(e))) return {