#73 implement --cluster-registry-url option
This commit is contained in:
@@ -1,13 +1,30 @@
|
||||
import time
|
||||
from urllib.parse import urljoin
|
||||
|
||||
import kubernetes.client
|
||||
import kubernetes.config
|
||||
import logging
|
||||
import re
|
||||
import requests
|
||||
import tokens
|
||||
from requests.auth import AuthBase
|
||||
|
||||
DEFAULT_CLUSTERS = 'http://localhost:8001/'
|
||||
CLUSTER_ID_INVALID_CHARS = re.compile('[^a-z0-9:-]')
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
tokens.configure(from_file_only=True)
|
||||
|
||||
|
||||
def generate_cluster_id(url: str):
|
||||
'''Generate some "cluster ID" from given API server URL'''
|
||||
for prefix in ('https://', 'http://'):
|
||||
if url.startswith(prefix):
|
||||
url = url[len(prefix):]
|
||||
return CLUSTER_ID_INVALID_CHARS.sub('-', url.lower()).strip('-')
|
||||
|
||||
|
||||
class StaticTokenAuth(AuthBase):
|
||||
def __init__(self, token):
|
||||
self.token = token
|
||||
@@ -29,7 +46,8 @@ class OAuthTokenAuth(AuthBase):
|
||||
|
||||
|
||||
class Cluster:
|
||||
def __init__(self, api_server_url, ssl_ca_cert=None, auth=None):
|
||||
def __init__(self, id, api_server_url, ssl_ca_cert=None, auth=None):
|
||||
self.id = id
|
||||
self.api_server_url = api_server_url
|
||||
self.ssl_ca_cert = ssl_ca_cert
|
||||
self.auth = auth
|
||||
@@ -37,17 +55,20 @@ class Cluster:
|
||||
|
||||
class StaticClusterDiscoverer:
|
||||
|
||||
def __init__(self, api_server_urls):
|
||||
def __init__(self, api_server_urls: list):
|
||||
self._clusters = []
|
||||
|
||||
if not api_server_urls:
|
||||
try:
|
||||
kubernetes.config.load_incluster_config()
|
||||
except kubernetes.config.ConfigException:
|
||||
cluster = Cluster('http://localhost:8001')
|
||||
# we are not running inside a cluster
|
||||
# => assume default kubectl proxy URL
|
||||
cluster = Cluster(generate_cluster_id(DEFAULT_CLUSTERS), DEFAULT_CLUSTERS)
|
||||
else:
|
||||
config = kubernetes.client.configuration
|
||||
cluster = Cluster(
|
||||
generate_cluster_id(config.host),
|
||||
config.host,
|
||||
ssl_ca_cert=config.ssl_ca_cert,
|
||||
auth=StaticTokenAuth(config.api_key['authorization'].split(' ', 1)[-1]))
|
||||
@@ -60,7 +81,38 @@ class StaticClusterDiscoverer:
|
||||
auth = OAuthTokenAuth('read-only')
|
||||
else:
|
||||
auth = None
|
||||
self._clusters.append(Cluster(api_server_url, auth=auth))
|
||||
self._clusters.append(Cluster(generate_cluster_id(api_server_url), api_server_url, auth=auth))
|
||||
|
||||
def get_clusters(self):
|
||||
return self._clusters
|
||||
|
||||
|
||||
class ClusterRegistryDiscoverer:
|
||||
|
||||
def __init__(self, cluster_registry_url: str, cache_lifetime=60):
|
||||
self._url = cluster_registry_url
|
||||
self._cache_lifetime = cache_lifetime
|
||||
self._last_cache_refresh = 0
|
||||
self._clusters = []
|
||||
self._session = requests.Session()
|
||||
self._session.auth = OAuthTokenAuth('read-only')
|
||||
|
||||
def refresh(self):
|
||||
try:
|
||||
response = self._session.get(urljoin(self._url, '/kubernetes-clusters'), timeout=10)
|
||||
response.raise_for_status()
|
||||
clusters = []
|
||||
for row in response.json()['items']:
|
||||
# only consider "ready" clusters
|
||||
if row.get('lifecycle_status', 'ready') == 'ready':
|
||||
clusters.append(Cluster(row['id'], row['api_server_url']))
|
||||
self._clusters = clusters
|
||||
self._last_cache_refresh = time.time()
|
||||
except:
|
||||
logger.exception('Failed to refresh from cluster registry {}'.format(self._url))
|
||||
|
||||
def get_clusters(self):
|
||||
now = time.time()
|
||||
if now - self._last_cache_refresh > self._cache_lifetime:
|
||||
self.refresh()
|
||||
return self._clusters
|
||||
|
||||
@@ -1,23 +1,14 @@
|
||||
import datetime
|
||||
import logging
|
||||
import re
|
||||
from urllib.parse import urljoin
|
||||
|
||||
import requests
|
||||
|
||||
CLUSTER_ID_INVALID_CHARS = re.compile('[^a-z0-9:-]')
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
session = requests.Session()
|
||||
|
||||
|
||||
def generate_cluster_id(url: str):
|
||||
'''Generate some "cluster ID" from given API server URL'''
|
||||
for prefix in ('https://', 'http://'):
|
||||
if url.startswith(prefix):
|
||||
url = url[len(prefix):]
|
||||
return CLUSTER_ID_INVALID_CHARS.sub('-', url.lower()).strip('-')
|
||||
|
||||
|
||||
def map_node_status(status: dict):
|
||||
return {
|
||||
'addresses': status.get('addresses'),
|
||||
@@ -62,8 +53,8 @@ def request(cluster, path, **kwargs):
|
||||
|
||||
def get_kubernetes_clusters(cluster_discoverer):
|
||||
for cluster in cluster_discoverer.get_clusters():
|
||||
cluster_id = cluster.id
|
||||
api_server_url = cluster.api_server_url
|
||||
cluster_id = generate_cluster_id(api_server_url)
|
||||
response = request(cluster, '/api/v1/nodes')
|
||||
response.raise_for_status()
|
||||
nodes = {}
|
||||
@@ -94,18 +85,18 @@ def get_kubernetes_clusters(cluster_discoverer):
|
||||
response.raise_for_status()
|
||||
data = response.json()
|
||||
if not data.get('items'):
|
||||
logging.info('Heapster node metrics not available (yet)')
|
||||
logger.info('Heapster node metrics not available (yet)')
|
||||
else:
|
||||
for metrics in data['items']:
|
||||
nodes[metrics['metadata']['name']]['usage'] = metrics['usage']
|
||||
except:
|
||||
logging.exception('Failed to get node metrics')
|
||||
logger.exception('Failed to get node metrics')
|
||||
try:
|
||||
response = request(cluster, '/api/v1/namespaces/kube-system/services/heapster/proxy/apis/metrics/v1alpha1/pods')
|
||||
response.raise_for_status()
|
||||
data = response.json()
|
||||
if not data.get('items'):
|
||||
logging.info('Heapster pod metrics not available (yet)')
|
||||
logger.info('Heapster pod metrics not available (yet)')
|
||||
else:
|
||||
for metrics in data['items']:
|
||||
pod = pods_by_namespace_name.get((metrics['metadata']['namespace'], metrics['metadata']['name']))
|
||||
@@ -115,5 +106,5 @@ def get_kubernetes_clusters(cluster_discoverer):
|
||||
if container['name'] == container_metrics['name']:
|
||||
container['resources']['usage'] = container_metrics['usage']
|
||||
except:
|
||||
logging.exception('Failed to get pod metrics')
|
||||
logger.exception('Failed to get pod metrics')
|
||||
yield {'id': cluster_id, 'api_server_url': api_server_url, 'nodes': nodes, 'unassigned_pods': unassigned_pods}
|
||||
|
||||
@@ -26,7 +26,7 @@ from urllib.parse import urljoin
|
||||
from .mock import get_mock_clusters
|
||||
from .kubernetes import get_kubernetes_clusters
|
||||
from .stores import MemoryStore, RedisStore
|
||||
from .cluster_discovery import DEFAULT_CLUSTERS, StaticClusterDiscoverer
|
||||
from .cluster_discovery import DEFAULT_CLUSTERS, StaticClusterDiscoverer, ClusterRegistryDiscoverer
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -216,7 +216,8 @@ def print_version(ctx, param, value):
|
||||
@click.option('--redis-url', help='Redis URL to use for pub/sub and job locking', envvar='REDIS_URL')
|
||||
@click.option('--clusters', help='Comma separated list of Kubernetes API server URLs (default: {})'.format(DEFAULT_CLUSTERS),
|
||||
envvar='CLUSTERS')
|
||||
def main(port, debug, mock, secret_key, redis_url, clusters):
|
||||
@click.option('--cluster-registry-url', help='URL to cluster registry', envvar='CLUSTER_REGISTRY_URL')
|
||||
def main(port, debug, mock, secret_key, redis_url, clusters, cluster_registry_url):
|
||||
logging.basicConfig(level=logging.DEBUG if debug else logging.INFO)
|
||||
|
||||
store = RedisStore(redis_url) if redis_url else MemoryStore()
|
||||
@@ -225,8 +226,13 @@ def main(port, debug, mock, secret_key, redis_url, clusters):
|
||||
app.secret_key = secret_key
|
||||
app.store = store
|
||||
|
||||
api_server_urls = clusters.split(',') if clusters else []
|
||||
gevent.spawn(update, cluster_discoverer=StaticClusterDiscoverer(api_server_urls), store=store, mock=mock)
|
||||
if cluster_registry_url:
|
||||
discoverer = ClusterRegistryDiscoverer(cluster_registry_url)
|
||||
else:
|
||||
api_server_urls = clusters.split(',') if clusters else []
|
||||
discoverer = StaticClusterDiscoverer(api_server_urls)
|
||||
|
||||
gevent.spawn(update, cluster_discoverer=discoverer, store=store, mock=mock)
|
||||
|
||||
signal.signal(signal.SIGTERM, exit_gracefully)
|
||||
http_server = gevent.wsgi.WSGIServer(('0.0.0.0', port), app)
|
||||
|
||||
@@ -8,6 +8,8 @@ import time
|
||||
from redlock import Redlock
|
||||
from queue import Queue
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
ONE_YEAR = 3600 * 24 * 365
|
||||
|
||||
|
||||
@@ -87,7 +89,7 @@ class RedisStore:
|
||||
'''Redis-based backend for deployments with replicas > 1'''
|
||||
|
||||
def __init__(self, url: str):
|
||||
logging.info('Connecting to Redis on {}..'.format(url))
|
||||
logger.info('Connecting to Redis on {}..'.format(url))
|
||||
self._redis = redis.StrictRedis.from_url(url)
|
||||
self._redlock = Redlock([url])
|
||||
|
||||
|
||||
Reference in New Issue
Block a user