diff --git a/app.py b/app.py index 196acca..8cdfccd 100755 --- a/app.py +++ b/app.py @@ -9,6 +9,7 @@ import functools import gevent import gevent.wsgi import json +import json_delta import logging import os import re @@ -60,9 +61,16 @@ class MemoryStore: '''Memory-only backend, mostly useful for local debugging''' def __init__(self): + self._data = {} self._queues = [] self._screen_tokens = {} + def set(self, key, value): + self._data[key] = value + + def get(self, key): + return self._data.get(key) + def acquire_lock(self): # no-op for memory store return 'fake-lock' @@ -105,6 +113,14 @@ class RedisStore: self._redis = redis.StrictRedis.from_url(url) self._redlock = Redlock([url]) + def set(self, key, value): + self._redis.set(key, json.dumps(value, separators=(',', ':'))) + + def get(self, key): + value = self._redis.get(key) + if value: + return json.loads(value.decode('utf-8')) + def acquire_lock(self): return self._redlock.lock('update', 10000) @@ -147,6 +163,7 @@ def get_bool(name: str): return os.getenv(name, '').lower() in ('1', 'true') +DEBUG = get_bool('DEBUG') SERVER_PORT = int(os.getenv('SERVER_PORT', 8080)) SERVER_STATUS = {'shutdown': False} DEFAULT_CLUSTERS = 'http://localhost:8001/' @@ -158,7 +175,7 @@ REDIS_URL = os.getenv('REDIS_URL') STORE = RedisStore(REDIS_URL) if REDIS_URL else MemoryStore() app = Flask(__name__) -app.debug = get_bool('DEBUG') +app.debug = DEBUG app.secret_key = os.getenv('SECRET_KEY', 'development') oauth = OAuth(app) @@ -295,8 +312,8 @@ def generate_mock_cluster_data(index: int): labels['master'] = 'true' pods = [] for j in range(hash_int((index + 1) * (i + 1)) % 32): - # add/remove some pods every 6 seconds - if j % 17 == 0 and int(time.time() / 6) % 2 == 0: + # add/remove some pods every 7 seconds + if j % 17 == 0 and int(time.time() / 7) % 2 == 0: pass else: pods.append(generate_mock_pod(index, i, j)) @@ -316,6 +333,22 @@ def get_mock_clusters(): yield data +def map_node_status(status: dict): + return { + 'addresses': status.get('addresses'), + 'capacity': status.get('capacity'), + } + + +def map_node(node: dict): + return { + 'name': node['metadata']['name'], + 'labels': node['metadata']['labels'], + 'status': map_node_status(node['status']), + 'pods': [] + } + + def get_kubernetes_clusters(): for api_server_url in (os.getenv('CLUSTERS') or DEFAULT_CLUSTERS).split(','): cluster_id = generate_cluster_id(api_server_url) @@ -329,8 +362,7 @@ def get_kubernetes_clusters(): pods_by_namespace_name = {} unassigned_pods = [] for node in response.json()['items']: - obj = {'name': node['metadata']['name'], 'labels': node['metadata']['labels'], 'status': node['status'], - 'pods': []} + obj = map_node(node) nodes.append(obj) nodes_by_name[obj['name']] = obj response = session.get(urljoin(api_server_url, '/api/v1/pods'), timeout=5) @@ -388,6 +420,11 @@ def get_kubernetes_clusters(): def event(cluster_ids: set): + # first sent full data once + for cluster_id in (STORE.get('cluster-ids') or []): + if not cluster_ids or cluster_id in cluster_ids: + cluster = STORE.get(cluster_id) + yield 'event: clusterupdate\ndata: ' + json.dumps(cluster, separators=(',', ':')) + '\n\n' while True: for event_type, cluster in STORE.listen(): if not cluster_ids or cluster['id'] in cluster_ids: @@ -461,8 +498,19 @@ def update(): clusters = get_mock_clusters() else: clusters = get_kubernetes_clusters() + cluster_ids = [] for cluster in clusters: - STORE.publish('clusterupdate', cluster) + old_data = STORE.get(cluster['id']) + if old_data: + # https://pikacode.com/phijaro/json_delta/ticket/11/ + # diff is extremely slow without array_align=False + delta = json_delta.diff(old_data, cluster, verbose=DEBUG, array_align=False) + STORE.publish('clusterdelta', {'cluster_id': cluster['id'], 'delta': delta}) + else: + STORE.publish('clusterupdate', cluster) + STORE.set(cluster['id'], cluster) + cluster_ids.append(cluster['id']) + STORE.set('cluster-ids', cluster_ids) except: logging.exception('Failed to update') finally: diff --git a/app/.eslintignore b/app/.eslintignore new file mode 100644 index 0000000..3fecb71 --- /dev/null +++ b/app/.eslintignore @@ -0,0 +1 @@ +src/vendor/*.js diff --git a/app/src/app.js b/app/src/app.js index 27af956..b3ff475 100644 --- a/app/src/app.js +++ b/app/src/app.js @@ -4,6 +4,7 @@ import {Pod, ALL_PODS, sortByName, sortByMemory, sortByCPU, sortByAge} from './p import SelectBox from './selectbox' import { Theme, ALL_THEMES} from './themes.js' import { DESATURATION_FILTER } from './filters.js' +import { JSON_delta } from './vendor/json_delta.js' const PIXI = require('pixi.js') @@ -18,6 +19,8 @@ export default class App { this.sorterFn = '' this.theme = Theme.get(localStorage.getItem('theme')) this.eventSource = null + this.keepAliveTimer = null + this.keepAliveSeconds = 20 this.clusters = new Map() } @@ -262,11 +265,13 @@ export default class App { this.stage.addChild(pod) } update() { + // make sure we create a copy (this.clusters might get modified) + const clusters = Array.from(this.clusters.entries()).sort().map(idCluster => idCluster[1]) const that = this let changes = 0 const firstTime = this.seenPods.size == 0 const podKeys = new Set() - for (const cluster of this.clusters.values()) { + for (const cluster of clusters) { for (const node of cluster.nodes) { for (const pod of node.pods) { podKeys.add(cluster.id + '/' + pod.namespace + '/' + pod.name) @@ -301,10 +306,10 @@ export default class App { } let y = 0 const clusterIds = new Set() - for (const [clusterId, cluster] of Array.from(this.clusters.entries()).sort()) { - if (!this.selectedClusters.size || this.selectedClusters.has(clusterId)) { - clusterIds.add(clusterId) - let clusterBox = clusterComponentById[clusterId] + for (const cluster of clusters) { + if (!this.selectedClusters.size || this.selectedClusters.has(cluster.id)) { + clusterIds.add(cluster.id) + let clusterBox = clusterComponentById[cluster.id] if (!clusterBox) { clusterBox = new Cluster(cluster, this.tooltip) this.viewContainer.addChild(clusterBox) @@ -368,6 +373,14 @@ export default class App { this.update() } + keepAlive() { + if (this.keepAliveTimer != null) { + clearTimeout(this.keepAliveTimer) + } + this._errors = 0 + this.keepAliveTimer = setTimeout(this.listen.bind(this), this.keepAliveSeconds * 1000) + } + listen() { if (this.eventSource != null) { this.eventSource.close() @@ -381,14 +394,30 @@ export default class App { url += '?cluster_ids=' + clusterIds } const eventSource = this.eventSource = new EventSource(url, {credentials: 'include'}) + this.keepAlive() eventSource.onerror = function(event) { - that.listen() + that._errors++ + that.eventSource.close() + that.eventSource = null } eventSource.addEventListener('clusterupdate', function(event) { + that.keepAlive() const cluster = JSON.parse(event.data) that.clusters.set(cluster.id, cluster) that.update() }) + eventSource.addEventListener('clusterdelta', function(event) { + that.keepAlive() + const data = JSON.parse(event.data) + let cluster = that.clusters.get(data.cluster_id) + if (cluster && data.delta) { + // deep copy cluster object (patch function mutates inplace!) + cluster = JSON.parse(JSON.stringify(cluster)) + cluster = JSON_delta.patch(cluster, data.delta) + that.clusters.set(cluster.id, cluster) + that.update() + } + }) } run() { diff --git a/app/src/utils.js b/app/src/utils.js index 8e7238a..3199538 100644 --- a/app/src/utils.js +++ b/app/src/utils.js @@ -73,7 +73,7 @@ const metric = (metric, type) => const podResource = type => (containers, resource) => containers - .map(({resources}) => metric(resources[resource], type)) + .map(({resources}) => resources ? metric(resources[resource], type) : 0) .reduce((a, b) => a + b, 0) export {FACTORS, hsvToRgb, getBarColor, parseResource, metric, podResource} diff --git a/app/src/vendor/json_delta.js b/app/src/vendor/json_delta.js new file mode 100644 index 0000000..9860ae4 --- /dev/null +++ b/app/src/vendor/json_delta.js @@ -0,0 +1,533 @@ +/* JSON-delta v2.0 - A diff/patch pair for JSON-serialized data +structures. + +Copyright 2013-2015 Philip J. Roberts . +All rights reserved + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + +1. Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + +2. Redistributions in binary form must reproduce the above copyright +notice, this list of conditions and the following disclaimer in the +documentation and/or other materials provided with the distribution. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +This implementation is based heavily on the original python2 version: +see http://www.phil-roberts.name/json-delta/ for further +documentation. */ + +export const JSON_delta = { + // Main entry points: ====================================================== + patch: function(struc, diff) { + /* Apply the sequence of diff stanzas diff to the structure + struc, and returns the patched structure. */ + var stan_key; + for (stan_key = 0; stan_key < diff.length; stan_key++) { + struc = this.patchStanza(struc, diff[stan_key]); + } + return struc; + }, + + diff: function(left, right, minimal, key) { + /* Build a diff between the structures left and right. + + Parameters: + key: this is used for mutual recursion between this + function and those it calls. Normally it should be + left unset or set as its default []. + + minimal: if this flag is set true, the function will try + harder to find the diff that encodes as the shortest + possible JSON string, at the expense of using more of + both memory and processor time (as alternatives are + computed and compared). + */ + key = key !== undefined ? key : []; + minimal = minimal !== undefined ? minimal : true; + var dumbdiff = [[key, right]], my_diff = [], common; + + if (this.structureWorthInvestigating(left, right)) { + common = this.commonality(left, right); + if (minimal) { + my_diff = this.needleDiff(left, right, minimal, key); + } else if (common < 0.5) { + my_diff = this.thisLevelDiff(left, right, key, common); + } else { + my_diff = this.keysetDiff(left, right, minimal, key); + } + } else { + my_diff = this.thisLevelDiff(left, right, key, 0.0); + } + + if (minimal) { + if (JSON.stringify(dumbdiff).length < + JSON.stringify(my_diff).length) { + my_diff = dumbdiff; + } + } + + if (key.length === 0) { + if (my_diff.length > 1) { + my_diff = this.sortStanzas(my_diff); + } + } + return my_diff; + }, + + // ========================================================================= + + isStrictlyEqual: function(left, right) { + /* Recursively compare the (potentially nested) objects left + * and right */ + var idx, ks, key; + if (this.isTerminal(left) && this.isTerminal(right)) { + return (left === right); + } + if (this.isTerminal(left) || this.isTerminal(right)) { + return false; + } + if (left instanceof Array && right instanceof Array) { + if (left.length !== right.length) { + return false; + } + for (idx = 0; idx < left.length; idx++) { + if (! this.isStrictlyEqual(left[idx], right[idx])) { + return false; + } + } + return true; + } + if (left instanceof Array || right instanceof Array) { + return false; + } + ks = this.computeKeysets(left, right); + if (ks[1].length !== 0 || ks[2].length !== 0) { + return false; + } + for (idx = 0; idx < ks[0].length; idx++) { + key = ks[0][idx]; + if (! this.isStrictlyEqual(left[key], right[key])) { + return false; + } + } + return true; + }, + + isTerminal: function(obj) { + /* Test whether obj will be a terminal node in the tree when + * serialized as JSON. */ + if (typeof obj === 'string' || typeof obj === 'number' || + typeof obj === 'boolean' || obj === null) { + return true; + } + return false; + }, + + appendKey: function(stanzas, arr, key) { + /* Get the appropriate key for appending to the array arr, + * assuming that stanzas will also be applied, and arr appears + * at key within the overall structure. */ + key = key !== undefined ? key : []; + var addition_key = arr.length, prior_key, i; + for (i = 0; i < stanzas.length; i++) { + prior_key = stanzas[i][0]; + if (stanzas[i].length > 1 && + prior_key.length === key.length + 1 && + prior_key[prior_key.length-1] >= addition_key) + { addition_key = prior_key[prior_key.length-1] + 1; } + } + return addition_key; + }, + + loopOver: function(obj, callback) { + /* Helper function for looping over obj. Does the Right Thing + * whether obj is an array or not. */ + var i, key; + if (obj instanceof Array) { + for (i = 0; i < obj.length; i++) { + callback(obj, i); + } + } else { + for (key in obj) { + if (obj.hasOwnProperty(key)) { + callback(obj, key); + } + } + } + }, + + inArray: function(keypath) { + var terminal = keypath[keypath.length - 1]; + return (typeof terminal === 'number') + }, + + inObject: function(keypath) { + var terminal = keypath[keypath.length - 1]; + return (typeof terminal === 'string') + }, + + splitDiff: function(diff) { + /* Split the stanzas in diff into an array of three arrays: + * [modifications, deletions, insertions]. */ + var idx, objs = [], mods = [], dels = [], inss = []; + var dests = {3: inss, 1: dels}, stanza, keypath; + if (diff.length === 0) {return [[], diff];} + for (idx = 0; idx < diff.length; idx++) { + stanza = diff[idx] + if (stanza.length === 2) { + if (this.inObject(stanza[0])) { + objs.push(stanza); + } else { + mods.push(stanza); + } + } else { + dests[stanza.length].push(stanza) + } + } + return [objs, mods, dels, inss]; + }, + + stableKeypathLengthSort: function(stanzas) { + var comparator = function (a, b) { + var swap; + if (a[0].length === b[0].length) { + return a[0][0] - b[0][0]; + } + return b[0].length - a[0].length; + } + for (var i = 0; i < stanzas.length; i++) { + stanzas[i][0].unshift(i) + } + stanzas.sort(comparator) + for (i = 0; i < stanzas.length; i++) { + stanzas[i][0].shift() + } + return stanzas + }, + + keypathCompare: function(a, b) { + a = a[0]; b = b[0]; + if (a.length !== b.length) { + return a.length - b.length; + } + for (var i = 0; i < a.length; i++) { + if (typeof a[i] === 'number' && a[i] !== b[i]) { + return a[i] - b[i]; + } + } + return 0; + }, + + keypathCompareReverse: function(a, b) { + a = a[0]; b = b[0]; + if (a.length !== b.length) { + return b.length - a.length; + } + for (var i = 0; i < a.length; i++) { + if (typeof a[i] === 'number' && a[i] !== b[i]) { + return b[i] - a[i]; + } + } + return 0; + }, + + sortStanzas: function(diff) { + /* Sorts the stanzas in a diff: object changes can occur in + * any order, but deletions from arrays have to happen last + * node first: ['foo', 'bar', 'baz'] -> ['foo', 'bar'] -> + * ['foo'] -> []; additions to sequences have to happen + * leftmost-node-first: [] -> ['foo'] -> ['foo', 'bar'] -> + * ['foo', 'bar', 'baz'], and insert-and-shift alterations to + * arrays must happen last. */ + + // First we divide the stanzas using splitDiff(): + var split_thing = this.splitDiff(diff); + // Then we sort modifications of arrays in ascending order of keypath + // (note that we can?t tell appends from mods on the info available): + split_thing[1].sort(this.keypathCompare); + // Deletions from arrays in descending order of keypath: + split_thing[2].sort(this.keypathCompareReverse); + // And insert-and-shifts in ascending order of keypath: + split_thing[3].sort(this.keypathCompare) + diff = split_thing[0].concat( + split_thing[1], split_thing[2], split_thing[3] + ); + // Finally, we sort by length of keypath: + diff = this.stableKeypathLengthSort(diff, true) + return diff + }, + + computeKeysets: function(left, right) { + /* Returns an array of three arrays (overlap, left_only, + * right_only), representing the properties common to left and + * right, only defined for left, and only defined for right, + * respectively. */ + var overlap = [], left_only = [], right_only = []; + var target = overlap; + + this.loopOver(left, function(obj, key) { + if (right[key] !== undefined) { + target = overlap; + } + else { + target = left_only; + } + target.push(key); + }); + this.loopOver(right, function(obj, key) { + if (left[key] === undefined) { + right_only.push(key); + } + }); + return [overlap, left_only, right_only]; + }, + + structureWorthInvestigating: function(left, right) { + /* Test whether it is worth looking at the internal structure + * of `left` and `right` to see if they can be efficiently + * diffed. */ + if (this.isTerminal(left) || this.isTerminal(right)) { + return false; + } + if ((left.length === 0) || (right.length === 0)) { + return false; + } + if ((left instanceof Array) && (right instanceof Array)) { + return true; + } + if ((left instanceof Array) || (right instanceof Array)) { + return false; + } + if ((typeof left === 'object') && (typeof right === 'object')) { + return true; + } + return false; + }, + + commonality: function(left, right) { + /* Calculate the amount that the structures left and right + * have in common */ + var com = 0, tot = 0; + var elem, keysets, o, l, r, idx; + if (this.isTerminal(left) || this.isTerminal(right)) { + return 0; + } + + if ((left instanceof Array) && (right instanceof Array)) { + for (idx = 0; idx < left.length; idx++) { + elem = left[idx]; + if (right.indexOf(elem) !== -1) { + com++; + } + } + tot = Math.max(left.length, right.length); + } + else { + if ((left instanceof Array) || (right instanceof Array)) { + return 0; + } + keysets = this.computeKeysets(left, right); + o = keysets[0]; l = keysets[1]; r = keysets[2]; + com = o.length; + tot = o.length + l.length + r.length; + for (idx = 0; idx < r.length; idx++) { + elem = r[idx]; + if (l.indexOf(elem) === -1) { + tot++; + } + } + } + if (tot === 0) {return 0;} + return com / tot; + }, + + thisLevelDiff: function(left, right, key, common) { + /* Returns a sequence of diff stanzas between the objects left + * and right, assuming that they are each at the position key + * within the overall structure. */ + var out = [], idx, okey; + key = key !== undefined ? key : []; + + if (common === undefined) { + common = this.commonality(left, right); + } + + if (common) { + var ks = this.computeKeysets(left, right); + for (idx = 0; idx < ks[0].length; idx++) { + okey = ks[0][idx]; + if (left[okey] !== right[okey]) { + out.push([key.concat([okey]), right[okey]]); + } + } + for (idx = 0; idx < ks[1].length; idx++) { + okey = ks[1][idx]; + out.push([key.concat([okey])]); + } + for (idx = 0; idx < ks[2].length; idx++) { + okey = ks[2][idx]; + out.push([key.concat([okey]), right[okey]]); + } + return out; + } + if (! this.isStrictlyEqual(left, right)) { + return [[key, right]]; + } + return []; + }, + + keysetDiff: function(left, right, minimal, key) { + /* Compute a diff between left and right, without treating + * arrays differently from objects. */ + minimal = minimal !== undefined ? minimal : true; + var out = [], k; + var ks = this.computeKeysets(left, right); + for (k = 0; k < ks[1].length; k++) { + out.push([key.concat(ks[1][k])]); + } + for (k = 0; k < ks[2].length; k++) { + out.push([key.concat(ks[2][k]), right[ks[2][k]]]); + } + for (k = 0; k < ks[0].length; k++) { + out = out.concat(this.diff(left[ks[0][k]], right[ks[0][k]], + minimal, key.concat([ks[0][k]]))); + } + return out; + }, + + needleDiff: function(left, right, minimal, key) { + /* Compute a diff between left and right. If both are arrays, + * a variant of Needleman-Wunsch sequence alignment is used to + * make the diff minimal (at a significant cost in both + * storage and processing). Otherwise, the parms are passed on + * to keysetDiff.*/ + if (! (left instanceof Array && right instanceof Array)) { + return this.keysetDiff(left, right, minimal, key); + } + minimal = minimal !== undefined ? minimal : true; + var down_col = 0, lastrow = [], i, sub_i, left_i, right_i, col_i; + var row, first_left_i, left_elem, right_elem; + var cand_length, win_length, cand, winner; + + var modify_cand = function () { + if (col_i + 1 < lastrow.length) { + return lastrow[col_i+1].concat( + JSON_delta.diff(left_elem, right_elem, + minimal, key.concat([left_i])) + ); + } + }; + + var delete_cand = function () { + if (row.length > 0) { + return row[0].concat([[key.concat([left_i])]]); + } + }; + + var append_cand = function () { + if (col_i === down_col) { + return lastrow[col_i].concat( + [[key.concat([JSON_delta.appendKey(lastrow[col_i], left, key)]), + right_elem]] + ); + } + }; + + var insert_cand = function () { + if (col_i !== down_col) { + return lastrow[col_i].concat( + [[key.concat([right_i]), right_elem, "i"]] + ); + } + }; + + var cand_funcs = [modify_cand, delete_cand, append_cand, insert_cand]; + + for (i = 0; i <= left.length; i++) { + lastrow.unshift([]); + for (sub_i = 0; sub_i < i; sub_i++) { + lastrow[0].push([key.concat([sub_i])]); + } + } + + for (right_i = 0; right_i < right.length; right_i++) { + right_elem = right[right_i]; + row = [] + for (left_i = 0; left_i < left.length; left_i++) { + left_elem = left[left_i]; + col_i = left.length - left_i - 1; + win_length = Infinity; + for (i = 0; i < cand_funcs.length; i++) { + cand = cand_funcs[i](); + if (cand !== undefined) { + cand_length = JSON.stringify(cand).length; + if (cand_length < win_length) { + winner = cand; + win_length = cand_length; + } + } + } + row.unshift(winner); + } + lastrow = row; + } + return winner; + }, + + patchStanza: function(struc, diff) { + /* Applies the diff stanza diff to the structure struc. + Returns the modified structure. */ + var key = diff[0]; + switch (key.length) { + case 0: + struc = diff[1]; + break; + case 1: + if (diff.length === 1) { + if (struc.splice === undefined) { + delete struc[key[0]]; + } + else { + struc.splice(key[0], 1); + } + } else if (diff.length === 3) { + if (struc.splice === undefined) { + struc[key[0]] = diff[1]; + } else { + struc.splice(key[0], 0, diff[1]); + } + } + else { + struc[key[0]] = diff[1]; + } + break; + default: + var pass_key = key.slice(1), pass_struc = struc[key[0]]; + var pass_diff = [pass_key].concat(diff.slice(1)); + if (pass_struc === undefined) { + if (typeof pass_key[0] === 'string') { + pass_struc = {}; + } else { + pass_struc = []; + } + } + struc[key[0]] = this.patchStanza(pass_struc, pass_diff); + } + return struc; + } +}; diff --git a/requirements.txt b/requirements.txt index 9e4a1d1..77a8f77 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,3 +4,4 @@ gevent requests stups-tokens>=1.1.19 redlock-py +json_delta>=2.0