diff --git a/kube_ops_view/main.py b/kube_ops_view/main.py index 9d3643e..ec59705 100644 --- a/kube_ops_view/main.py +++ b/kube_ops_view/main.py @@ -86,9 +86,9 @@ def index(): def event(cluster_ids: set): # first sent full data once - for cluster_id in (app.store.get('cluster-ids') or []): + for cluster_id in app.store.get_cluster_ids(): if not cluster_ids or cluster_id in cluster_ids: - cluster = app.store.get(cluster_id) + cluster = app.store.get_cluster_data(cluster_id) if cluster: yield 'event: clusterupdate\ndata: ' + json.dumps(cluster, separators=(',', ':')) + '\n\n' while True: diff --git a/kube_ops_view/stores.py b/kube_ops_view/stores.py index e4cd850..0ed3b4f 100644 --- a/kube_ops_view/stores.py +++ b/kube_ops_view/stores.py @@ -37,7 +37,28 @@ def check_token(token: str, remote_addr: str, data: dict): raise ValueError('Invalid token') -class MemoryStore: +class AbstractStore: + + def get_cluster_ids(self): + return self.get('cluster-ids') or [] + + def set_cluster_ids(self, cluster_ids: set): + self.set('cluster-ids', list(sorted(cluster_ids))) + + def get_cluster_status(self, cluster_id: str) -> dict: + return self.get('clusters:{}:status'.format(cluster_id)) or {} + + def set_cluster_status(self, cluster_id: str, status: dict): + self.set('clusters:{}:status'.format(cluster_id), status) + + def get_cluster_data(self, cluster_id: str) -> dict: + return self.get('clusters:{}:data'.format(cluster_id)) or {} + + def set_cluster_data(self, cluster_id: str, data: dict): + self.set('clusters:{}:data'.format(cluster_id), data) + + +class MemoryStore(AbstractStore): '''Memory-only backend, mostly useful for local debugging''' def __init__(self): @@ -85,7 +106,7 @@ class MemoryStore: self._screen_tokens[token] = data -class RedisStore: +class RedisStore(AbstractStore): '''Redis-based backend for deployments with replicas > 1''' def __init__(self, url: str): diff --git a/kube_ops_view/update.py b/kube_ops_view/update.py index e6858ff..a3c7a24 100644 --- a/kube_ops_view/update.py +++ b/kube_ops_view/update.py @@ -41,8 +41,7 @@ def update_clusters(cluster_discoverer, query_cluster: callable, store, query_in cluster_ids = set() for cluster in clusters: cluster_ids.add(cluster.id) - status_key = '{}:status'.format(cluster.id) - status = store.get(status_key) or {} + status = store.get_cluster_status(cluster.id) now = time.time() if now < status.get('last_query_time', 0) + query_interval: continue @@ -61,20 +60,20 @@ def update_clusters(cluster_discoverer, query_cluster: callable, store, query_in if backoff: logger.info('Cluster {} ({}) recovered after {} tries.'.format(cluster.id, cluster.api_server_url, backoff['tries'])) del status['backoff'] - old_data = store.get(data['id']) + old_data = store.get_cluster_data(data['id']) if old_data: # https://pikacode.com/phijaro/json_delta/ticket/11/ # diff is extremely slow without array_align=False delta = json_delta.diff(old_data, data, verbose=debug, array_align=False) store.publish('clusterdelta', {'cluster_id': cluster.id, 'delta': delta}) if delta: - store.set(cluster.id, data) + store.set_cluster_data(cluster.id, data) else: logger.info('Discovered new cluster {} ({}).'.format(cluster.id, cluster.api_server_url)) store.publish('clusterupdate', data) - store.set(cluster.id, data) - store.set(status_key, status) - store.set('cluster-ids', list(sorted(cluster_ids))) + store.set_cluster_data(cluster.id, data) + store.set_cluster_status(cluster.id, status) + store.set_cluster_ids(cluster_ids) except: logger.exception('Failed to update') finally: