#96 move error handling into update loop
This commit is contained in:
@@ -137,3 +137,10 @@ class KubeconfigDiscoverer:
|
|||||||
ssl_ca_cert=config.ssl_ca_cert,
|
ssl_ca_cert=config.ssl_ca_cert,
|
||||||
auth=StaticTokenAuth(config.api_key['authorization'].split(' ', 1)[-1]))
|
auth=StaticTokenAuth(config.api_key['authorization'].split(' ', 1)[-1]))
|
||||||
yield cluster
|
yield cluster
|
||||||
|
|
||||||
|
|
||||||
|
class MockDiscoverer:
|
||||||
|
|
||||||
|
def get_clusters(self):
|
||||||
|
for i in range(3):
|
||||||
|
yield Cluster('mock-cluster-{}'.format(i), api_server_url='https://kube-{}.example.org'.format(i))
|
||||||
|
|||||||
@@ -51,7 +51,7 @@ def request(cluster, path, **kwargs):
|
|||||||
return session.get(urljoin(cluster.api_server_url, path), auth=cluster.auth, verify=cluster.ssl_ca_cert, **kwargs)
|
return session.get(urljoin(cluster.api_server_url, path), auth=cluster.auth, verify=cluster.ssl_ca_cert, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
def get_kubernetes_cluster(cluster):
|
def query_kubernetes_cluster(cluster):
|
||||||
cluster_id = cluster.id
|
cluster_id = cluster.id
|
||||||
api_server_url = cluster.api_server_url
|
api_server_url = cluster.api_server_url
|
||||||
response = request(cluster, '/api/v1/nodes')
|
response = request(cluster, '/api/v1/nodes')
|
||||||
@@ -107,12 +107,3 @@ def get_kubernetes_cluster(cluster):
|
|||||||
except:
|
except:
|
||||||
logger.exception('Failed to get pod metrics')
|
logger.exception('Failed to get pod metrics')
|
||||||
return {'id': cluster_id, 'api_server_url': api_server_url, 'nodes': nodes, 'unassigned_pods': unassigned_pods}
|
return {'id': cluster_id, 'api_server_url': api_server_url, 'nodes': nodes, 'unassigned_pods': unassigned_pods}
|
||||||
|
|
||||||
|
|
||||||
def get_kubernetes_clusters(cluster_discoverer):
|
|
||||||
for cluster in cluster_discoverer.get_clusters():
|
|
||||||
try:
|
|
||||||
data = get_kubernetes_cluster(cluster)
|
|
||||||
yield data
|
|
||||||
except:
|
|
||||||
logger.exception('Failed to query cluster {} ({})'.format(cluster.id, cluster.api_server_url))
|
|
||||||
|
|||||||
@@ -10,7 +10,6 @@ import functools
|
|||||||
import gevent
|
import gevent
|
||||||
import gevent.wsgi
|
import gevent.wsgi
|
||||||
import json
|
import json
|
||||||
import json_delta
|
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import signal
|
import signal
|
||||||
@@ -23,10 +22,11 @@ from flask_oauthlib.client import OAuth
|
|||||||
from .oauth import OAuthRemoteAppWithRefresh
|
from .oauth import OAuthRemoteAppWithRefresh
|
||||||
from urllib.parse import urljoin
|
from urllib.parse import urljoin
|
||||||
|
|
||||||
from .mock import get_mock_clusters
|
from .mock import query_mock_cluster
|
||||||
from .kubernetes import get_kubernetes_clusters
|
from .kubernetes import query_kubernetes_cluster
|
||||||
from .stores import MemoryStore, RedisStore
|
from .stores import MemoryStore, RedisStore
|
||||||
from .cluster_discovery import DEFAULT_CLUSTERS, StaticClusterDiscoverer, ClusterRegistryDiscoverer, KubeconfigDiscoverer
|
from .cluster_discovery import DEFAULT_CLUSTERS, StaticClusterDiscoverer, ClusterRegistryDiscoverer, KubeconfigDiscoverer, MockDiscoverer
|
||||||
|
from .update import update_clusters
|
||||||
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
@@ -155,35 +155,6 @@ def authorized():
|
|||||||
return redirect(urljoin(APP_URL, '/'))
|
return redirect(urljoin(APP_URL, '/'))
|
||||||
|
|
||||||
|
|
||||||
def update(cluster_discoverer, store, mock: bool):
|
|
||||||
while True:
|
|
||||||
lock = store.acquire_lock()
|
|
||||||
if lock:
|
|
||||||
try:
|
|
||||||
if mock:
|
|
||||||
_clusters = get_mock_clusters()
|
|
||||||
else:
|
|
||||||
_clusters = get_kubernetes_clusters(cluster_discoverer)
|
|
||||||
cluster_ids = []
|
|
||||||
for cluster in _clusters:
|
|
||||||
old_data = store.get(cluster['id'])
|
|
||||||
if old_data:
|
|
||||||
# https://pikacode.com/phijaro/json_delta/ticket/11/
|
|
||||||
# diff is extremely slow without array_align=False
|
|
||||||
delta = json_delta.diff(old_data, cluster, verbose=app.debug, array_align=False)
|
|
||||||
store.publish('clusterdelta', {'cluster_id': cluster['id'], 'delta': delta})
|
|
||||||
else:
|
|
||||||
store.publish('clusterupdate', cluster)
|
|
||||||
store.set(cluster['id'], cluster)
|
|
||||||
cluster_ids.append(cluster['id'])
|
|
||||||
store.set('cluster-ids', cluster_ids)
|
|
||||||
except:
|
|
||||||
logger.exception('Failed to update')
|
|
||||||
finally:
|
|
||||||
store.release_lock(lock)
|
|
||||||
gevent.sleep(5)
|
|
||||||
|
|
||||||
|
|
||||||
def shutdown():
|
def shutdown():
|
||||||
# just wait some time to give Kubernetes time to update endpoints
|
# just wait some time to give Kubernetes time to update endpoints
|
||||||
# this requires changing the readinessProbe's
|
# this requires changing the readinessProbe's
|
||||||
@@ -227,15 +198,20 @@ def main(port, debug, mock, secret_key, redis_url, clusters, cluster_registry_ur
|
|||||||
app.secret_key = secret_key
|
app.secret_key = secret_key
|
||||||
app.store = store
|
app.store = store
|
||||||
|
|
||||||
if cluster_registry_url:
|
if mock:
|
||||||
discoverer = ClusterRegistryDiscoverer(cluster_registry_url)
|
cluster_query = query_mock_cluster
|
||||||
elif kubeconfig_path:
|
discoverer = MockDiscoverer()
|
||||||
discoverer = KubeconfigDiscoverer(Path(kubeconfig_path))
|
|
||||||
else:
|
else:
|
||||||
api_server_urls = clusters.split(',') if clusters else []
|
cluster_query = query_kubernetes_cluster
|
||||||
discoverer = StaticClusterDiscoverer(api_server_urls)
|
if cluster_registry_url:
|
||||||
|
discoverer = ClusterRegistryDiscoverer(cluster_registry_url)
|
||||||
|
elif kubeconfig_path:
|
||||||
|
discoverer = KubeconfigDiscoverer(Path(kubeconfig_path))
|
||||||
|
else:
|
||||||
|
api_server_urls = clusters.split(',') if clusters else []
|
||||||
|
discoverer = StaticClusterDiscoverer(api_server_urls)
|
||||||
|
|
||||||
gevent.spawn(update, cluster_discoverer=discoverer, store=store, mock=mock)
|
gevent.spawn(update_clusters, cluster_discoverer=discoverer, query_cluster=cluster_query, store=store, debug=debug)
|
||||||
|
|
||||||
signal.signal(signal.SIGTERM, exit_gracefully)
|
signal.signal(signal.SIGTERM, exit_gracefully)
|
||||||
http_server = gevent.wsgi.WSGIServer(('0.0.0.0', port), app)
|
http_server = gevent.wsgi.WSGIServer(('0.0.0.0', port), app)
|
||||||
|
|||||||
@@ -48,8 +48,9 @@ def generate_mock_pod(index: int, i: int, j: int):
|
|||||||
return pod
|
return pod
|
||||||
|
|
||||||
|
|
||||||
def generate_mock_cluster_data(index: int):
|
def query_mock_cluster(cluster):
|
||||||
'''Generate deterministic (no randomness!) mock data'''
|
'''Generate deterministic (no randomness!) mock data'''
|
||||||
|
index = int(cluster.id.split('-')[-1])
|
||||||
nodes = {}
|
nodes = {}
|
||||||
for i in range(10):
|
for i in range(10):
|
||||||
# add/remove the second to last node every 13 seconds
|
# add/remove the second to last node every 13 seconds
|
||||||
@@ -76,9 +77,3 @@ def generate_mock_cluster_data(index: int):
|
|||||||
'nodes': nodes,
|
'nodes': nodes,
|
||||||
'unassigned_pods': unassigned_pods
|
'unassigned_pods': unassigned_pods
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
def get_mock_clusters():
|
|
||||||
for i in range(3):
|
|
||||||
data = generate_mock_cluster_data(i)
|
|
||||||
yield data
|
|
||||||
|
|||||||
@@ -1,5 +1,9 @@
|
|||||||
from kube_ops_view.mock import get_mock_clusters
|
from kube_ops_view.cluster_discovery import MockDiscoverer
|
||||||
|
from kube_ops_view.mock import query_mock_cluster
|
||||||
|
|
||||||
|
|
||||||
def test_get_mock_clusters():
|
def test_query_mock_clusters():
|
||||||
get_mock_clusters()
|
discoverer = MockDiscoverer()
|
||||||
|
for cluster in discoverer.get_clusters():
|
||||||
|
data = query_mock_cluster(cluster)
|
||||||
|
assert data['id'].startswith('mock-cluster-')
|
||||||
|
|||||||
Reference in New Issue
Block a user