diff --git a/kube_ops_view/main.py b/kube_ops_view/main.py index ce8e611..cca4e70 100644 --- a/kube_ops_view/main.py +++ b/kube_ops_view/main.py @@ -12,15 +12,11 @@ import json import json_delta import logging import os -import re -import requests -import datetime import random import redis import signal import string import time -import tokens from pathlib import Path from queue import Queue from redlock import Redlock @@ -29,6 +25,9 @@ from flask import Flask, redirect from flask_oauthlib.client import OAuth, OAuthRemoteApp from urllib.parse import urljoin +from .mock import get_mock_clusters +from .kubernetes import get_kubernetes_clusters + ONE_YEAR = 3600 * 24 * 365 logging.basicConfig(level=logging.INFO) @@ -157,9 +156,6 @@ class RedisStore: self._redis.set(redis_key, json.dumps(data)) -CLUSTER_ID_INVALID_CHARS = re.compile('[^a-z0-9:-]') - - def get_bool(name: str): return os.getenv(name, '').lower() in ('1', 'true') @@ -167,7 +163,6 @@ def get_bool(name: str): DEBUG = get_bool('DEBUG') SERVER_PORT = int(os.getenv('SERVER_PORT', 8080)) SERVER_STATUS = {'shutdown': False} -DEFAULT_CLUSTERS = 'http://localhost:8001/' CREDENTIALS_DIR = os.getenv('CREDENTIALS_DIR', '') AUTHORIZE_URL = os.getenv('AUTHORIZE_URL') APP_URL = os.getenv('APP_URL') @@ -228,11 +223,6 @@ auth = OAuthRemoteAppWithRefresh( ) oauth.remote_apps['auth'] = auth -session = requests.Session() - -tokens.configure(from_file_only=True) -tokens.manage('read-only') - @app.route('/health') def health(): @@ -258,195 +248,6 @@ def index(): return flask.render_template('index.html', app_js=app_js) -def hash_int(x: int): - x = ((x >> 16) ^ x) * 0x45d9f3b - x = ((x >> 16) ^ x) * 0x45d9f3b - x = (x >> 16) ^ x - return x - - -def generate_mock_pod(index: int, i: int, j: int): - names = [ - 'agent-cooper', - 'black-lodge', - 'bob', - 'bobby-briggs', - 'laura-palmer', - 'leland-palmer', - 'log-lady', - 'sheriff-truman', - ] - pod_phases = ['Pending', 'Running', 'Running'] - labels = {} - phase = pod_phases[hash_int((index + 1) * (i + 1) * (j + 1)) % len(pod_phases)] - containers = [] - for k in range(1 + j % 2): - container = { - 'name': 'myapp', 'image': 'foo/bar/{}'.format(j), 'resources': {'requests': {'cpu': '100m', 'memory': '100Mi'}, 'limits': {}}, - 'ready': True, - 'state': {'running': {}} - } - if phase == 'Running': - if j % 13 == 0: - container.update(**{'ready': False, 'state': {'waiting': {'reason': 'CrashLoopBackOff'}}}) - elif j % 7 == 0: - container.update(**{'ready': True, 'state': {'running': {}}, 'restartCount': 3}) - containers.append(container) - pod = { - 'name': '{}-{}-{}'.format(names[hash_int((i + 1) * (j + 1)) % len(names)], i, j), - 'namespace': 'kube-system' if j < 3 else 'default', - 'labels': labels, - 'phase': phase, - 'containers': containers - } - if phase == 'Running' and j % 17 == 0: - pod['deleted'] = 123 - - return pod - - -def generate_cluster_id(url: str): - '''Generate some "cluster ID" from given API server URL''' - for prefix in ('https://', 'http://'): - if url.startswith(prefix): - url = url[len(prefix):] - return CLUSTER_ID_INVALID_CHARS.sub('-', url.lower()).strip('-') - - -def generate_mock_cluster_data(index: int): - '''Generate deterministic (no randomness!) mock data''' - nodes = {} - for i in range(10): - # add/remove the second to last node every 13 seconds - if i == 8 and int(time.time() / 13) % 2 == 0: - continue - labels = {} - if i < 2: - labels['master'] = 'true' - pods = {} - for j in range(hash_int((index + 1) * (i + 1)) % 32): - # add/remove some pods every 7 seconds - if j % 17 == 0 and int(time.time() / 7) % 2 == 0: - pass - else: - pod = generate_mock_pod(index, i, j) - pods['{}/{}'.format(pod['namespace'], pod['name'])] = pod - node = {'name': 'node-{}'.format(i), 'labels': labels, 'status': {'capacity': {'cpu': '4', 'memory': '32Gi', 'pods': '110'}}, 'pods': pods} - nodes[node['name']] = node - pod = generate_mock_pod(index, 11, index) - unassigned_pods = {'{}/{}'.format(pod['namespace'], pod['name']): pod} - return { - 'id': 'mock-cluster-{}'.format(index), - 'api_server_url': 'https://kube-{}.example.org'.format(index), - 'nodes': nodes, - 'unassigned_pods': unassigned_pods - } - - -def get_mock_clusters(): - for i in range(3): - data = generate_mock_cluster_data(i) - yield data - - -def map_node_status(status: dict): - return { - 'addresses': status.get('addresses'), - 'capacity': status.get('capacity'), - } - - -def map_node(node: dict): - return { - 'name': node['metadata']['name'], - 'labels': node['metadata']['labels'], - 'status': map_node_status(node['status']), - 'pods': {} - } - - -def map_pod(pod: dict): - return { - 'name': pod['metadata']['name'], - 'namespace': pod['metadata']['namespace'], - 'labels': pod['metadata'].get('labels', {}), - 'phase': pod['status'].get('phase'), - 'startTime': pod['status']['startTime'] if 'startTime' in pod['status'] else '', - 'containers': [] - } - - -def map_container(cont: dict, pod: dict): - obj = {'name': cont['name'], 'image': cont['image'], 'resources': cont['resources']} - status = list([s for s in pod.get('status', {}).get('containerStatuses', []) if s['name'] == cont['name']]) - if status: - obj.update(**status[0]) - return obj - - -def get_kubernetes_clusters(): - for api_server_url in (os.getenv('CLUSTERS') or DEFAULT_CLUSTERS).split(','): - cluster_id = generate_cluster_id(api_server_url) - if 'localhost' not in api_server_url: - # TODO: hacky way of detecting whether we need a token or not - session.headers['Authorization'] = 'Bearer {}'.format(tokens.get('read-only')) - response = session.get(urljoin(api_server_url, '/api/v1/nodes'), timeout=5) - response.raise_for_status() - nodes = {} - pods_by_namespace_name = {} - unassigned_pods = {} - for node in response.json()['items']: - obj = map_node(node) - nodes[obj['name']] = obj - response = session.get(urljoin(api_server_url, '/api/v1/pods'), timeout=5) - response.raise_for_status() - for pod in response.json()['items']: - obj = map_pod(pod) - if 'deletionTimestamp' in pod['metadata']: - obj['deleted'] = datetime.datetime.strptime(pod['metadata']['deletionTimestamp'], - '%Y-%m-%dT%H:%M:%SZ').replace( - tzinfo=datetime.timezone.utc).timestamp() - for cont in pod['spec']['containers']: - obj['containers'].append(map_container(cont, pod)) - pods_by_namespace_name[(obj['namespace'], obj['name'])] = obj - pod_key = '{}/{}'.format(obj['namespace'], obj['name']) - if 'nodeName' in pod['spec'] and pod['spec']['nodeName'] in nodes: - nodes[pod['spec']['nodeName']]['pods'][pod_key] = obj - else: - unassigned_pods[pod_key] = obj - - try: - response = session.get(urljoin(api_server_url, '/api/v1/namespaces/kube-system/services/heapster/proxy/apis/metrics/v1alpha1/nodes'), timeout=5) - response.raise_for_status() - data = response.json() - if not data.get('items'): - logging.info('Heapster node metrics not available (yet)') - else: - for metrics in data['items']: - nodes[metrics['metadata']['name']]['usage'] = metrics['usage'] - except: - logging.exception('Failed to get node metrics') - try: - response = session.get(urljoin(api_server_url, - '/api/v1/namespaces/kube-system/services/heapster/proxy/apis/metrics/v1alpha1/pods'), - timeout=5) - response.raise_for_status() - data = response.json() - if not data.get('items'): - logging.info('Heapster pod metrics not available (yet)') - else: - for metrics in data['items']: - pod = pods_by_namespace_name.get((metrics['metadata']['namespace'], metrics['metadata']['name'])) - if pod: - for container in pod['containers']: - for container_metrics in metrics['containers']: - if container['name'] == container_metrics['name']: - container['resources']['usage'] = container_metrics['usage'] - except: - logging.exception('Failed to get pod metrics') - yield {'id': cluster_id, 'api_server_url': api_server_url, 'nodes': nodes, 'unassigned_pods': unassigned_pods} - - def event(cluster_ids: set): # first sent full data once for cluster_id in (STORE.get('cluster-ids') or []):