#90 move Memory/RedisStore out of main.py

This commit is contained in:
Henning Jacobs
2017-01-14 13:56:16 +01:00
parent e9da91d451
commit 323b1e5042
4 changed files with 341 additions and 129 deletions

122
kube_ops_view/kubernetes.py Normal file
View File

@@ -0,0 +1,122 @@
import datetime
import logging
import os
import re
from urllib.parse import urljoin
import requests
import tokens
# any character that is NOT lowercase alphanumeric, ":" or "-" gets replaced
# when deriving a cluster ID from the API server URL
CLUSTER_ID_INVALID_CHARS = re.compile('[^a-z0-9:-]')
# fallback API server endpoint: a local "kubectl proxy"
DEFAULT_CLUSTERS = 'http://localhost:8001/'
# configure OAuth token management for remote (non-localhost) API servers;
# NOTE(review): side effects at import time — tokens are read from file only
tokens.configure(from_file_only=True)
tokens.manage('read-only')
# shared HTTP session so connections to the API servers are reused
session = requests.Session()
def generate_cluster_id(url: str):
    '''Generate some "cluster ID" from given API server URL'''
    # strip the URL scheme, then map every disallowed character to "-"
    stripped = url
    for scheme in ('https://', 'http://'):
        if stripped.startswith(scheme):
            stripped = stripped[len(scheme):]
    return CLUSTER_ID_INVALID_CHARS.sub('-', stripped.lower()).strip('-')
def map_node_status(status: dict):
    '''Reduce a node status object to just the fields the frontend needs'''
    return {field: status.get(field) for field in ('addresses', 'capacity')}
def map_node(node: dict):
    '''Project a Kubernetes node object onto the frontend representation'''
    metadata = node['metadata']
    return {
        'name': metadata['name'],
        'labels': metadata['labels'],
        'status': map_node_status(node['status']),
        # pods get attached later, keyed by "namespace/name"
        'pods': {}
    }
def map_pod(pod: dict):
    '''Project a Kubernetes pod object onto the frontend representation.

    Containers are filled in later by the caller (see get_kubernetes_clusters).
    '''
    status = pod['status']
    return {
        'name': pod['metadata']['name'],
        'namespace': pod['metadata']['namespace'],
        'labels': pod['metadata'].get('labels', {}),
        'phase': status.get('phase'),
        # default to '' (not None) so the frontend always receives a string
        'startTime': status.get('startTime', ''),
        'containers': []
    }
def map_container(cont: dict, pod: dict):
    '''Merge a container spec with its matching status entry from the pod.

    Only the first containerStatuses entry with a matching name is merged in
    (names are unique per pod); if none matches, the bare spec is returned.
    '''
    obj = {'name': cont['name'], 'image': cont['image'], 'resources': cont['resources']}
    # was: list comprehension wrapped in a redundant list(); take first match
    for status in pod.get('status', {}).get('containerStatuses', []):
        if status['name'] == cont['name']:
            obj.update(status)
            break
    return obj
def get_kubernetes_clusters():
    '''Yield the current state of every configured Kubernetes cluster.

    API server URLs come from the comma-separated CLUSTERS environment
    variable (default: local "kubectl proxy"). Each yielded dict contains
    id, api_server_url, nodes (with pods attached) and unassigned_pods.

    Raises requests.HTTPError if listing nodes or pods fails; Heapster
    metrics are best-effort and failures there are only logged.
    '''
    for api_server_url in (os.getenv('CLUSTERS') or DEFAULT_CLUSTERS).split(','):
        cluster_id = generate_cluster_id(api_server_url)
        if 'localhost' not in api_server_url:
            # TODO: hacky way of detecting whether we need a token or not
            session.headers['Authorization'] = 'Bearer {}'.format(tokens.get('read-only'))
        response = session.get(urljoin(api_server_url, '/api/v1/nodes'), timeout=5)
        response.raise_for_status()
        nodes = {}
        pods_by_namespace_name = {}
        unassigned_pods = {}
        for node in response.json()['items']:
            obj = map_node(node)
            nodes[obj['name']] = obj
        response = session.get(urljoin(api_server_url, '/api/v1/pods'), timeout=5)
        response.raise_for_status()
        for pod in response.json()['items']:
            obj = map_pod(pod)
            if 'deletionTimestamp' in pod['metadata']:
                # expose the deletion timestamp as a UTC epoch float
                obj['deleted'] = datetime.datetime.strptime(
                    pod['metadata']['deletionTimestamp'], '%Y-%m-%dT%H:%M:%SZ'
                ).replace(tzinfo=datetime.timezone.utc).timestamp()
            for cont in pod['spec']['containers']:
                obj['containers'].append(map_container(cont, pod))
            pods_by_namespace_name[(obj['namespace'], obj['name'])] = obj
            pod_key = '{}/{}'.format(obj['namespace'], obj['name'])
            node_name = pod['spec'].get('nodeName')
            if node_name in nodes:
                nodes[node_name]['pods'][pod_key] = obj
            else:
                # not scheduled yet, or scheduled to a node we did not see
                unassigned_pods[pod_key] = obj
        try:
            response = session.get(urljoin(api_server_url, '/api/v1/namespaces/kube-system/services/heapster/proxy/apis/metrics/v1alpha1/nodes'), timeout=5)
            response.raise_for_status()
            data = response.json()
            if not data.get('items'):
                logging.info('Heapster node metrics not available (yet)')
            else:
                for metrics in data['items']:
                    nodes[metrics['metadata']['name']]['usage'] = metrics['usage']
        # was a bare "except:" — that also swallows SystemExit/KeyboardInterrupt
        # and GeneratorExit (this is a generator), which can hang shutdown
        except Exception:
            logging.exception('Failed to get node metrics')
        try:
            response = session.get(urljoin(api_server_url,
                                           '/api/v1/namespaces/kube-system/services/heapster/proxy/apis/metrics/v1alpha1/pods'),
                                   timeout=5)
            response.raise_for_status()
            data = response.json()
            if not data.get('items'):
                logging.info('Heapster pod metrics not available (yet)')
            else:
                for metrics in data['items']:
                    pod = pods_by_namespace_name.get((metrics['metadata']['namespace'], metrics['metadata']['name']))
                    if pod:
                        # attach per-container usage by matching container names
                        for container in pod['containers']:
                            for container_metrics in metrics['containers']:
                                if container['name'] == container_metrics['name']:
                                    container['resources']['usage'] = container_metrics['usage']
        except Exception:
            logging.exception('Failed to get pod metrics')
        yield {'id': cluster_id, 'api_server_url': api_server_url, 'nodes': nodes, 'unassigned_pods': unassigned_pods}

View File

@@ -12,14 +12,9 @@ import json
import json_delta import json_delta
import logging import logging
import os import os
import random
import redis
import signal import signal
import string
import time import time
from pathlib import Path from pathlib import Path
from queue import Queue
from redlock import Redlock
from flask import Flask, redirect from flask import Flask, redirect
from flask_oauthlib.client import OAuth, OAuthRemoteApp from flask_oauthlib.client import OAuth, OAuthRemoteApp
@@ -27,135 +22,12 @@ from urllib.parse import urljoin
from .mock import get_mock_clusters from .mock import get_mock_clusters
from .kubernetes import get_kubernetes_clusters from .kubernetes import get_kubernetes_clusters
from .stores import MemoryStore, RedisStore
ONE_YEAR = 3600 * 24 * 365
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
def generate_token(n: int):
'''Generate a random ASCII token of length n'''
# uses os.urandom()
rng = random.SystemRandom()
return ''.join([rng.choice(string.ascii_letters + string.digits) for i in range(n)])
def generate_token_data():
'''Generate screen token data for storing'''
token = generate_token(10)
now = time.time()
return {'token': token, 'created': now, 'expires': now + ONE_YEAR}
def check_token(token: str, remote_addr: str, data: dict):
'''Check whether the given screen token is valid, raises exception if not'''
now = time.time()
if data and now < data['expires'] and data.get('remote_addr', remote_addr) == remote_addr:
data['remote_addr'] = remote_addr
return data
else:
raise ValueError('Invalid token')
class MemoryStore:
'''Memory-only backend, mostly useful for local debugging'''
def __init__(self):
self._data = {}
self._queues = []
self._screen_tokens = {}
def set(self, key, value):
self._data[key] = value
def get(self, key):
return self._data.get(key)
def acquire_lock(self):
# no-op for memory store
return 'fake-lock'
def release_lock(self, lock):
# no op for memory store
pass
def publish(self, event_type, event_data):
for queue in self._queues:
queue.put((event_type, event_data))
def listen(self):
queue = Queue()
self._queues.append(queue)
try:
while True:
item = queue.get()
yield item
finally:
self._queues.remove(queue)
def create_screen_token(self):
data = generate_token_data()
token = data['token']
self._screen_tokens[token] = data
return token
def redeem_screen_token(self, token: str, remote_addr: str):
data = self._screen_tokens.get(token)
data = check_token(token, remote_addr, data)
self._screen_tokens[token] = data
class RedisStore:
'''Redis-based backend for deployments with replicas > 1'''
def __init__(self, url: str):
logging.info('Connecting to Redis on {}..'.format(url))
self._redis = redis.StrictRedis.from_url(url)
self._redlock = Redlock([url])
def set(self, key, value):
self._redis.set(key, json.dumps(value, separators=(',', ':')))
def get(self, key):
value = self._redis.get(key)
if value:
return json.loads(value.decode('utf-8'))
def acquire_lock(self):
return self._redlock.lock('update', 10000)
def release_lock(self, lock):
self._redlock.unlock(lock)
def publish(self, event_type, event_data):
self._redis.publish('default', '{}:{}'.format(event_type, json.dumps(event_data, separators=(',', ':'))))
def listen(self):
p = self._redis.pubsub()
p.subscribe('default')
for message in p.listen():
if message['type'] == 'message':
event_type, data = message['data'].decode('utf-8').split(':', 1)
yield (event_type, json.loads(data))
def create_screen_token(self):
'''Generate a new screen token and store it in Redis'''
data = generate_token_data()
token = data['token']
self._redis.set('screen-tokens:{}'.format(token), json.dumps(data))
return token
def redeem_screen_token(self, token: str, remote_addr: str):
'''Validate the given token and bind it to the IP'''
redis_key = 'screen-tokens:{}'.format(token)
data = self._redis.get(redis_key)
if not data:
raise ValueError('Invalid token')
data = json.loads(data.decode('utf-8'))
data = check_token(token, remote_addr, data)
self._redis.set(redis_key, json.dumps(data))
def get_bool(name: str): def get_bool(name: str):
return os.getenv(name, '').lower() in ('1', 'true') return os.getenv(name, '').lower() in ('1', 'true')

84
kube_ops_view/mock.py Normal file
View File

@@ -0,0 +1,84 @@
import time
def hash_int(x: int):
    '''Deterministic integer mixing function (xorshift/multiply rounds)'''
    # two multiply rounds followed by a final xorshift (multiplier 1)
    for multiplier in (0x45d9f3b, 0x45d9f3b, 1):
        x = ((x >> 16) ^ x) * multiplier
    return x
def generate_mock_pod(index: int, i: int, j: int):
    '''Build one deterministic fake pod for cluster *index*, node *i*, slot *j*'''
    names = [
        'agent-cooper',
        'black-lodge',
        'bob',
        'bobby-briggs',
        'laura-palmer',
        'leland-palmer',
        'log-lady',
        'sheriff-truman',
    ]
    # 2/3 of pods end up Running, 1/3 Pending — chosen deterministically
    pod_phases = ['Pending', 'Running', 'Running']
    phase = pod_phases[hash_int((index + 1) * (i + 1) * (j + 1)) % len(pod_phases)]
    containers = []
    # one container for even j, two for odd j
    for _ in range(1 + j % 2):
        container = {
            'name': 'myapp',
            'image': 'foo/bar/{}'.format(j),
            'resources': {'requests': {'cpu': '100m', 'memory': '100Mi'}, 'limits': {}},
            'ready': True,
            'state': {'running': {}}
        }
        if phase == 'Running':
            if j % 13 == 0:
                # sprinkle in some crash-looping containers
                container['ready'] = False
                container['state'] = {'waiting': {'reason': 'CrashLoopBackOff'}}
            elif j % 7 == 0:
                container['ready'] = True
                container['state'] = {'running': {}}
                container['restartCount'] = 3
        containers.append(container)
    pod = {
        'name': '{}-{}-{}'.format(names[hash_int((i + 1) * (j + 1)) % len(names)], i, j),
        'namespace': 'kube-system' if j < 3 else 'default',
        'labels': {},
        'phase': phase,
        'containers': containers
    }
    if phase == 'Running' and j % 17 == 0:
        # mark a few pods as being deleted (fixed fake timestamp)
        pod['deleted'] = 123
    return pod
def generate_mock_cluster_data(index: int):
    '''Generate deterministic (no randomness!) mock data'''
    nodes = {}
    for node_index in range(10):
        # add/remove the second to last node every 13 seconds
        if node_index == 8 and int(time.time() / 13) % 2 == 0:
            continue
        labels = {'master': 'true'} if node_index < 2 else {}
        pods = {}
        for pod_index in range(hash_int((index + 1) * (node_index + 1)) % 32):
            # add/remove some pods every 7 seconds
            if pod_index % 17 == 0 and int(time.time() / 7) % 2 == 0:
                continue
            pod = generate_mock_pod(index, node_index, pod_index)
            pods['{}/{}'.format(pod['namespace'], pod['name'])] = pod
        node_name = 'node-{}'.format(node_index)
        nodes[node_name] = {
            'name': node_name,
            'labels': labels,
            'status': {'capacity': {'cpu': '4', 'memory': '32Gi', 'pods': '110'}},
            'pods': pods,
        }
    # one pod that is not assigned to any node
    stray = generate_mock_pod(index, 11, index)
    return {
        'id': 'mock-cluster-{}'.format(index),
        'api_server_url': 'https://kube-{}.example.org'.format(index),
        'nodes': nodes,
        'unassigned_pods': {'{}/{}'.format(stray['namespace'], stray['name']): stray},
    }
def get_mock_clusters():
    '''Yield three deterministic mock cluster payloads'''
    for cluster_index in range(3):
        yield generate_mock_cluster_data(cluster_index)

134
kube_ops_view/stores.py Normal file
View File

@@ -0,0 +1,134 @@
import json
import logging
import random
import string
import redis
import time
from redlock import Redlock
from queue import Queue
ONE_YEAR = 3600 * 24 * 365
def generate_token(n: int):
    '''Generate a random ASCII token of length n'''
    # SystemRandom draws from os.urandom, so tokens are unpredictable
    rng = random.SystemRandom()
    alphabet = string.ascii_letters + string.digits
    return ''.join(rng.choice(alphabet) for _ in range(n))
def generate_token_data():
    '''Generate screen token data for storing'''
    created = time.time()
    # tokens stay valid for one year after creation
    return {'token': generate_token(10), 'created': created, 'expires': created + ONE_YEAR}
def check_token(token: str, remote_addr: str, data: dict):
    '''Check whether the given screen token is valid, raises exception if not'''
    now = time.time()
    # reject missing, expired, or already-bound-to-another-IP tokens
    if not data or now >= data['expires'] or data.get('remote_addr', remote_addr) != remote_addr:
        raise ValueError('Invalid token')
    # bind the token to the first IP that redeems it
    data['remote_addr'] = remote_addr
    return data
class MemoryStore:
    '''Memory-only backend, mostly useful for local debugging'''

    def __init__(self):
        self._data = {}           # key/value storage
        self._queues = []         # one Queue per active listen() consumer
        self._screen_tokens = {}  # token -> token data dict

    def set(self, key, value):
        self._data[key] = value

    def get(self, key):
        return self._data.get(key)

    def acquire_lock(self):
        # locking is a no-op without shared state; return a truthy placeholder
        return 'fake-lock'

    def release_lock(self, lock):
        # nothing to release for the in-memory store
        pass

    def publish(self, event_type, event_data):
        # fan the event out to every active listener queue
        for listener in self._queues:
            listener.put((event_type, event_data))

    def listen(self):
        # register a fresh queue and stream events until the consumer stops
        events = Queue()
        self._queues.append(events)
        try:
            while True:
                yield events.get()
        finally:
            self._queues.remove(events)

    def create_screen_token(self):
        # mint a token and remember it locally
        data = generate_token_data()
        self._screen_tokens[data['token']] = data
        return data['token']

    def redeem_screen_token(self, token: str, remote_addr: str):
        # validate, then store the (now IP-bound) token data back
        data = check_token(token, remote_addr, self._screen_tokens.get(token))
        self._screen_tokens[token] = data
class RedisStore:
    '''Redis-based backend for deployments with replicas > 1'''

    def __init__(self, url: str):
        logging.info('Connecting to Redis on {}..'.format(url))
        self._redis = redis.StrictRedis.from_url(url)
        self._redlock = Redlock([url])

    def set(self, key, value):
        # compact JSON encoding keeps the stored payload small
        self._redis.set(key, json.dumps(value, separators=(',', ':')))

    def get(self, key):
        raw = self._redis.get(key)
        if raw:
            return json.loads(raw.decode('utf-8'))

    def acquire_lock(self):
        # distributed lock (10s TTL) so only one replica does the update work
        return self._redlock.lock('update', 10000)

    def release_lock(self, lock):
        self._redlock.unlock(lock)

    def publish(self, event_type, event_data):
        # events go over one pub/sub channel as "<type>:<json>"
        payload = '{}:{}'.format(event_type, json.dumps(event_data, separators=(',', ':')))
        self._redis.publish('default', payload)

    def listen(self):
        pubsub = self._redis.pubsub()
        pubsub.subscribe('default')
        for message in pubsub.listen():
            # skip subscribe confirmations etc., only forward real messages
            if message['type'] != 'message':
                continue
            event_type, data = message['data'].decode('utf-8').split(':', 1)
            yield (event_type, json.loads(data))

    def create_screen_token(self):
        '''Generate a new screen token and store it in Redis'''
        data = generate_token_data()
        self._redis.set('screen-tokens:{}'.format(data['token']), json.dumps(data))
        return data['token']

    def redeem_screen_token(self, token: str, remote_addr: str):
        '''Validate the given token and bind it to the IP'''
        redis_key = 'screen-tokens:{}'.format(token)
        raw = self._redis.get(redis_key)
        if not raw:
            raise ValueError('Invalid token')
        data = check_token(token, remote_addr, json.loads(raw.decode('utf-8')))
        self._redis.set(redis_key, json.dumps(data))