From a60f1b58b344b6ffd5873441af019d9a61a9a389 Mon Sep 17 00:00:00 2001
From: Henning Jacobs
Date: Sun, 15 Jan 2017 16:37:12 +0100
Subject: [PATCH] #96 implement exponential backoff

---
 kube_ops_view/backoff.py | 48 +++++++++++++++++++++++
 kube_ops_view/main.py    |  3 +-
 kube_ops_view/update.py  | 82 ++++++++++++++++++++++++++++++++++++++++
 3 files changed, 132 insertions(+), 1 deletion(-)
 create mode 100644 kube_ops_view/backoff.py
 create mode 100644 kube_ops_view/update.py

diff --git a/kube_ops_view/backoff.py b/kube_ops_view/backoff.py
new file mode 100644
index 0000000..ad36b27
--- /dev/null
+++ b/kube_ops_view/backoff.py
@@ -0,0 +1,48 @@
+import random
+
+
+def expo(n: int, base=2, factor=1, max_value=None):
+    """Exponential decay.
+
+    Adapted from https://github.com/litl/backoff/blob/master/backoff.py (MIT License)
+
+    Args:
+        base: The mathematical base of the exponentiation operation
+        factor: Factor to multiply the exponentation by.
+        max_value: The maximum value to yield. Once the value in the
+            true exponential sequence exceeds this, the value
+            of max_value will forever after be yielded.
+    """
+    a = factor * base ** n
+    if max_value is None or a < max_value:
+        return a
+    else:
+        return max_value
+
+
+def random_jitter(value, jitter=1):
+    """Jitter the value a random number of milliseconds.
+
+    Copied from https://github.com/litl/backoff/blob/master/backoff.py (MIT License)
+
+    This adds up to 1 second of additional time to the original value.
+    Prior to backoff version 1.2 this was the default jitter behavior.
+    Args:
+        value: The unadulterated backoff value.
+    """
+    return value + random.uniform(0, jitter)
+
+
+def full_jitter(value):
+    """Jitter the value across the full range (0 to value).
+
+    Copied from https://github.com/litl/backoff/blob/master/backoff.py (MIT License)
+
+    This corresponds to the "Full Jitter" algorithm specified in the
+    AWS blog's post on the performance of various jitter algorithms.
+    (http://www.awsarchitectureblog.com/2015/03/backoff.html)
+
+    Args:
+        value: The unadulterated backoff value.
+    """
+    return random.uniform(0, value)
diff --git a/kube_ops_view/main.py b/kube_ops_view/main.py
index f9bfe11..72e541f 100644
--- a/kube_ops_view/main.py
+++ b/kube_ops_view/main.py
@@ -89,7 +89,8 @@ def event(cluster_ids: set):
     for cluster_id in (app.store.get('cluster-ids') or []):
         if not cluster_ids or cluster_id in cluster_ids:
             cluster = app.store.get(cluster_id)
-            yield 'event: clusterupdate\ndata: ' + json.dumps(cluster, separators=(',', ':')) + '\n\n'
+            if cluster:
+                yield 'event: clusterupdate\ndata: ' + json.dumps(cluster, separators=(',', ':')) + '\n\n'
     while True:
         for event_type, event_data in app.store.listen():
             # hacky, event_data can be delta or full cluster object
diff --git a/kube_ops_view/update.py b/kube_ops_view/update.py
new file mode 100644
index 0000000..0ce257d
--- /dev/null
+++ b/kube_ops_view/update.py
@@ -0,0 +1,82 @@
+import logging
+import time
+
+import gevent
+import json_delta
+import requests.exceptions
+
+from .backoff import expo, random_jitter
+
+logger = logging.getLogger(__name__)
+
+
+def calculate_backoff(tries: int):
+    return random_jitter(expo(tries, factor=2, max_value=120), jitter=4)
+
+
+def get_short_error_message(e: requests.exceptions.RequestException):
+    '''Generate a reasonable short message why the HTTP request failed'''
+
+    if e.response is not None:
+        # e.g. "401 Unauthorized"
+        return '{} {}'.format(e.response.status_code, e.response.reason)
+    elif isinstance(e, requests.exceptions.ConnectionError):
+        # e.g. "ConnectionError" or "ConnectTimeout"
+        return e.__class__.__name__
+    else:
+        return str(e)
+
+
+def update_clusters(cluster_discoverer, query_cluster: callable, store, debug: bool):
+    while True:
+        lock = store.acquire_lock()
+        if lock:
+            try:
+                clusters = cluster_discoverer.get_clusters()
+                cluster_ids = set()
+                for cluster in clusters:
+                    cluster_ids.add(cluster.id)
+                    backoff_key = '{}:backoff'.format(cluster.id)
+                    backoff = store.get(backoff_key)
+                    if backoff and time.time() < backoff['next_try']:
+                        # cluster is still in backoff, skip
+                        continue
+                    try:
+                        data = query_cluster(cluster)
+                    except Exception as e:
+                        if not backoff:
+                            backoff = {}
+                        tries = backoff.get('tries', 0) + 1
+                        backoff['tries'] = tries
+                        wait_seconds = calculate_backoff(tries)
+                        backoff['next_try'] = time.time() + wait_seconds
+                        if isinstance(e, requests.exceptions.RequestException):
+                            message = get_short_error_message(e)
+                            log = logger.error
+                        else:
+                            message = str(e)
+                            log = logger.exception
+                        log('Failed to query cluster {} ({}): {} (try {}, wait {} seconds)'.format(
+                            cluster.id, cluster.api_server_url, message, tries, round(wait_seconds)))
+                        store.set(backoff_key, backoff)
+                    else:
+                        if backoff:
+                            # reset backoff
+                            store.set(backoff_key, None)
+                        old_data = store.get(data['id'])
+                        if old_data:
+                            # https://pikacode.com/phijaro/json_delta/ticket/11/
+                            # diff is extremely slow without array_align=False
+                            delta = json_delta.diff(old_data, data, verbose=debug, array_align=False)
+                            store.publish('clusterdelta', {'cluster_id': cluster.id, 'delta': delta})
+                            if delta:
+                                store.set(cluster.id, data)
+                        else:
+                            store.publish('clusterupdate', data)
+                            store.set(cluster.id, data)
+                store.set('cluster-ids', list(sorted(cluster_ids)))
+            except:
+                logger.exception('Failed to update')
+            finally:
+                store.release_lock(lock)
+        gevent.sleep(5)