#!/usr/bin/env python3

# gevent monkey patching has to happen before any other module is imported,
# hence the unusual import order below.
import gevent.monkey

gevent.monkey.patch_all()

import click
import flask
import functools
import gevent
import gevent.wsgi
import json
import logging
import os
import signal
import time
import kube_ops_view
from pathlib import Path

from flask import Flask, redirect
from flask_oauthlib.client import OAuth
from .oauth import OAuthRemoteAppWithRefresh
from urllib.parse import urljoin

from .mock import query_mock_cluster
from .kubernetes import query_kubernetes_cluster
from .stores import MemoryStore, RedisStore
from .cluster_discovery import DEFAULT_CLUSTERS, StaticClusterDiscoverer, ClusterRegistryDiscoverer, KubeconfigDiscoverer, MockDiscoverer
from .update import update_clusters


logger = logging.getLogger(__name__)

# flipped to True by the SIGTERM handler so that /health starts failing
SERVER_STATUS = {'shutdown': False}
# OAuth is only enforced when AUTHORIZE_URL is set (see authorize())
AUTHORIZE_URL = os.getenv('AUTHORIZE_URL')
# base URL used to build absolute redirect targets
APP_URL = os.getenv('APP_URL')

app = Flask(__name__)

oauth = OAuth(app)
auth = OAuthRemoteAppWithRefresh(
    oauth,
    'auth',
    request_token_url=None,
    access_token_method='POST',
    access_token_url=os.getenv('ACCESS_TOKEN_URL'),
    authorize_url=AUTHORIZE_URL
)
oauth.remote_apps['auth'] = auth


def authorize(f):
    """Decorate a view so it redirects to /login when no auth token is in the session.

    The check is only active when AUTHORIZE_URL is configured; otherwise the
    wrapped view is called unconditionally.
    """
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        if AUTHORIZE_URL and 'auth_token' not in flask.session:
            return redirect(urljoin(APP_URL, '/login'))
        return f(*args, **kwargs)

    return wrapper


@app.route('/health')
def health():
    """Return 'OK', or abort with 503 once shutdown has been initiated."""
    if SERVER_STATUS['shutdown']:
        flask.abort(503)
    else:
        return 'OK'


@app.route('/')
@authorize
def index():
    """Render index.html pointing at the first app*.js bundle in static/build.

    Aborts with 503 when no bundle is found.
    """
    static_build_path = Path(__file__).parent / 'static' / 'build'
    candidates = sorted(static_build_path.glob('app*.js'))
    if not candidates:
        logger.error('Could not find JavaScript application bundle app*.js in {}'.format(static_build_path))
        # abort() raises, so nothing below runs without a bundle
        flask.abort(503, 'JavaScript application bundle not found (missing build)')

    app_js = candidates[0].name
    if app.debug:
        # cache busting for local development
        app_js += '?_={}'.format(time.time())
    return flask.render_template('index.html', app_js=app_js, version=kube_ops_view.__version__)


def event(cluster_ids: set):
    """Generate the text/event-stream payload for one client.

    First yields the stored status and data of every matching cluster,
    then a 'bootstrapend' marker, then forwards events from
    app.store.listen() indefinitely.

    :param cluster_ids: cluster IDs to include; an empty set means "all clusters"
    """
    def dump(obj):
        # compact JSON, no whitespace after separators
        return json.dumps(obj, separators=(',', ':'))

    def sse(event_type, payload):
        return 'event: ' + event_type + '\ndata: ' + payload + '\n\n'

    # first send full data once
    for cluster_id in app.store.get_cluster_ids():
        if not cluster_ids or cluster_id in cluster_ids:
            status = app.store.get_cluster_status(cluster_id)
            if status:
                # send the cluster status including last_query_time BEFORE the cluster data
                # so the UI knows how to render correctly from the start
                yield sse('clusterstatus', dump({'cluster_id': cluster_id, 'status': status}))
            cluster = app.store.get_cluster_data(cluster_id)
            if cluster:
                yield sse('clusterupdate', dump(cluster))
    yield 'event: bootstrapend\ndata: \n\n'

    # re-subscribe whenever listen() returns
    while True:
        for event_type, event_data in app.store.listen():
            # hacky, event_data can be delta or full cluster object
            if not cluster_ids or event_data.get('cluster_id', event_data.get('id')) in cluster_ids:
                yield sse(event_type, dump(event_data))


@app.route('/events')
@authorize
def get_events():
    '''SSE (Server Side Events), for an EventSource'''
    # the "cluster_ids" query parameter is split on whitespace;
    # str.split() without a separator never produces empty strings
    cluster_ids = set(flask.request.args.get('cluster_ids', '').split())
    return flask.Response(event(cluster_ids), mimetype='text/event-stream')


@app.route('/screen-tokens', methods=['GET', 'POST'])
@authorize
def screen_tokens():
    """Render the screen token page; a POST creates a new token via the store first."""
    new_token = None
    if flask.request.method == 'POST':
        new_token = app.store.create_screen_token()
    return flask.render_template('screen-tokens.html', new_token=new_token)


# the <token> URL variable is required: the view takes "token" as an argument
@app.route('/screen/<token>')
def redeem_screen_token(token: str):
    """Redeem a screen token and store it as the session's auth token.

    Responds with 401 when the store refuses the token, otherwise redirects
    to the start page.
    """
    # NOTE(review): X-Forwarded-For is taken from the request as-is; this is
    # only trustworthy behind a proxy that sets the header — confirm deployment
    remote_addr = flask.request.headers.get('X-Forwarded-For') or flask.request.remote_addr
    logger.info('Trying to redeem screen token "{}" for IP {}..'.format(token, remote_addr))
    try:
        app.store.redeem_screen_token(token, remote_addr)
    except Exception as e:
        # "except Exception" (not a bare except) so that SystemExit,
        # KeyboardInterrupt and GreenletExit still propagate
        logger.warning('Could not redeem screen token for IP {}: {}'.format(remote_addr, e))
        flask.abort(401)
    flask.session['auth_token'] = (token, '')
    return redirect(urljoin(APP_URL, '/'))


@app.route('/login')
def login():
    """Start the OAuth flow with /login/authorized as the callback."""
    redirect_uri = urljoin(APP_URL, '/login/authorized')
    return auth.authorize(callback=redirect_uri)


@app.route('/logout')
def logout():
    """Drop the auth token from the session and redirect to the start page."""
    flask.session.pop('auth_token', None)
    return redirect(urljoin(APP_URL, '/'))


@app.route('/login/authorized')
def authorized():
    """OAuth callback: store the access token in the session and redirect to the start page."""
    resp = auth.authorized_response()
    if resp is None:
        # .get() so that missing error parameters still produce the
        # "Access denied" message instead of a 400 Bad Request
        return 'Access denied: reason=%s error=%s' % (
            flask.request.args.get('error'),
            flask.request.args.get('error_description')
        )
    if not isinstance(resp, dict):
        return 'Invalid auth response'
    flask.session['auth_token'] = (resp['access_token'], '')
    return redirect(urljoin(APP_URL, '/'))


def shutdown():
    """Sleep for ten seconds, then exit the process with status 0."""
    # just wait some time to give Kubernetes time to update endpoints
    # this requires changing the readinessProbe's
    # PeriodSeconds and FailureThreshold appropriately
    # see https://godoc.org/k8s.io/kubernetes/pkg/api/v1#Probe
    gevent.sleep(10)
    exit(0)


def exit_gracefully(signum, frame):
    """Signal handler: mark the server as shutting down and spawn the delayed exit."""
    logger.info('Received TERM signal, shutting down..')
    SERVER_STATUS['shutdown'] = True
    gevent.spawn(shutdown)


def print_version(ctx, param, value):
    """Click callback for --version: print the version and exit."""
    if not value or ctx.resilient_parsing:
        return
    click.echo('Kubernetes Operational View {}'.format(kube_ops_view.__version__))
    ctx.exit()


class CommaSeparatedValues(click.ParamType):
    """Click parameter type turning 'a,b,c' into ['a', 'b', 'c']."""

    name = 'comma_separated_values'

    def convert(self, value, param, ctx):
        """Split a string on commas, dropping empty items; pass other values through.

        Returns a real list (not a lazy filter object) so the result can be
        tested for emptiness and iterated more than once.
        """
        if isinstance(value, str):
            return [item for item in value.split(',') if item]
        return value


# NOTE: main() deliberately has no docstring — click would display it as the
# command description in the --help output.
@click.command(context_settings={'help_option_names': ['-h', '--help']})
@click.option('-V', '--version', is_flag=True, callback=print_version, expose_value=False, is_eager=True,
              help='Print the current version number and exit.')
@click.option('-p', '--port', type=int, help='HTTP port to listen on (default: 8080)', envvar='SERVER_PORT', default=8080)
@click.option('-d', '--debug', is_flag=True, help='Run in debugging mode', envvar='DEBUG')
@click.option('-m', '--mock', is_flag=True, help='Mock Kubernetes clusters', envvar='MOCK')
@click.option('--secret-key', help='Secret key for session cookies', envvar='SECRET_KEY', default='development')
@click.option('--redis-url', help='Redis URL to use for pub/sub and job locking', envvar='REDIS_URL')
@click.option('--clusters', type=CommaSeparatedValues(),
              help='Comma separated list of Kubernetes API server URLs (default: {})'.format(DEFAULT_CLUSTERS), envvar='CLUSTERS')
@click.option('--cluster-registry-url', help='URL to cluster registry', envvar='CLUSTER_REGISTRY_URL')
@click.option('--kubeconfig-path', type=click.Path(exists=True), help='Path to kubeconfig file', envvar='KUBECONFIG_PATH')
@click.option('--kubeconfig-contexts', type=CommaSeparatedValues(),
              help='List of kubeconfig contexts to use (default: use all defined contexts)', envvar='KUBECONFIG_CONTEXTS')
@click.option('--query-interval', type=float, help='Interval in seconds for querying clusters (default: 5)', envvar='QUERY_INTERVAL', default=5)
def main(port, debug, mock, secret_key, redis_url, clusters: list, cluster_registry_url, kubeconfig_path, kubeconfig_contexts: list, query_interval):
    # Entry point: configure logging and the store, pick a cluster discoverer,
    # start the background update greenlet and serve the Flask app.
    logging.basicConfig(level=logging.DEBUG if debug else logging.INFO)

    # Redis-backed store when a Redis URL is given, in-memory store otherwise
    store = RedisStore(redis_url) if redis_url else MemoryStore()

    app.debug = debug
    app.secret_key = secret_key
    app.store = store

    # discoverer precedence: mock > cluster registry > kubeconfig > static URL list
    if mock:
        cluster_query = query_mock_cluster
        discoverer = MockDiscoverer()
    else:
        cluster_query = query_kubernetes_cluster
        if cluster_registry_url:
            discoverer = ClusterRegistryDiscoverer(cluster_registry_url)
        elif kubeconfig_path:
            discoverer = KubeconfigDiscoverer(Path(kubeconfig_path), set(kubeconfig_contexts or []))
        else:
            api_server_urls = clusters or []
            discoverer = StaticClusterDiscoverer(api_server_urls)

    gevent.spawn(update_clusters, cluster_discoverer=discoverer, query_cluster=cluster_query, store=store, query_interval=query_interval, debug=debug)

    signal.signal(signal.SIGTERM, exit_gracefully)
    # NOTE(review): gevent.wsgi was removed in gevent 1.3 in favour of
    # gevent.pywsgi — confirm the pinned gevent version before upgrading
    http_server = gevent.wsgi.WSGIServer(('0.0.0.0', port), app)
    logger.info('Listening on :{}..'.format(port))
    http_server.serve_forever()