import logging import re import time from pathlib import Path from urllib.parse import urljoin import requests import tokens from requests.auth import AuthBase from pykube import HTTPClient, KubeConfig # default URL points to kubectl proxy DEFAULT_CLUSTERS = 'http://localhost:8001/' CLUSTER_ID_INVALID_CHARS = re.compile('[^a-z0-9:-]') logger = logging.getLogger(__name__) tokens.configure(from_file_only=True) def generate_cluster_id(url: str): '''Generate some "cluster ID" from given API server URL''' for prefix in ('https://', 'http://'): if url.startswith(prefix): url = url[len(prefix):] return CLUSTER_ID_INVALID_CHARS.sub('-', url.lower()).strip('-') class StaticAuthorizationHeaderAuth(AuthBase): '''Static authentication with given "Authorization" header''' def __init__(self, authorization): self.authorization = authorization def __call__(self, request): request.headers['Authorization'] = self.authorization return request class OAuthTokenAuth(AuthBase): '''Dynamic authentication using the "tokens" library to load OAuth tokens from file (potentially mounted from a Kubernetes secret)''' def __init__(self, token_name): self.token_name = token_name tokens.manage(token_name) def __call__(self, request): token = tokens.get(self.token_name) request.headers['Authorization'] = f'Bearer {token}' return request class Cluster: def __init__( self, id: str, name: str, api_server_url: str, client: HTTPClient ): self.id = id self.name = name self.api_server_url = api_server_url self.client = client class StaticClusterDiscoverer: def __init__(self, api_server_urls: list): self._clusters = [] if not api_server_urls: try: config = KubeConfig.from_service_account() except FileNotFoundError: # we are not running inside a cluster # => assume default kubectl proxy URL config = KubeConfig.from_url(DEFAULT_CLUSTERS) client = HTTPClient(config) cluster = Cluster( generate_cluster_id(DEFAULT_CLUSTERS), "cluster", DEFAULT_CLUSTERS, client ) else: client = HTTPClient(config) cluster = Cluster( generate_cluster_id(config.cluster['server']), "cluster", config.cluster['server'], client ) self._clusters.append(cluster) else: for api_server_url in api_server_urls: config = KubeConfig.from_url(api_server_url) client = HTTPClient(config) generated_id = generate_cluster_id(api_server_url) self._clusters.append(Cluster(generated_id, generated_id, api_server_url, client)) def get_clusters(self): return self._clusters class ClusterRegistryDiscoverer: def __init__(self, cluster_registry_url: str, cache_lifetime=60): self._url = cluster_registry_url self._cache_lifetime = cache_lifetime self._last_cache_refresh = 0 self._clusters = [] self._session = requests.Session() self._session.auth = OAuthTokenAuth('read-only') def refresh(self): try: response = self._session.get(urljoin(self._url, '/kubernetes-clusters'), timeout=10) response.raise_for_status() clusters = [] for row in response.json()['items']: # only consider "ready" clusters if row.get("lifecycle_status", "ready") == "ready": config = KubeConfig.from_url(row['api_server_url']) client = HTTPClient(config) client.session.auth = OAuthTokenAuth("read-only") clusters.append( Cluster( row["id"], row["alias"], row["api_server_url"], client ) ) self._clusters = clusters self._last_cache_refresh = time.time() except: logger.exception( f"Failed to refresh from cluster registry {self._url}" ) def get_clusters(self): now = time.time() if now - self._last_cache_refresh > self._cache_lifetime: self.refresh() return self._clusters class KubeconfigDiscoverer: def __init__(self, kubeconfig_path: Path, contexts: set): self._path = kubeconfig_path self._contexts = contexts def get_clusters(self): # Kubernetes Python client expects "vintage" string path config_file = str(self._path) config = KubeConfig.from_file(config_file) for context in config.contexts: if self._contexts and context not in self._contexts: # filter out continue # create a new KubeConfig with new "current context" context_config = KubeConfig(config.doc, context) client = HTTPClient(context_config) cluster = Cluster( context, context, context_config.cluster['server'], client ) yield cluster class MockDiscoverer: def get_clusters(self): for i in range(3): yield Cluster( f"mock-cluster-{i}", f"mock-cluster-{i}", api_server_url=f"https://kube-{i}.example.org", client=None )