use pykube-ng to query Kubernetes API

This commit is contained in:
Henning Jacobs
2019-03-21 17:23:38 +01:00
parent aefbf446ba
commit 1838a07b57

View File

@@ -1,10 +1,12 @@
import datetime import datetime
import logging import logging
import time import time
from urllib.parse import urljoin
import requests import requests
from pykube import Pod, Node
from pykube.objects import APIObject, NamespacedAPIObject
from .utils import get_short_error_message from .utils import get_short_error_message
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -12,6 +14,22 @@ logger = logging.getLogger(__name__)
session = requests.Session() session = requests.Session()
# https://github.com/kubernetes/community/blob/master/contributors/design-proposals/instrumentation/resource-metrics-api.md
class NodeMetrics(APIObject):
version = 'metrics.k8s.io/v1beta1'
endpoint = 'nodes'
kind = 'NodeMetrics'
# https://github.com/kubernetes/community/blob/master/contributors/design-proposals/instrumentation/resource-metrics-api.md
class PodMetrics(NamespacedAPIObject):
version = 'metrics.k8s.io/v1beta1'
endpoint = 'pods'
kind = 'PodMetrics'
def map_node_status(status: dict): def map_node_status(status: dict):
return { return {
'addresses': status.get('addresses'), 'addresses': status.get('addresses'),
@@ -48,15 +66,6 @@ def map_container(cont: dict, pod: dict):
return obj return obj
def request(cluster, path, **kwargs):
if 'timeout' not in kwargs:
# sane default timeout
kwargs['timeout'] = (5, 15)
if cluster.cert_file and cluster.key_file:
kwargs['cert'] = (cluster.cert_file, cluster.key_file)
return session.get(urljoin(cluster.api_server_url, path), auth=cluster.auth, verify=cluster.ssl_ca_cert, **kwargs)
def parse_time(s: str): def parse_time(s: str):
return datetime.datetime.strptime(s, '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=datetime.timezone.utc).timestamp() return datetime.datetime.strptime(s, '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=datetime.timezone.utc).timestamp()
@@ -64,19 +73,15 @@ def parse_time(s: str):
def query_kubernetes_cluster(cluster): def query_kubernetes_cluster(cluster):
cluster_id = cluster.id cluster_id = cluster.id
api_server_url = cluster.api_server_url api_server_url = cluster.api_server_url
response = request(cluster, '/api/v1/nodes')
response.raise_for_status()
nodes = {} nodes = {}
pods_by_namespace_name = {} pods_by_namespace_name = {}
unassigned_pods = {} unassigned_pods = {}
for node in response.json()['items']: for node in Node.objects(cluster.client):
obj = map_node(node) obj = map_node(node.obj)
nodes[obj['name']] = obj nodes[obj['name']] = obj
response = request(cluster, '/api/v1/pods')
response.raise_for_status()
now = time.time() now = time.time()
for pod in response.json()['items']: for pod in Pod.objects(cluster.client):
obj = map_pod(pod) obj = map_pod(pod.obj)
if 'deletionTimestamp' in pod['metadata']: if 'deletionTimestamp' in pod['metadata']:
obj['deleted'] = parse_time(pod['metadata']['deletionTimestamp']) obj['deleted'] = parse_time(pod['metadata']['deletionTimestamp'])
for cont in pod['spec']['containers']: for cont in pod['spec']['containers']:
@@ -101,28 +106,18 @@ def query_kubernetes_cluster(cluster):
unassigned_pods[pod_key] = obj unassigned_pods[pod_key] = obj
try: try:
response = request(cluster, '/apis/metrics.k8s.io/v1beta1/nodes') for node_metrics in NodeMetrics.objects(cluster.client):
response.raise_for_status() key = node_metrics.name
data = response.json() nodes[key]['usage'] = node_metrics.obj.get('usage', {})
if not data.get('items'):
logger.info('Heapster node metrics not available (yet)')
else:
for metrics in data['items']:
nodes[metrics['metadata']['name']]['usage'] = metrics['usage']
except Exception as e: except Exception as e:
logger.warning('Failed to query node metrics {}: {}'.format(cluster.id, get_short_error_message(e))) logger.warning('Failed to query node metrics {}: {}'.format(cluster.id, get_short_error_message(e)))
try: try:
response = request(cluster, '/apis/metrics.k8s.io/v1beta1/pods') for pod_metrics in PodMetrics.objects(cluster.client):
response.raise_for_status() key = (pod_metrics.namespace, pod_metrics.name)
data = response.json() pod = pods_by_namespace_name.get(key)
if not data.get('items'):
logger.info('Heapster pod metrics not available (yet)')
else:
for metrics in data['items']:
pod = pods_by_namespace_name.get((metrics['metadata']['namespace'], metrics['metadata']['name']))
if pod: if pod:
for container in pod['containers']: for container in pod['containers']:
for container_metrics in metrics['containers']: for container_metrics in pod_metrics.obj.get('containers', []):
if container['name'] == container_metrics['name']: if container['name'] == container_metrics['name']:
container['resources']['usage'] = container_metrics['usage'] container['resources']['usage'] = container_metrics['usage']
except Exception as e: except Exception as e: