#96 query clusters in 5 second interval

This commit is contained in:
Henning Jacobs
2017-01-15 17:12:28 +01:00
parent a60f1b58b3
commit 1b3819a290

View File

@@ -27,7 +27,25 @@ def get_short_error_message(e: requests.exceptions.RequestException):
return str(e) return str(e)
def update_clusters(cluster_discoverer, query_cluster: callable, store, debug: bool): def handle_query_failure(e: Exception, cluster, backoff: dict):
if not backoff:
backoff = {}
tries = backoff.get('tries', 0) + 1
backoff['tries'] = tries
wait_seconds = calculate_backoff(tries)
backoff['next_try'] = time.time() + wait_seconds
if isinstance(e, requests.exceptions.RequestException):
message = get_short_error_message(e)
log = logger.error
else:
message = str(e)
log = logger.exception
log('Failed to query cluster {} ({}): {} (try {}, wait {} seconds)'.format(
cluster.id, cluster.api_server_url, message, tries, round(wait_seconds)))
return backoff
def update_clusters(cluster_discoverer, query_cluster: callable, store, query_interval=5, debug: bool=False):
while True: while True:
lock = store.acquire_lock() lock = store.acquire_lock()
if lock: if lock:
@@ -36,33 +54,26 @@ def update_clusters(cluster_discoverer, query_cluster: callable, store, debug: b
cluster_ids = set() cluster_ids = set()
for cluster in clusters: for cluster in clusters:
cluster_ids.add(cluster.id) cluster_ids.add(cluster.id)
backoff_key = '{}:backoff'.format(cluster.id) status_key = '{}:status'.format(cluster.id)
backoff = store.get(backoff_key) status = store.get(status_key) or {}
if backoff and time.time() < backoff['next_try']: now = time.time()
if now < status.get('last_query_time', 0) + query_interval:
continue
backoff = status.get('backoff')
if backoff and now < backoff['next_try']:
# cluster is still in backoff, skip # cluster is still in backoff, skip
continue continue
try: try:
logger.debug('Querying cluster {} ({})..'.format(cluster.id, cluster.api_server_url))
data = query_cluster(cluster) data = query_cluster(cluster)
except Exception as e: except Exception as e:
if not backoff: backoff = handle_query_failure(e, cluster, backoff)
backoff = {} status['backoff'] = backoff
tries = backoff.get('tries', 0) + 1
backoff['tries'] = tries
wait_seconds = calculate_backoff(tries)
backoff['next_try'] = time.time() + wait_seconds
if isinstance(e, requests.exceptions.RequestException):
message = get_short_error_message(e)
log = logger.error
else:
message = str(e)
log = logger.exception
log('Failed to query cluster {} ({}): {} (try {}, wait {} seconds)'.format(
cluster.id, cluster.api_server_url, message, tries, round(wait_seconds)))
store.set(backoff_key, backoff)
else: else:
status['last_query_time'] = now
if backoff: if backoff:
# reset backoff logger.info('Cluster {} ({}) recovered after {} tries.'.format(cluster.id, cluster.api_server_url, backoff['tries']))
store.set(backoff_key, None) del status['backoff']
old_data = store.get(data['id']) old_data = store.get(data['id'])
if old_data: if old_data:
# https://pikacode.com/phijaro/json_delta/ticket/11/ # https://pikacode.com/phijaro/json_delta/ticket/11/
@@ -72,11 +83,13 @@ def update_clusters(cluster_discoverer, query_cluster: callable, store, debug: b
if delta: if delta:
store.set(cluster.id, data) store.set(cluster.id, data)
else: else:
logger.info('Discovered new cluster {} ({}).'.format(cluster.id, cluster.api_server_url))
store.publish('clusterupdate', data) store.publish('clusterupdate', data)
store.set(cluster.id, data) store.set(cluster.id, data)
store.set(status_key, status)
store.set('cluster-ids', list(sorted(cluster_ids))) store.set('cluster-ids', list(sorted(cluster_ids)))
except: except:
logger.exception('Failed to update') logger.exception('Failed to update')
finally: finally:
store.release_lock(lock) store.release_lock(lock)
gevent.sleep(5) gevent.sleep(random_jitter(1))