-
Notifications
You must be signed in to change notification settings - Fork 303
/
status.go
242 lines (206 loc) · 7.41 KB
/
status.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2e
import (
"fmt"
"time"
"github.com/golang/glog"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
informerv1 "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/tools/cache"
)
// IngressStability denotes the stabilization status of a single sandboxed
// Ingress, as stored under that Ingress's key by StatusManager.putStatus.
type IngressStability string

// These are constants rather than variables so that the set of valid
// statuses cannot be reassigned at runtime.
const (
	// Stable indicates an Ingress is stable (i.e. consistently serving 200s).
	Stable IngressStability = "Stable"
	// Unstable indicates an Ingress is unstable (i.e. serving 404s/502s).
	Unstable IngressStability = "Unstable"
)
const (
	// configMapName is the name of the ConfigMap (in the "default" namespace)
	// through which StatusManager exchanges state with the external framework.
	configMapName = "status-cm"
	// cmPollInterval is passed to NewConfigMapInformer as the informer's
	// resync period.
	cmPollInterval = 30 * time.Second
	// flushInterval is how often the local ConfigMap copy is flushed to the
	// API server.
	flushInterval = 30 * time.Second
	// exitKey is the key used to indicate to the status manager
	// whether to gracefully finish the e2e test execution.
	// Value associated with it is a timestamp string.
	exitKey = "exit"
	// masterUpgradingKey is the key used to indicate to the status manager that
	// the k8s master is in the process of upgrading.
	// Value associated with it is a timestamp string.
	masterUpgradingKey = "master-upgrading"
	// masterUpgradedKey is the key used to indicate to the status manager that
	// the k8s master has successfully finished upgrading.
	// Value associated with it is a timestamp string.
	masterUpgradedKey = "master-upgraded"
)
// StatusManager manages the status of sandboxed Ingresses via a ConfigMap.
// It interacts with an external test framework as follows:
//  1. StatusManager initializes and creates the ConfigMap status-cm. It
//     listens for updates via an informer.
//  2. The e2e test calls StatusManager.putStatus with the Ingress name as key
//     and Unstable as the status.
//  3. The e2e test watches for the Ingress to stabilize, then uses
//     StatusManager to update the Ingress's status to Stable.
//  4. The external framework reads ConfigMap status-cm. When it detects that
//     all Ingresses are stable (i.e., no value in the map is Unstable), it
//     starts the master upgrade and writes a timestamp to the
//     master-upgrading key.
//  5. When the k8s master finishes upgrading, the external framework writes
//     a timestamp to the master-upgraded key in the ConfigMap.
//  6. The external framework writes the exit key in the ConfigMap to indicate
//     that the e2e test can exit.
//  7. The ConfigMap informer's update handler observes the exit key and
//     calls Framework.shutdown.
type StatusManager struct {
	// cm is the local copy of the status ConfigMap. putStatus, flush and the
	// master* helpers access cm.Data while holding f.lock.
	cm *v1.ConfigMap
	// f supplies the clientset, the shared lock and the shutdown hook.
	f *Framework
	// informerCh is handed to the current ConfigMap informer's Run as its
	// stop channel; stopInformer closes it.
	informerCh chan struct{}
	// informerRunning is set by startInformer and cleared by stopInformer.
	informerRunning bool
}
// NewStatusManager returns a StatusManager bound to f. Its ConfigMap is only
// described locally (named configMapName); it is created through the API
// when init is called.
func NewStatusManager(f *Framework) *StatusManager {
	cm := &v1.ConfigMap{}
	cm.Name = configMapName
	return &StatusManager{f: f, cm: cm}
}
func (sm *StatusManager) init() error {
var err error
sm.cm, err = sm.f.Clientset.Core().ConfigMaps("default").Create(sm.cm)
if err != nil {
return fmt.Errorf("error creating ConfigMap: %v", err)
}
go func() {
for _ = range time.NewTicker(flushInterval).C {
sm.flush()
}
}()
sm.startInformer()
return nil
}
// startInformer creates a fresh stop channel and runs a ConfigMap informer on
// the "default" namespace, with cmPollInterval as its resync period.
//
// The informer's update handler does two things:
//   - It records a non-empty master-upgrading timestamp via setMasterUpgrading.
//   - Once the exit key is present, it stops this informer and calls
//     sm.f.shutdown(0).
//
// startInformer writes informerCh and informerRunning without locking;
// flush calls it while holding sm.f.lock.
func (sm *StatusManager) startInformer() {
	// Capture this informer's own stop channel so the handler never touches
	// a channel that belongs to a later, restarted informer.
	stopCh := make(chan struct{})
	sm.informerCh = stopCh
	indexers := cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}
	cmInformer := informerv1.NewConfigMapInformer(sm.f.Clientset, "default", cmPollInterval, indexers)
	cmInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		UpdateFunc: func(old, cur interface{}) {
			curCm := cur.(*v1.ConfigMap)
			upgradeTs := curCm.Data[masterUpgradingKey]
			// We need to pick up the master-upgrading flag here, because otherwise
			// if the k8s master is in the process of upgrading we can't access
			// the ConfigMap.
			if len(upgradeTs) > 0 {
				glog.V(2).Infof("Master upgrade began at %v", upgradeTs)
				sm.setMasterUpgrading(upgradeTs)
			}
			if exitTs := curCm.Data[exitKey]; len(exitTs) > 0 {
				glog.V(2).Infof("ConfigMap was updated with exit switch at %s", exitTs)
				// Stop through stopInformer under the lock so informerRunning
				// stays accurate and the channel is closed at most once.
				// Closing it directly here would panic if flush or a later
				// update event closed it again.
				sm.f.lock.Lock()
				if sm.informerRunning && sm.informerCh == stopCh {
					sm.stopInformer()
				}
				sm.f.lock.Unlock()
				// Called without holding the lock.
				sm.f.shutdown(0)
			}
		},
	})
	glog.V(2).Info("Started ConfigMap informer")
	sm.informerRunning = true
	go cmInformer.Run(stopCh)
}
// stopInformer closes the current informer's stop channel and marks the
// informer as not running. Closing an already-closed channel panics, so
// this must run at most once per startInformer.
func (sm *StatusManager) stopInformer() {
	stopCh := sm.informerCh
	glog.V(2).Info("Stopped ConfigMap informer")
	sm.informerRunning = false
	close(stopCh)
}
// shutdown logs the final local ConfigMap contents (at V(3)) and deletes the
// status ConfigMap from the "default" namespace. A failed delete is logged,
// not returned.
func (sm *StatusManager) shutdown() {
	glog.V(2).Infof("Shutting down status manager.")
	glog.V(3).Infof("ConfigMap: %+v", sm.cm.Data)
	configMaps := sm.f.Clientset.Core().ConfigMaps("default")
	err := configMaps.Delete(configMapName, &metav1.DeleteOptions{})
	if err == nil {
		return
	}
	glog.Errorf("Error deleting ConfigMap: %v", err)
}
// putStatus records status under key in the local ConfigMap copy, holding
// sm.f.lock. flush later copies it into the ConfigMap it writes back.
func (sm *StatusManager) putStatus(key string, status IngressStability) {
	sm.f.lock.Lock()
	defer sm.f.lock.Unlock()
	data := sm.cm.Data
	if data == nil {
		data = map[string]string{}
		sm.cm.Data = data
	}
	data[key] = string(status)
}
// setMasterUpgrading stores ts under masterUpgradingKey in the local
// ConfigMap copy, holding sm.f.lock. flush never copies this key from the
// local copy to the server; it only consults it locally.
func (sm *StatusManager) setMasterUpgrading(ts string) {
	sm.f.lock.Lock()
	defer sm.f.lock.Unlock()
	data := sm.cm.Data
	if data == nil {
		data = map[string]string{}
		sm.cm.Data = data
	}
	data[masterUpgradingKey] = ts
}
// masterUpgrading reports whether the local ConfigMap copy holds a non-empty
// master-upgrading timestamp.
func (sm *StatusManager) masterUpgrading() bool {
	sm.f.lock.Lock()
	defer sm.f.lock.Unlock()
	upgradingTs := sm.cm.Data[masterUpgradingKey]
	return upgradingTs != ""
}
// masterUpgraded reports whether the local ConfigMap copy holds a non-empty
// master-upgraded timestamp.
func (sm *StatusManager) masterUpgraded() bool {
	sm.f.lock.Lock()
	defer sm.f.lock.Unlock()
	upgradedTs := sm.cm.Data[masterUpgradedKey]
	return upgradedTs != ""
}
// flush reconciles the local ConfigMap copy with the one stored in k8s.
// While holding sm.f.lock it, in order:
//   - stops the informer if it is running and a master-upgrading timestamp
//     is recorded;
//   - restarts the informer if it is stopped and a master-upgraded timestamp
//     is recorded;
//   - fetches the current ConfigMap, overlays every local entry except the
//     master-upgrading/master-upgraded flags (those come from the external
//     framework), and writes the result back.
//
// Errors are logged, never returned.
//
// NOTE(review): if the external framework leaves master-upgrading set after
// writing master-upgraded, every flush stops and immediately restarts the
// informer. Confirm whether that is intended.
func (sm *StatusManager) flush() {
	sm.f.lock.Lock()
	defer sm.f.lock.Unlock()

	glog.V(3).Infof("Attempting to flush %v", sm.cm.Data)

	// sm.cm is not modified below until it is replaced with the fetched copy,
	// and every writer of sm.cm.Data holds sm.f.lock, so this value stays
	// valid up to that point.
	upgrading := len(sm.cm.Data[masterUpgradingKey]) > 0

	// The master is upgrading: the informer would only see errors.
	if sm.informerRunning && upgrading {
		sm.stopInformer()
	}
	// Bring the informer back once the upgrade has finished.
	if upgradedTs := sm.cm.Data[masterUpgradedKey]; !sm.informerRunning && len(upgradedTs) > 0 {
		glog.V(2).Infof("Master has successfully upgraded at %s", upgradedTs)
		sm.startInformer()
	}

	// k8s treats its own copy as authoritative, so start from the server's
	// version rather than ours.
	configMaps := sm.f.Clientset.Core().ConfigMaps("default")
	latest, err := configMaps.Get(configMapName, metav1.GetOptions{})
	if err != nil {
		// On error the returned ConfigMap is empty; bail out so local data
		// is not overwritten. "Connection refused" is expected while the
		// master upgrades, so only warn otherwise.
		if !upgrading {
			glog.Warningf("Error getting ConfigMap: %v", err)
		}
		return
	}
	if latest.Data == nil {
		latest.Data = make(map[string]string)
	}
	for k, v := range sm.cm.Data {
		switch k {
		case masterUpgradedKey, masterUpgradingKey:
			// Keep the server's value for the framework-owned flags.
		default:
			latest.Data[k] = v
		}
	}
	sm.cm = latest
	sm.cm.Name = configMapName

	if _, err := configMaps.Update(sm.cm); err != nil {
		glog.Warningf("Error updating ConfigMap: %v", err)
		return
	}
	glog.V(3).Infof("Flushed statuses %v to ConfigMap", sm.cm.Data)
}