Files
kops-arm64/kube_ops_view/cluster_discovery.py
Henning Jacobs 76a498bacc pre-commit linting (#259)
* pre-commit linting

* fix pydocs
2020-04-25 21:01:21 +02:00

175 lines
5.7 KiB
Python

import logging
import re
import time
from pathlib import Path
from typing import List
from urllib.parse import urljoin
import requests
import tokens
from pykube import HTTPClient
from pykube import KubeConfig
from requests.auth import AuthBase
# default URL points to kubectl proxy
DEFAULT_CLUSTERS = "http://localhost:8001/"
CLUSTER_ID_INVALID_CHARS = re.compile("[^a-z0-9:-]")
logger = logging.getLogger(__name__)
tokens.configure(from_file_only=True)
def generate_cluster_id(url: str):
"""Generate some "cluster ID" from given API server URL."""
for prefix in ("https://", "http://"):
if url.startswith(prefix):
url = url[len(prefix) :]
return CLUSTER_ID_INVALID_CHARS.sub("-", url.lower()).strip("-")
class StaticAuthorizationHeaderAuth(AuthBase):
"""Static authentication with given "Authorization" header."""
def __init__(self, authorization):
self.authorization = authorization
def __call__(self, request):
request.headers["Authorization"] = self.authorization
return request
class OAuthTokenAuth(AuthBase):
"""Dynamic authentication using the "tokens" library to load OAuth tokens from file (potentially mounted from a Kubernetes secret)."""
def __init__(self, token_name):
self.token_name = token_name
tokens.manage(token_name)
def __call__(self, request):
token = tokens.get(self.token_name)
request.headers["Authorization"] = f"Bearer {token}"
return request
class Cluster:
def __init__(self, id: str, name: str, api_server_url: str, client: HTTPClient):
self.id = id
self.name = name
self.api_server_url = api_server_url
self.client = client
class StaticClusterDiscoverer:
def __init__(self, api_server_urls: list):
self._clusters = []
if not api_server_urls:
try:
config = KubeConfig.from_service_account()
except FileNotFoundError:
# we are not running inside a cluster
# => assume default kubectl proxy URL
config = KubeConfig.from_url(DEFAULT_CLUSTERS)
client = HTTPClient(config)
cluster = Cluster(
generate_cluster_id(DEFAULT_CLUSTERS),
"cluster",
DEFAULT_CLUSTERS,
client,
)
else:
client = HTTPClient(config)
cluster = Cluster(
generate_cluster_id(config.cluster["server"]),
"cluster",
config.cluster["server"],
client,
)
self._clusters.append(cluster)
else:
for api_server_url in api_server_urls:
config = KubeConfig.from_url(api_server_url)
client = HTTPClient(config)
generated_id = generate_cluster_id(api_server_url)
self._clusters.append(
Cluster(generated_id, generated_id, api_server_url, client)
)
def get_clusters(self):
return self._clusters
class ClusterRegistryDiscoverer:
def __init__(self, cluster_registry_url: str, cache_lifetime=60):
self._url = cluster_registry_url
self._cache_lifetime = cache_lifetime
self._last_cache_refresh = 0
self._clusters: List[Cluster] = []
self._session = requests.Session()
self._session.auth = OAuthTokenAuth("read-only")
def refresh(self):
try:
response = self._session.get(
urljoin(self._url, "/kubernetes-clusters"), timeout=10
)
response.raise_for_status()
clusters = []
for row in response.json()["items"]:
# only consider "ready" clusters
if row.get("lifecycle_status", "ready") == "ready":
config = KubeConfig.from_url(row["api_server_url"])
client = HTTPClient(config)
client.session.auth = OAuthTokenAuth("read-only")
clusters.append(
Cluster(row["id"], row["alias"], row["api_server_url"], client)
)
self._clusters = clusters
self._last_cache_refresh = time.time()
except Exception as e:
logger.exception(
f"Failed to refresh from cluster registry {self._url}: {e}"
)
def get_clusters(self):
now = time.time()
if now - self._last_cache_refresh > self._cache_lifetime:
self.refresh()
return self._clusters
class KubeconfigDiscoverer:
def __init__(self, kubeconfig_path: Path, contexts: set):
self._path = kubeconfig_path
self._contexts = contexts
def get_clusters(self):
# Kubernetes Python client expects "vintage" string path
config_file = str(self._path)
config = KubeConfig.from_file(config_file)
for context in config.contexts:
if self._contexts and context not in self._contexts:
# filter out
continue
# create a new KubeConfig with new "current context"
context_config = KubeConfig(config.doc, context)
client = HTTPClient(context_config)
cluster = Cluster(
context, context, context_config.cluster["server"], client
)
yield cluster
class MockDiscoverer:
def get_clusters(self):
for i in range(3):
yield Cluster(
f"mock-cluster-{i}",
f"mock-cluster-{i}",
api_server_url=f"https://kube-{i}.example.org",
client=None,
)