#90 move mock and k8s cluster source out of main.py

This commit is contained in:
Henning Jacobs
2017-01-14 13:52:32 +01:00
parent d3816d3274
commit e9da91d451

View File

@@ -12,15 +12,11 @@ import json
import json_delta
import logging
import os
import re
import requests
import datetime
import random
import redis
import signal
import string
import time
import tokens
from pathlib import Path
from queue import Queue
from redlock import Redlock
@@ -29,6 +25,9 @@ from flask import Flask, redirect
from flask_oauthlib.client import OAuth, OAuthRemoteApp
from urllib.parse import urljoin
from .mock import get_mock_clusters
from .kubernetes import get_kubernetes_clusters
ONE_YEAR = 3600 * 24 * 365
logging.basicConfig(level=logging.INFO)
@@ -157,9 +156,6 @@ class RedisStore:
self._redis.set(redis_key, json.dumps(data))
CLUSTER_ID_INVALID_CHARS = re.compile('[^a-z0-9:-]')
def get_bool(name: str):
return os.getenv(name, '').lower() in ('1', 'true')
@@ -167,7 +163,6 @@ def get_bool(name: str):
DEBUG = get_bool('DEBUG')
SERVER_PORT = int(os.getenv('SERVER_PORT', 8080))
SERVER_STATUS = {'shutdown': False}
DEFAULT_CLUSTERS = 'http://localhost:8001/'
CREDENTIALS_DIR = os.getenv('CREDENTIALS_DIR', '')
AUTHORIZE_URL = os.getenv('AUTHORIZE_URL')
APP_URL = os.getenv('APP_URL')
@@ -228,11 +223,6 @@ auth = OAuthRemoteAppWithRefresh(
)
oauth.remote_apps['auth'] = auth
session = requests.Session()
tokens.configure(from_file_only=True)
tokens.manage('read-only')
@app.route('/health')
def health():
@@ -258,195 +248,6 @@ def index():
return flask.render_template('index.html', app_js=app_js)
def hash_int(x: int):
x = ((x >> 16) ^ x) * 0x45d9f3b
x = ((x >> 16) ^ x) * 0x45d9f3b
x = (x >> 16) ^ x
return x
def generate_mock_pod(index: int, i: int, j: int):
names = [
'agent-cooper',
'black-lodge',
'bob',
'bobby-briggs',
'laura-palmer',
'leland-palmer',
'log-lady',
'sheriff-truman',
]
pod_phases = ['Pending', 'Running', 'Running']
labels = {}
phase = pod_phases[hash_int((index + 1) * (i + 1) * (j + 1)) % len(pod_phases)]
containers = []
for k in range(1 + j % 2):
container = {
'name': 'myapp', 'image': 'foo/bar/{}'.format(j), 'resources': {'requests': {'cpu': '100m', 'memory': '100Mi'}, 'limits': {}},
'ready': True,
'state': {'running': {}}
}
if phase == 'Running':
if j % 13 == 0:
container.update(**{'ready': False, 'state': {'waiting': {'reason': 'CrashLoopBackOff'}}})
elif j % 7 == 0:
container.update(**{'ready': True, 'state': {'running': {}}, 'restartCount': 3})
containers.append(container)
pod = {
'name': '{}-{}-{}'.format(names[hash_int((i + 1) * (j + 1)) % len(names)], i, j),
'namespace': 'kube-system' if j < 3 else 'default',
'labels': labels,
'phase': phase,
'containers': containers
}
if phase == 'Running' and j % 17 == 0:
pod['deleted'] = 123
return pod
def generate_cluster_id(url: str):
'''Generate some "cluster ID" from given API server URL'''
for prefix in ('https://', 'http://'):
if url.startswith(prefix):
url = url[len(prefix):]
return CLUSTER_ID_INVALID_CHARS.sub('-', url.lower()).strip('-')
def generate_mock_cluster_data(index: int):
'''Generate deterministic (no randomness!) mock data'''
nodes = {}
for i in range(10):
# add/remove the second to last node every 13 seconds
if i == 8 and int(time.time() / 13) % 2 == 0:
continue
labels = {}
if i < 2:
labels['master'] = 'true'
pods = {}
for j in range(hash_int((index + 1) * (i + 1)) % 32):
# add/remove some pods every 7 seconds
if j % 17 == 0 and int(time.time() / 7) % 2 == 0:
pass
else:
pod = generate_mock_pod(index, i, j)
pods['{}/{}'.format(pod['namespace'], pod['name'])] = pod
node = {'name': 'node-{}'.format(i), 'labels': labels, 'status': {'capacity': {'cpu': '4', 'memory': '32Gi', 'pods': '110'}}, 'pods': pods}
nodes[node['name']] = node
pod = generate_mock_pod(index, 11, index)
unassigned_pods = {'{}/{}'.format(pod['namespace'], pod['name']): pod}
return {
'id': 'mock-cluster-{}'.format(index),
'api_server_url': 'https://kube-{}.example.org'.format(index),
'nodes': nodes,
'unassigned_pods': unassigned_pods
}
def get_mock_clusters():
for i in range(3):
data = generate_mock_cluster_data(i)
yield data
def map_node_status(status: dict):
return {
'addresses': status.get('addresses'),
'capacity': status.get('capacity'),
}
def map_node(node: dict):
return {
'name': node['metadata']['name'],
'labels': node['metadata']['labels'],
'status': map_node_status(node['status']),
'pods': {}
}
def map_pod(pod: dict):
return {
'name': pod['metadata']['name'],
'namespace': pod['metadata']['namespace'],
'labels': pod['metadata'].get('labels', {}),
'phase': pod['status'].get('phase'),
'startTime': pod['status']['startTime'] if 'startTime' in pod['status'] else '',
'containers': []
}
def map_container(cont: dict, pod: dict):
obj = {'name': cont['name'], 'image': cont['image'], 'resources': cont['resources']}
status = list([s for s in pod.get('status', {}).get('containerStatuses', []) if s['name'] == cont['name']])
if status:
obj.update(**status[0])
return obj
def get_kubernetes_clusters():
for api_server_url in (os.getenv('CLUSTERS') or DEFAULT_CLUSTERS).split(','):
cluster_id = generate_cluster_id(api_server_url)
if 'localhost' not in api_server_url:
# TODO: hacky way of detecting whether we need a token or not
session.headers['Authorization'] = 'Bearer {}'.format(tokens.get('read-only'))
response = session.get(urljoin(api_server_url, '/api/v1/nodes'), timeout=5)
response.raise_for_status()
nodes = {}
pods_by_namespace_name = {}
unassigned_pods = {}
for node in response.json()['items']:
obj = map_node(node)
nodes[obj['name']] = obj
response = session.get(urljoin(api_server_url, '/api/v1/pods'), timeout=5)
response.raise_for_status()
for pod in response.json()['items']:
obj = map_pod(pod)
if 'deletionTimestamp' in pod['metadata']:
obj['deleted'] = datetime.datetime.strptime(pod['metadata']['deletionTimestamp'],
'%Y-%m-%dT%H:%M:%SZ').replace(
tzinfo=datetime.timezone.utc).timestamp()
for cont in pod['spec']['containers']:
obj['containers'].append(map_container(cont, pod))
pods_by_namespace_name[(obj['namespace'], obj['name'])] = obj
pod_key = '{}/{}'.format(obj['namespace'], obj['name'])
if 'nodeName' in pod['spec'] and pod['spec']['nodeName'] in nodes:
nodes[pod['spec']['nodeName']]['pods'][pod_key] = obj
else:
unassigned_pods[pod_key] = obj
try:
response = session.get(urljoin(api_server_url, '/api/v1/namespaces/kube-system/services/heapster/proxy/apis/metrics/v1alpha1/nodes'), timeout=5)
response.raise_for_status()
data = response.json()
if not data.get('items'):
logging.info('Heapster node metrics not available (yet)')
else:
for metrics in data['items']:
nodes[metrics['metadata']['name']]['usage'] = metrics['usage']
except:
logging.exception('Failed to get node metrics')
try:
response = session.get(urljoin(api_server_url,
'/api/v1/namespaces/kube-system/services/heapster/proxy/apis/metrics/v1alpha1/pods'),
timeout=5)
response.raise_for_status()
data = response.json()
if not data.get('items'):
logging.info('Heapster pod metrics not available (yet)')
else:
for metrics in data['items']:
pod = pods_by_namespace_name.get((metrics['metadata']['namespace'], metrics['metadata']['name']))
if pod:
for container in pod['containers']:
for container_metrics in metrics['containers']:
if container['name'] == container_metrics['name']:
container['resources']['usage'] = container_metrics['usage']
except:
logging.exception('Failed to get pod metrics')
yield {'id': cluster_id, 'api_server_url': api_server_url, 'nodes': nodes, 'unassigned_pods': unassigned_pods}
def event(cluster_ids: set):
# first sent full data once
for cluster_id in (STORE.get('cluster-ids') or []):