diff --git a/kube_ops_view/update.py b/kube_ops_view/update.py
index 0ce257d..fcc7275 100644
--- a/kube_ops_view/update.py
+++ b/kube_ops_view/update.py
@@ -27,7 +27,25 @@ def get_short_error_message(e: requests.exceptions.RequestException):
         return str(e)
 
 
-def update_clusters(cluster_discoverer, query_cluster: callable, store, debug: bool):
+def handle_query_failure(e: Exception, cluster, backoff: dict):
+    if not backoff:
+        backoff = {}
+    tries = backoff.get('tries', 0) + 1
+    backoff['tries'] = tries
+    wait_seconds = calculate_backoff(tries)
+    backoff['next_try'] = time.time() + wait_seconds
+    if isinstance(e, requests.exceptions.RequestException):
+        message = get_short_error_message(e)
+        log = logger.error
+    else:
+        message = str(e)
+        log = logger.exception
+    log('Failed to query cluster {} ({}): {} (try {}, wait {} seconds)'.format(
+        cluster.id, cluster.api_server_url, message, tries, round(wait_seconds)))
+    return backoff
+
+
+def update_clusters(cluster_discoverer, query_cluster: callable, store, query_interval=5, debug: bool=False):
     while True:
         lock = store.acquire_lock()
         if lock:
@@ -36,33 +54,26 @@ def update_clusters(cluster_discoverer, query_cluster: callable, store, debug: b
                 cluster_ids = set()
                 for cluster in clusters:
                     cluster_ids.add(cluster.id)
-                    backoff_key = '{}:backoff'.format(cluster.id)
-                    backoff = store.get(backoff_key)
-                    if backoff and time.time() < backoff['next_try']:
+                    status_key = '{}:status'.format(cluster.id)
+                    status = store.get(status_key) or {}
+                    now = time.time()
+                    if now < status.get('last_query_time', 0) + query_interval:
+                        continue
+                    backoff = status.get('backoff')
+                    if backoff and now < backoff['next_try']:
                         # cluster is still in backoff, skip
                         continue
                     try:
+                        logger.debug('Querying cluster {} ({})..'.format(cluster.id, cluster.api_server_url))
                         data = query_cluster(cluster)
                     except Exception as e:
-                        if not backoff:
-                            backoff = {}
-                        tries = backoff.get('tries', 0) + 1
-                        backoff['tries'] = tries
-                        wait_seconds = calculate_backoff(tries)
-                        backoff['next_try'] = time.time() + wait_seconds
-                        if isinstance(e, requests.exceptions.RequestException):
-                            message = get_short_error_message(e)
-                            log = logger.error
-                        else:
-                            message = str(e)
-                            log = logger.exception
-                        log('Failed to query cluster {} ({}): {} (try {}, wait {} seconds)'.format(
-                            cluster.id, cluster.api_server_url, message, tries, round(wait_seconds)))
-                        store.set(backoff_key, backoff)
+                        backoff = handle_query_failure(e, cluster, backoff)
+                        status['backoff'] = backoff
                     else:
+                        status['last_query_time'] = now
                         if backoff:
-                            # reset backoff
-                            store.set(backoff_key, None)
+                            logger.info('Cluster {} ({}) recovered after {} tries.'.format(cluster.id, cluster.api_server_url, backoff['tries']))
+                            del status['backoff']
                         old_data = store.get(data['id'])
                         if old_data:
                             # https://pikacode.com/phijaro/json_delta/ticket/11/
@@ -72,11 +83,13 @@ def update_clusters(cluster_discoverer, query_cluster: callable, store, debug: b
                             if delta:
                                 store.set(cluster.id, data)
                         else:
+                            logger.info('Discovered new cluster {} ({}).'.format(cluster.id, cluster.api_server_url))
                             store.publish('clusterupdate', data)
                             store.set(cluster.id, data)
+                    store.set(status_key, status)
                 store.set('cluster-ids', list(sorted(cluster_ids)))
             except:
                 logger.exception('Failed to update')
             finally:
                 store.release_lock(lock)
-        gevent.sleep(5)
+        gevent.sleep(random_jitter(1))