replace kubernetes client with pykube-ng

This commit is contained in:
Henning Jacobs
2019-03-19 18:37:01 +01:00
parent d257337a5a
commit aefbf446ba
2 changed files with 62 additions and 53 deletions

View File

@@ -1,34 +1,27 @@
[[source]] [[source]]
url = "https://pypi.org/simple" url = "https://pypi.org/simple"
verify_ssl = true verify_ssl = true
name = "pypi" name = "pypi"
[packages] [packages]
click = "*" click = "*"
gevent = "*" gevent = "*"
requests = "*" requests = "*"
stups-tokens = ">=1.1.19" stups-tokens = ">=1.1.19"
redlock-py = "*" redlock-py = "*"
json-delta = ">=2.0" json-delta = ">=2.0"
kubernetes = "==7.0.0a1"
flask = "*" flask = "*"
flask-oauthlib = "*" flask-oauthlib = "*"
pykube-ng = "*"
[dev-packages] [dev-packages]
"flake8" = "*" "flake8" = "*"
pytest = "*" pytest = "*"
pipenv = "*" pipenv = "*"
pytest-cov = "*" pytest-cov = "*"
coveralls = "*" coveralls = "*"
[requires] [requires]
python_version = "3.7" python_version = "3.7"
[pipenv] [pipenv]

View File

@@ -4,12 +4,11 @@ import time
from pathlib import Path from pathlib import Path
from urllib.parse import urljoin from urllib.parse import urljoin
import kubernetes.client
import kubernetes.config
import requests import requests
import tokens import tokens
from requests.auth import AuthBase from requests.auth import AuthBase
from pykube import HTTPClient, KubeConfig
# default URL points to kubectl proxy # default URL points to kubectl proxy
DEFAULT_CLUSTERS = 'http://localhost:8001/' DEFAULT_CLUSTERS = 'http://localhost:8001/'
CLUSTER_ID_INVALID_CHARS = re.compile('[^a-z0-9:-]') CLUSTER_ID_INVALID_CHARS = re.compile('[^a-z0-9:-]')
@@ -48,18 +47,22 @@ class OAuthTokenAuth(AuthBase):
def __call__(self, request): def __call__(self, request):
token = tokens.get(self.token_name) token = tokens.get(self.token_name)
request.headers['Authorization'] = 'Bearer {}'.format(token) request.headers['Authorization'] = f'Bearer {token}'
return request return request
class Cluster: class Cluster:
def __init__(self, id, api_server_url, ssl_ca_cert=None, auth=None, cert_file=None, key_file=None): def __init__(
self,
id: str,
name: str,
api_server_url: str,
client: HTTPClient
):
self.id = id self.id = id
self.name = name
self.api_server_url = api_server_url self.api_server_url = api_server_url
self.ssl_ca_cert = ssl_ca_cert self.client = client
self.auth = auth
self.cert_file = cert_file
self.key_file = key_file
class StaticClusterDiscoverer: class StaticClusterDiscoverer:
@@ -69,29 +72,30 @@ class StaticClusterDiscoverer:
if not api_server_urls: if not api_server_urls:
try: try:
kubernetes.config.load_incluster_config() config = KubeConfig.from_service_account()
except kubernetes.config.ConfigException: except FileNotFoundError:
# we are not running inside a cluster # we are not running inside a cluster
# => assume default kubectl proxy URL # => assume default kubectl proxy URL
cluster = Cluster(generate_cluster_id(DEFAULT_CLUSTERS), DEFAULT_CLUSTERS) config = KubeConfig.from_url(DEFAULT_CLUSTERS)
else: client = HTTPClient(config)
# "load_incluster_config" set defaults in the config class
config = kubernetes.client.configuration.Configuration()
cluster = Cluster( cluster = Cluster(
generate_cluster_id(config.host), generate_cluster_id(DEFAULT_CLUSTERS), "cluster", DEFAULT_CLUSTERS, client
config.host, )
ssl_ca_cert=config.ssl_ca_cert, else:
auth=StaticAuthorizationHeaderAuth(config.api_key['authorization'])) client = HTTPClient(config)
cluster = Cluster(
generate_cluster_id(config.cluster['server']),
"cluster",
config.cluster['server'],
client
)
self._clusters.append(cluster) self._clusters.append(cluster)
else: else:
for api_server_url in api_server_urls: for api_server_url in api_server_urls:
config = KubeConfig.from_url(api_server_url)
if 'localhost' not in api_server_url: client = HTTPClient(config)
# TODO: hacky way of detecting whether we need a token or not generated_id = generate_cluster_id(api_server_url)
auth = OAuthTokenAuth('read-only') self._clusters.append(Cluster(generated_id, generated_id, api_server_url, client))
else:
auth = None
self._clusters.append(Cluster(generate_cluster_id(api_server_url), api_server_url, auth=auth))
def get_clusters(self): def get_clusters(self):
return self._clusters return self._clusters
@@ -114,12 +118,24 @@ class ClusterRegistryDiscoverer:
clusters = [] clusters = []
for row in response.json()['items']: for row in response.json()['items']:
# only consider "ready" clusters # only consider "ready" clusters
if row.get('lifecycle_status', 'ready') == 'ready': if row.get("lifecycle_status", "ready") == "ready":
clusters.append(Cluster(row['id'], row['api_server_url'], auth=OAuthTokenAuth('read-only'))) config = KubeConfig.from_url(row['api_server_url'])
client = HTTPClient(config)
client.session.auth = OAuthTokenAuth("read-only")
clusters.append(
Cluster(
row["id"],
row["alias"],
row["api_server_url"],
client
)
)
self._clusters = clusters self._clusters = clusters
self._last_cache_refresh = time.time() self._last_cache_refresh = time.time()
except: except:
logger.exception('Failed to refresh from cluster registry {}'.format(self._url)) logger.exception(
f"Failed to refresh from cluster registry {self._url}"
)
def get_clusters(self): def get_clusters(self):
now = time.time() now = time.time()
@@ -137,25 +153,20 @@ class KubeconfigDiscoverer:
def get_clusters(self): def get_clusters(self):
# Kubernetes Python client expects "vintage" string path # Kubernetes Python client expects "vintage" string path
config_file = str(self._path) config_file = str(self._path)
contexts, current_context = kubernetes.config.list_kube_config_contexts(config_file) config = KubeConfig.from_file(config_file)
for context in contexts: for context in config.contexts:
if self._contexts and context['name'] not in self._contexts: if self._contexts and context not in self._contexts:
# filter out # filter out
continue continue
config = kubernetes.client.configuration.Configuration() # create a new KubeConfig with new "current context"
kubernetes.config.load_kube_config(config_file, context=context['name'], client_configuration=config) context_config = KubeConfig(config.doc, context)
authorization = config.api_key.get('authorization') client = HTTPClient(context_config)
if authorization:
auth = StaticAuthorizationHeaderAuth(authorization)
else:
auth = None
cluster = Cluster( cluster = Cluster(
context['name'], context,
config.host, context,
ssl_ca_cert=config.ssl_ca_cert, context_config.cluster['server'],
cert_file=config.cert_file, client
key_file=config.key_file, )
auth=auth)
yield cluster yield cluster
@@ -163,4 +174,9 @@ class MockDiscoverer:
def get_clusters(self): def get_clusters(self):
for i in range(3): for i in range(3):
yield Cluster('mock-cluster-{}'.format(i), api_server_url='https://kube-{}.example.org'.format(i)) yield Cluster(
f"mock-cluster-{i}",
f"mock-cluster-{i}",
api_server_url=f"https://kube-{i}.example.org",
client=None
)