move Redis key formatting to AbstractStore
This commit is contained in:
@@ -86,9 +86,9 @@ def index():
|
|||||||
|
|
||||||
def event(cluster_ids: set):
|
def event(cluster_ids: set):
|
||||||
# first sent full data once
|
# first sent full data once
|
||||||
for cluster_id in (app.store.get('cluster-ids') or []):
|
for cluster_id in app.store.get_cluster_ids():
|
||||||
if not cluster_ids or cluster_id in cluster_ids:
|
if not cluster_ids or cluster_id in cluster_ids:
|
||||||
cluster = app.store.get(cluster_id)
|
cluster = app.store.get_cluster_data(cluster_id)
|
||||||
if cluster:
|
if cluster:
|
||||||
yield 'event: clusterupdate\ndata: ' + json.dumps(cluster, separators=(',', ':')) + '\n\n'
|
yield 'event: clusterupdate\ndata: ' + json.dumps(cluster, separators=(',', ':')) + '\n\n'
|
||||||
while True:
|
while True:
|
||||||
|
|||||||
@@ -37,7 +37,28 @@ def check_token(token: str, remote_addr: str, data: dict):
|
|||||||
raise ValueError('Invalid token')
|
raise ValueError('Invalid token')
|
||||||
|
|
||||||
|
|
||||||
class MemoryStore:
|
class AbstractStore:
|
||||||
|
|
||||||
|
def get_cluster_ids(self):
|
||||||
|
return self.get('cluster-ids') or []
|
||||||
|
|
||||||
|
def set_cluster_ids(self, cluster_ids: set):
|
||||||
|
self.set('cluster-ids', list(sorted(cluster_ids)))
|
||||||
|
|
||||||
|
def get_cluster_status(self, cluster_id: str) -> dict:
|
||||||
|
return self.get('clusters:{}:status'.format(cluster_id)) or {}
|
||||||
|
|
||||||
|
def set_cluster_status(self, cluster_id: str, status: dict):
|
||||||
|
self.set('clusters:{}:status'.format(cluster_id), status)
|
||||||
|
|
||||||
|
def get_cluster_data(self, cluster_id: str) -> dict:
|
||||||
|
return self.get('clusters:{}:data'.format(cluster_id)) or {}
|
||||||
|
|
||||||
|
def set_cluster_data(self, cluster_id: str, data: dict):
|
||||||
|
self.set('clusters:{}:data'.format(cluster_id), data)
|
||||||
|
|
||||||
|
|
||||||
|
class MemoryStore(AbstractStore):
|
||||||
'''Memory-only backend, mostly useful for local debugging'''
|
'''Memory-only backend, mostly useful for local debugging'''
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
@@ -85,7 +106,7 @@ class MemoryStore:
|
|||||||
self._screen_tokens[token] = data
|
self._screen_tokens[token] = data
|
||||||
|
|
||||||
|
|
||||||
class RedisStore:
|
class RedisStore(AbstractStore):
|
||||||
'''Redis-based backend for deployments with replicas > 1'''
|
'''Redis-based backend for deployments with replicas > 1'''
|
||||||
|
|
||||||
def __init__(self, url: str):
|
def __init__(self, url: str):
|
||||||
|
|||||||
@@ -41,8 +41,7 @@ def update_clusters(cluster_discoverer, query_cluster: callable, store, query_in
|
|||||||
cluster_ids = set()
|
cluster_ids = set()
|
||||||
for cluster in clusters:
|
for cluster in clusters:
|
||||||
cluster_ids.add(cluster.id)
|
cluster_ids.add(cluster.id)
|
||||||
status_key = '{}:status'.format(cluster.id)
|
status = store.get_cluster_status(cluster.id)
|
||||||
status = store.get(status_key) or {}
|
|
||||||
now = time.time()
|
now = time.time()
|
||||||
if now < status.get('last_query_time', 0) + query_interval:
|
if now < status.get('last_query_time', 0) + query_interval:
|
||||||
continue
|
continue
|
||||||
@@ -61,20 +60,20 @@ def update_clusters(cluster_discoverer, query_cluster: callable, store, query_in
|
|||||||
if backoff:
|
if backoff:
|
||||||
logger.info('Cluster {} ({}) recovered after {} tries.'.format(cluster.id, cluster.api_server_url, backoff['tries']))
|
logger.info('Cluster {} ({}) recovered after {} tries.'.format(cluster.id, cluster.api_server_url, backoff['tries']))
|
||||||
del status['backoff']
|
del status['backoff']
|
||||||
old_data = store.get(data['id'])
|
old_data = store.get_cluster_data(data['id'])
|
||||||
if old_data:
|
if old_data:
|
||||||
# https://pikacode.com/phijaro/json_delta/ticket/11/
|
# https://pikacode.com/phijaro/json_delta/ticket/11/
|
||||||
# diff is extremely slow without array_align=False
|
# diff is extremely slow without array_align=False
|
||||||
delta = json_delta.diff(old_data, data, verbose=debug, array_align=False)
|
delta = json_delta.diff(old_data, data, verbose=debug, array_align=False)
|
||||||
store.publish('clusterdelta', {'cluster_id': cluster.id, 'delta': delta})
|
store.publish('clusterdelta', {'cluster_id': cluster.id, 'delta': delta})
|
||||||
if delta:
|
if delta:
|
||||||
store.set(cluster.id, data)
|
store.set_cluster_data(cluster.id, data)
|
||||||
else:
|
else:
|
||||||
logger.info('Discovered new cluster {} ({}).'.format(cluster.id, cluster.api_server_url))
|
logger.info('Discovered new cluster {} ({}).'.format(cluster.id, cluster.api_server_url))
|
||||||
store.publish('clusterupdate', data)
|
store.publish('clusterupdate', data)
|
||||||
store.set(cluster.id, data)
|
store.set_cluster_data(cluster.id, data)
|
||||||
store.set(status_key, status)
|
store.set_cluster_status(cluster.id, status)
|
||||||
store.set('cluster-ids', list(sorted(cluster_ids)))
|
store.set_cluster_ids(cluster_ids)
|
||||||
except:
|
except:
|
||||||
logger.exception('Failed to update')
|
logger.exception('Failed to update')
|
||||||
finally:
|
finally:
|
||||||
|
|||||||
Reference in New Issue
Block a user