Refactors sockets and moves reconnect logic and error handling into it. #7459

Closed
20 changes: 18 additions & 2 deletions docusaurus/docs/code-base-works/api-resources-and-schemas.md
@@ -216,13 +216,29 @@ If you already called `findAll` for a resource, then do `findMatching`, no addit

The `unsubscribe` function is used to unwatch resources. However, as a general rule, resources are never unwatched because Rancher assumes that any data you have already loaded needs to be kept up-to-date.

The code related to watching resources is in `plugins/steve/subscribe.js`.
The code related to watching resources is in `shell/plugins/steve/subscribe.js` and `shell/utils/resourceWatcher.js`.

Code found in `subscribe.js` primarily acts as an interface so that Vue can dispatch actions, retrieve information via getters, and store information related to the socket object via Vuex. Other websocket-related functionality found in `subscribe.js` includes:
* Creating the actual socket object and storing it in the appropriate Vuex store.
* Destroying a websocket that is no longer needed (when a store is being destroyed).
* Queuing messages to the websocket if the websocket is currently unable to send them.

To avoid excessive rerendering, the messages that change state, such as `resource.{create, change, remove}`, are saved in a buffer in the store. Once per second they are all flushed together to update the store.
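
The buffering described above can be sketched roughly as follows. This is a minimal illustration and not the code in `subscribe.js`: the `MessageBuffer` class, its `applyBatch` callback and the timer handling are all hypothetical names chosen for the example.

```javascript
// Hypothetical sketch: collect state-changing messages and apply them in one batch.
class MessageBuffer {
  constructor(applyBatch, intervalMs = 1000) {
    this.applyBatch = applyBatch; // callback that updates the store once per batch
    this.intervalMs = intervalMs; // flush interval, one second by default
    this.queue = [];
    this.timer = null;
  }

  // Queue a message and schedule a flush if one is not already pending
  push(msg) {
    this.queue.push(msg);
    if (this.timer === null) {
      this.timer = setTimeout(() => this.flush(), this.intervalMs);
    }
  }

  // Apply everything queued so far in a single call; returns the batch size
  flush() {
    if (this.timer !== null) {
      clearTimeout(this.timer);
      this.timer = null;
    }
    const batch = this.queue;

    this.queue = [];
    if (batch.length > 0) {
      this.applyBatch(batch);
    }

    return batch.length;
  }
}

// Usage: two messages result in a single store update
const demoBatches = [];
const demoBuffer = new MessageBuffer(batch => demoBatches.push(batch));

demoBuffer.push({ name: 'resource.create', id: 'a' });
demoBuffer.push({ name: 'resource.change', id: 'a' });
demoBuffer.flush(); // flushed manually here instead of waiting for the timer
```

Batching this way means consumers of the store see one update per interval rather than one per websocket message.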

Code found in `resourceWatcher.js` is primarily geared towards the websocket itself and the resource watches it maintains, and can be used without Vuex depending on the use case. The socket class exported from here is responsible for:
* Connecting the created websocket to the appropriate API and maintaining the data needed to reconnect if required.
* Watching a resource if a watch for it does not already exist, maintaining and/or refreshing the resource collection's resourceVersion as needed.
* Unwatching a resource if a watch for it exists.
* Maintaining sufficient information for each watch such that the watch can be recreated should it enter an error state or stop.
* Maintaining sufficient information for each watch to provide an interface through which other code can retrieve a particular watch's status.
* Processing messages from the websocket, executing any code required to maintain existing watches, and then firing an event that can be picked up outside of the socket.
* Maintaining the connection status of the websocket itself.
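
As a rough illustration of the watch bookkeeping listed above, the sketch below tracks watches by key without opening a real websocket. It is not the class exported from `resourceWatcher.js`: `WatchRegistry`, `keyFor`, `statusOf`, the status strings and the `stop: true` frame field are assumptions made for the example.

```javascript
// Hypothetical sketch only; not the actual implementation in resourceWatcher.js.

// Build a key from the watch parameters that are present
const keyFor = ({ resourceType, namespace, id, selector } = {}) => {
  return [resourceType, namespace, id, selector].filter(Boolean).join('/');
};

class WatchRegistry {
  constructor(send) {
    this.send = send;  // function that writes a frame to the websocket
    this.watches = {}; // watch key -> watch params plus status and error
  }

  // Start a watch unless an equivalent one already exists
  watch(params) {
    const key = keyFor(params);

    if (this.watches[key]) {
      return false;
    }
    this.watches[key] = { ...params, status: 'requested', error: null };
    this.send({ ...params });

    return true;
  }

  // Stop a watch if it exists and forget it
  unwatch(key) {
    const existing = this.watches[key];

    if (!existing) {
      return false;
    }
    const { resourceType, namespace, id, selector } = existing;

    this.send({ resourceType, namespace, id, selector, stop: true });
    delete this.watches[key];

    return true;
  }

  // Record a server message against the watch it belongs to
  handleMessage(msg) {
    const watch = this.watches[keyFor(msg)];

    if (!watch) {
      return;
    }
    if (msg.name === 'resource.start') {
      watch.status = 'started';
    } else if (msg.name === 'resource.error') {
      watch.status = 'error';
      watch.error = msg.data?.error || 'unknown';
    } else if (msg.name === 'resource.stop') {
      watch.status = 'stopped';
    }
  }

  // Let other code ask about a particular watch's status
  statusOf(params) {
    return this.watches[keyFor(params)]?.status ?? null;
  }
}
```

Keeping the original watch parameters alongside the status is what would allow a stopped or errored watch to be recreated later.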


### Pinging

The UI normally has three websocket connections with `rancher` (Steve's global cluster management), `management` (Norman) and `cluster` (Steve for an individual cluster). The UI is pinged by Steve every five seconds and by Norman every thirty seconds. Steve's ping messages include the version of the server that sent them; if the server has been upgraded, this triggers another action that reloads the page.
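
The version check on ping can be pictured with the sketch below. It is only an illustration under assumptions: the `createVersionTracker` helper, the `onUpgrade` callback and the `data.version` message shape are hypothetical and are not taken from the Rancher code.

```javascript
// Hypothetical sketch: detect a server upgrade from the version carried by ping messages.
function createVersionTracker(onUpgrade) {
  let knownVersion = null; // version seen on the first ping

  return function handlePing(msg) {
    const version = msg?.data?.version; // assumed message shape

    if (!version) {
      return false; // nothing to compare against
    }
    if (knownVersion === null) {
      knownVersion = version; // remember the first version we see

      return false;
    }
    if (version !== knownVersion) {
      knownVersion = version;
      onUpgrade(version); // in a browser this could trigger a page reload

      return true;
    }

    return false;
  };
}
```

The first ping only records the version, so a reload would be triggered only when a later ping reports a different one.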

To avoid excessive rerendering, the messages that change state, such as `resource.{create, change, remove}`, are saved in a buffer. Once per second they are all flushed together to update the store.

## Resource Selectors

5 changes: 1 addition & 4 deletions shell/plugins/dashboard-store/actions.js
@@ -522,10 +522,7 @@ export default {
};

if (getters['schemaFor'](type) && getters['watchStarted'](obj)) {
// Set that we don't want to watch this type
// Otherwise, the dispatch to unwatch below will just cause a re-watch when we
// detect the stop message from the backend over the web socket
commit('setWatchStopped', obj);
// the socket's send method called in the "watch" action here will pick up the "stop" in obj and automatically unwatch the resource
dispatch('watch', obj); // Ask the backend to stop watching the type
// Make sure anything in the pending queue for the type is removed, since we've now removed the type
commit('clearFromQueue', type);
1 change: 0 additions & 1 deletion shell/plugins/steve/index.js
@@ -25,7 +25,6 @@ export function SteveFactory(namespace, baseUrl) {
pendingFrames: [],
deferredRequests: {},
started: [],
inError: {},
podsByNamespace: {}, // Cache of pods by namespace
};
},
13 changes: 9 additions & 4 deletions shell/plugins/steve/mutations.js
@@ -1,13 +1,12 @@
import { addObject, removeObject } from '@shell/utils/array';
import { NAMESPACE, POD, SCHEMA } from '@shell/config/types';
import {
forgetType,
forgetType as dashboardStoreForgetAll,
resetStore,
loadAll,
load,
remove
} from '@shell/plugins/dashboard-store/mutations';
import { keyForSubscribe } from '@shell/plugins/steve/subscribe';
import { perfLoadAll } from '@shell/plugins/steve/performanceTesting';
import Vue from 'vue';

@@ -70,8 +69,14 @@ export default {
},

forgetType(state, type) {
if ( forgetType(state, type) ) {
delete state.inError[keyForSubscribe({ type })];
if ( dashboardStoreForgetAll(state, type) ) {
Object.keys(state?.socket?.watches || {})
.filter((watchKey) => {
return state.socket.watches[watchKey].resourceType === type;
})
.forEach((watchKey) => {
state.socket.unwatch(watchKey);
});
}
},

224 changes: 23 additions & 201 deletions shell/plugins/steve/subscribe.js
@@ -1,8 +1,9 @@
import { addObject, clear, removeObject } from '@shell/utils/array';
import { clear, removeObject } from '@shell/utils/array';
import { get } from '@shell/utils/object';
import { COUNT, SCHEMA } from '@shell/config/types';
import { getPerformanceSetting } from '@shell/utils/settings';
import Socket, {
import ResourceWatcher from '@shell/utils/resourceWatcher';
import {
EVENT_CONNECTED,
EVENT_DISCONNECTED,
EVENT_MESSAGE,
@@ -18,13 +19,9 @@ import { escapeHtml } from '@shell/utils/string';
// eslint-disable-next-line
import webworker from './web-worker.steve-sub-worker.js';

export const NO_WATCH = 'NO_WATCH';
export const NO_SCHEMA = 'NO_SCHEMA';

// minimum length of time a disconnect notification is shown
const MINIMUM_TIME_NOTIFIED = 3000;

// We only create a worker for the cluster store
export function createWorker(store, ctx) {
const { getters } = ctx;
const storeName = getters.storeName;
@@ -68,11 +65,16 @@ export function createWorker(store, ctx) {
export function keyForSubscribe({
resourceType, type, namespace, id, selector
} = {}) {
return `${ resourceType || type || '' }/${ namespace || '' }/${ id || '' }/${ selector || '' }`;
return [(resourceType || type), namespace, id, selector] // each watch param in an array
.filter(param => !!param) // filter out all the empty ones
.join('/'); // join into a string so we can use it as an object key
}

export function equivalentWatch(a, b) {
if ( a.type !== b.type ) {
const aresourceType = a.resourceType || a.type;
const bresourceType = b.resourceType || b.type;

if ( aresourceType !== bresourceType ) {
return false;
}

@@ -164,7 +166,7 @@ export const actions = {
} else {
const maxTries = growlsDisabled(rootGetters) ? null : 3;

socket = new Socket(`${ state.config.baseUrl }/subscribe`, true, null, null, maxTries);
socket = new ResourceWatcher(`${ state.config.baseUrl }/subscribe`, true, null, null, maxTries);

commit('setSocket', socket);
socket.addEventListener(EVENT_CONNECTED, (e) => {
@@ -297,10 +299,6 @@ export const actions = {
return;
}

if ( typeof revision === 'undefined' ) {
revision = getters.nextResourceVersion(type, id);
}

const msg = { resourceType: type };

if ( revision ) {
@@ -326,85 +324,7 @@ export const actions = {
return dispatch('send', msg);
},

reconnectWatches({
state, getters, commit, dispatch
}) {
const promises = [];

for ( const entry of state.started.slice() ) {
console.info(`Reconnect [${ getters.storeName }]`, JSON.stringify(entry)); // eslint-disable-line no-console

if ( getters.schemaFor(entry.type) ) {
commit('setWatchStopped', entry);
delete entry.revision;
promises.push(dispatch('watch', entry));
}
}

return Promise.all(promises);
},

async resyncWatch({
state, getters, dispatch, commit
}, params) {
const {
resourceType, namespace, id, selector
} = params;

console.info(`Resync [${ getters.storeName }]`, params); // eslint-disable-line no-console

const opt = { force: true, forceWatch: true };

if ( id ) {
await dispatch('find', {
type: resourceType,
id,
opt,
});
commit('clearInError', params);

return;
}

let have, want;

if ( selector ) {
have = getters['matching'](resourceType, selector).slice();
want = await dispatch('findMatching', {
type: resourceType,
selector,
opt,
});
} else {
have = getters['all'](resourceType).slice();

if ( namespace ) {
have = have.filter(x => x.metadata?.namespace === namespace);
}

want = await dispatch('findAll', {
type: resourceType,
watchNamespace: namespace,
opt
});
}

const wantMap = {};

for ( const obj of want ) {
wantMap[obj.id] = true;
}

for ( const obj of have ) {
if ( !wantMap[obj.id] ) {
state.debugSocket && console.info(`Remove stale [${ getters.storeName }]`, resourceType, obj.id); // eslint-disable-line no-console

commit('remove', obj);
}
}
},

async opened({
opened({
commit, dispatch, state, getters, rootGetters
}, event) {
state.debugSocket && console.info(`WebSocket Opened [${ getters.storeName }]`); // eslint-disable-line no-console
@@ -432,7 +352,6 @@ export const actions = {
}

if ( socket.hasReconnected ) {
await dispatch('reconnectWatches');
// Check for disconnect notifications and clear them
const growlErr = rootGetters['growl/find']({ key: 'url', val: socket.url });

@@ -546,50 +465,14 @@ export const actions = {

'ws.resource.start'({ state, getters, commit }, msg) {
state.debugSocket && console.info(`Resource start: [${ getters.storeName }]`, msg); // eslint-disable-line no-console
commit('setWatchStarted', {
type: msg.resourceType,
namespace: msg.namespace,
id: msg.id,
selector: msg.selector
});
},

'ws.resource.error'({ getters, commit, dispatch }, msg) {
'ws.resource.error'({ getters }, msg) {
console.warn(`Resource error [${ getters.storeName }]`, msg.resourceType, ':', msg.data.error); // eslint-disable-line no-console

const err = msg.data?.error?.toLowerCase();

if ( err.includes('watch not allowed') ) {
commit('setInError', { type: msg.resourceType, reason: NO_WATCH });
} else if ( err.includes('failed to find schema') ) {
commit('setInError', { type: msg.resourceType, reason: NO_SCHEMA });
} else if ( err.includes('too old') ) {
dispatch('resyncWatch', msg);
}
},

'ws.resource.stop'({ getters, commit, dispatch }, msg) {
const type = msg.resourceType;
const obj = {
type,
id: msg.id,
namespace: msg.namespace,
selector: msg.selector
};

// console.warn(`Resource stop: [${ getters.storeName }]`, msg); // eslint-disable-line no-console

if ( getters['schemaFor'](type) && getters['watchStarted'](obj) ) {
// Try reconnecting once

commit('setWatchStopped', obj);

setTimeout(() => {
// Delay a bit so that immediate start/error/stop causes
// only a slow infinite loop instead of a tight one.
dispatch('watch', obj);
}, 5000);
}
'ws.resource.stop'({ getters }, msg) {
console.warn(`Resource stop: [${ getters.storeName }]`, msg); // eslint-disable-line no-console
},

'ws.resource.create'(ctx, msg) {
@@ -694,44 +577,16 @@ export const mutations = {
removeObject(state.pendingFrames, obj);
},

setWatchStarted(state, obj) {
const existing = state.started.find(entry => equivalentWatch(obj, entry));

if ( !existing ) {
addObject(state.started, obj);
}

delete state.inError[keyForSubscribe(obj)];
},

setWatchStopped(state, obj) {
const existing = state.started.find(entry => equivalentWatch(obj, entry));

if ( existing ) {
removeObject(state.started, existing);
} else {
console.warn("Tried to remove a watch that doesn't exist", obj); // eslint-disable-line no-console
}
},

setInError(state, msg) {
const key = keyForSubscribe(msg);

state.inError[key] = msg.reason;
},

clearInError(state, msg) {
const key = keyForSubscribe(msg);

delete state.inError[key];
},

debug(state, on) {
state.debugSocket = on !== false;
},

resetSubscriptions(state) {
clear(state.started);
const watcheKeys = Object.keys(state.socket.watches);

Review comment (Member): This throws an exception when going from the Virtualization Management product to a cluster's explorer product.

watcheKeys.forEach((watchKey) => {
state.socket.unwatch(watchKey);
});
clear(state.pendingFrames);
clear(state.queue);
clearTimeout(state.queueTimer);
@@ -742,46 +597,13 @@ export const mutations = {

export const getters = {
canWatch: state => (obj) => {
return !state.inError[keyForSubscribe(obj)];
return !state?.socket?.[keyForSubscribe(obj)]?.error;
},

watchStarted: state => (obj) => {
return !!state.started.find(entry => equivalentWatch(obj, entry));
},

nextResourceVersion: (state, getters) => (type, id) => {
type = normalizeType(type);
let revision = 0;

if ( id ) {
const existing = getters['byId'](type, id);

revision = parseInt(existing?.metadata?.resourceVersion, 10);
}

if ( !revision ) {
const cache = state.types[type];

if ( !cache ) {
return null;
}

revision = cache.revision;

for ( const obj of cache.list ) {
if ( obj && obj.metadata ) {
const neu = parseInt(obj.metadata.resourceVersion, 10);

revision = Math.max(revision, neu);
}
}
}

if ( revision ) {
return revision;
}
const watches = Object.values(state.socket?.watches || {});

return null;
return !!watches.find(watch => equivalentWatch(obj, watch));
},

currentGeneration: state => (type) => {