#49 redis backend for pub/sub
This commit is contained in:
62
app.py
62
app.py
@@ -14,9 +14,11 @@ import os
|
||||
import re
|
||||
import requests
|
||||
import datetime
|
||||
import redis
|
||||
import time
|
||||
import tokens
|
||||
from queue import Queue
|
||||
from redlock import Redlock
|
||||
|
||||
from flask import Flask, redirect
|
||||
from flask_oauthlib.client import OAuth, OAuthRemoteApp
|
||||
@@ -29,9 +31,9 @@ class MemoryStore:
|
||||
|
||||
def acquire_lock(self):
|
||||
# no-op for memory store
|
||||
pass
|
||||
return 'fake-lock'
|
||||
|
||||
def release_lock(self):
|
||||
def release_lock(self, lock):
|
||||
# no op for memory store
|
||||
pass
|
||||
|
||||
@@ -50,7 +52,28 @@ class MemoryStore:
|
||||
self._queues.remove(queue)
|
||||
|
||||
|
||||
STORE = MemoryStore()
|
||||
class RedisStore:
|
||||
def __init__(self, url):
|
||||
self._url = url
|
||||
self._redis = redis.StrictRedis.from_url(url)
|
||||
self._redlock = Redlock([url])
|
||||
|
||||
def acquire_lock(self):
|
||||
return self._redlock.lock('update', 10000)
|
||||
|
||||
def release_lock(self, lock):
|
||||
self._redlock.unlock(lock)
|
||||
|
||||
def publish(self, event_type, event_data):
|
||||
self._redis.publish('default', '{}:{}'.format(event_type, json.dumps(event_data, separators=(',', ':'))))
|
||||
|
||||
def listen(self):
|
||||
p = self._redis.pubsub()
|
||||
p.subscribe('default')
|
||||
for message in p.listen():
|
||||
if message['type'] == 'message':
|
||||
event_type, data = message['data'].decode('utf-8').split(':', 1)
|
||||
yield (event_type, json.loads(data))
|
||||
|
||||
|
||||
CLUSTER_ID_INVALID_CHARS = re.compile('[^a-z0-9:-]')
|
||||
@@ -60,11 +83,14 @@ def get_bool(name: str):
|
||||
return os.getenv(name, '').lower() in ('1', 'true')
|
||||
|
||||
|
||||
SERVER_PORT = int(os.getenv('SERVER_PORT', 8080))
|
||||
DEFAULT_CLUSTERS = 'http://localhost:8001/'
|
||||
CREDENTIALS_DIR = os.getenv('CREDENTIALS_DIR', '')
|
||||
AUTHORIZE_URL = os.getenv('AUTHORIZE_URL')
|
||||
APP_URL = os.getenv('APP_URL')
|
||||
MOCK = get_bool('MOCK')
|
||||
REDIS_URL = os.getenv('REDIS_URL')
|
||||
STORE = RedisStore(REDIS_URL) if REDIS_URL else MemoryStore()
|
||||
|
||||
app = Flask(__name__)
|
||||
app.debug = get_bool('DEBUG')
|
||||
@@ -332,25 +358,25 @@ def get_auth_oauth_token():
|
||||
|
||||
def update():
|
||||
while True:
|
||||
try:
|
||||
STORE.acquire_lock()
|
||||
if MOCK:
|
||||
clusters = get_mock_clusters()
|
||||
else:
|
||||
clusters = get_kubernetes_clusters()
|
||||
for cluster in clusters:
|
||||
STORE.publish('clusterupdate', cluster)
|
||||
except:
|
||||
logging.exception('Failed to update')
|
||||
finally:
|
||||
STORE.release_lock()
|
||||
lock = STORE.acquire_lock()
|
||||
if lock:
|
||||
try:
|
||||
if MOCK:
|
||||
clusters = get_mock_clusters()
|
||||
else:
|
||||
clusters = get_kubernetes_clusters()
|
||||
for cluster in clusters:
|
||||
STORE.publish('clusterupdate', cluster)
|
||||
except:
|
||||
logging.exception('Failed to update')
|
||||
finally:
|
||||
STORE.release_lock(lock)
|
||||
gevent.sleep(5)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
port = 8080
|
||||
http_server = gevent.wsgi.WSGIServer(('0.0.0.0', port), app)
|
||||
http_server = gevent.wsgi.WSGIServer(('0.0.0.0', SERVER_PORT), app)
|
||||
gevent.spawn(update)
|
||||
logging.info('Listening on :{}..'.format(port))
|
||||
logging.info('Listening on :{}..'.format(SERVER_PORT))
|
||||
http_server.serve_forever()
|
||||
|
||||
@@ -3,3 +3,4 @@ Flask-OAuthlib
|
||||
gevent
|
||||
requests
|
||||
stups-tokens>=1.0.19
|
||||
redlock-py
|
||||
|
||||
Reference in New Issue
Block a user