From a9d35e09e4eddb14ea4709272b9ab84f4cce6e56 Mon Sep 17 00:00:00 2001 From: Henning Jacobs Date: Fri, 6 Jan 2017 23:29:02 +0100 Subject: [PATCH] #49 redis backend for pub/sub --- app.py | 62 ++++++++++++++++++++++++++++++++++-------------- requirements.txt | 1 + 2 files changed, 45 insertions(+), 18 deletions(-) diff --git a/app.py b/app.py index 9da251e..5f97c41 100755 --- a/app.py +++ b/app.py @@ -14,9 +14,11 @@ import os import re import requests import datetime +import redis import time import tokens from queue import Queue +from redlock import Redlock from flask import Flask, redirect from flask_oauthlib.client import OAuth, OAuthRemoteApp @@ -29,9 +31,9 @@ class MemoryStore: def acquire_lock(self): # no-op for memory store - pass + return 'fake-lock' - def release_lock(self): + def release_lock(self, lock): # no op for memory store pass @@ -50,7 +52,28 @@ class MemoryStore: self._queues.remove(queue) -STORE = MemoryStore() +class RedisStore: + def __init__(self, url): + self._url = url + self._redis = redis.StrictRedis.from_url(url) + self._redlock = Redlock([url]) + + def acquire_lock(self): + return self._redlock.lock('update', 10000) + + def release_lock(self, lock): + self._redlock.unlock(lock) + + def publish(self, event_type, event_data): + self._redis.publish('default', '{}:{}'.format(event_type, json.dumps(event_data, separators=(',', ':')))) + + def listen(self): + p = self._redis.pubsub() + p.subscribe('default') + for message in p.listen(): + if message['type'] == 'message': + event_type, data = message['data'].decode('utf-8').split(':', 1) + yield (event_type, json.loads(data)) CLUSTER_ID_INVALID_CHARS = re.compile('[^a-z0-9:-]') @@ -60,11 +83,14 @@ def get_bool(name: str): return os.getenv(name, '').lower() in ('1', 'true') +SERVER_PORT = int(os.getenv('SERVER_PORT', 8080)) DEFAULT_CLUSTERS = 'http://localhost:8001/' CREDENTIALS_DIR = os.getenv('CREDENTIALS_DIR', '') AUTHORIZE_URL = os.getenv('AUTHORIZE_URL') APP_URL = os.getenv('APP_URL') MOCK = get_bool('MOCK') +REDIS_URL = os.getenv('REDIS_URL') +STORE = RedisStore(REDIS_URL) if REDIS_URL else MemoryStore() app = Flask(__name__) app.debug = get_bool('DEBUG') @@ -332,25 +358,25 @@ def get_auth_oauth_token(): def update(): while True: - try: - STORE.acquire_lock() - if MOCK: - clusters = get_mock_clusters() - else: - clusters = get_kubernetes_clusters() - for cluster in clusters: - STORE.publish('clusterupdate', cluster) - except: - logging.exception('Failed to update') - finally: - STORE.release_lock() + lock = STORE.acquire_lock() + if lock: + try: + if MOCK: + clusters = get_mock_clusters() + else: + clusters = get_kubernetes_clusters() + for cluster in clusters: + STORE.publish('clusterupdate', cluster) + except: + logging.exception('Failed to update') + finally: + STORE.release_lock(lock) gevent.sleep(5) if __name__ == '__main__': logging.basicConfig(level=logging.INFO) - port = 8080 - http_server = gevent.wsgi.WSGIServer(('0.0.0.0', port), app) + http_server = gevent.wsgi.WSGIServer(('0.0.0.0', SERVER_PORT), app) gevent.spawn(update) - logging.info('Listening on :{}..'.format(port)) + logging.info('Listening on :{}..'.format(SERVER_PORT)) http_server.serve_forever() diff --git a/requirements.txt b/requirements.txt index e3ec8fd..502df59 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,3 +3,4 @@ Flask-OAuthlib gevent requests stups-tokens>=1.0.19 +redlock-py