-
Notifications
You must be signed in to change notification settings - Fork 335
/
dataplane_status_sink.go
156 lines (136 loc) · 5.55 KB
/
dataplane_status_sink.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package callbacks
import (
"time"
"github.com/golang/protobuf/proto"
mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
"github.com/kumahq/kuma/pkg/core"
core_mesh "github.com/kumahq/kuma/pkg/core/resources/apis/mesh"
"github.com/kumahq/kuma/pkg/core/resources/manager"
core_model "github.com/kumahq/kuma/pkg/core/resources/model"
"github.com/kumahq/kuma/pkg/core/resources/store"
)
// sinkLog is the logger for this file, derived from the core logger and
// named "xds" followed by "sink".
var sinkLog = core.Log.
	WithName("xds").
	WithName("sink")
type (
	// DataplaneInsightSink persists the discovery subscription status of a
	// single proxy.
	DataplaneInsightSink interface {
		// Start runs the sink. The implementation in this file blocks until
		// stop is closed.
		Start(stop <-chan struct{})
	}

	// DataplaneInsightStore is the persistence backend a DataplaneInsightSink
	// writes subscriptions to.
	DataplaneInsightStore interface {
		// Upsert creates or updates the subscription, storing it under the key
		// dataplaneID. dataplaneType is the resource type of the dataplane
		// proxy that subscribed.
		Upsert(dataplaneType core_model.ResourceType, dataplaneID core_model.ResourceKey, subscription *mesh_proto.DiscoverySubscription) error
	}
)
// NewDataplaneInsightSink builds a DataplaneInsightSink for proxies of
// dataplaneType.
//
// newTicker and forceUpdateTicker are factories that the sink calls when it
// starts. The first drives regular flushes. The second forces a write even
// when the status has not changed. flushBackoff is how long the sink pauses
// after each regular flush. Statuses are read from accessor and written to
// store.
func NewDataplaneInsightSink(
	dataplaneType core_model.ResourceType,
	accessor SubscriptionStatusAccessor,
	newTicker func() *time.Ticker,
	forceUpdateTicker func() *time.Ticker,
	flushBackoff time.Duration,
	store DataplaneInsightStore,
) DataplaneInsightSink {
	sink := &dataplaneInsightSink{
		dataplaneType: dataplaneType,
		accessor:      accessor,
		store:         store,
		flushBackoff:  flushBackoff,
	}
	// Ticker factories are wired separately: they are only invoked by Start.
	sink.flushTicker = newTicker
	sink.forceUpdateTicker = forceUpdateTicker
	return sink
}
// Compile-time check that *dataplaneInsightSink implements DataplaneInsightSink.
var _ DataplaneInsightSink = (*dataplaneInsightSink)(nil)

// dataplaneInsightSink is the DataplaneInsightSink returned by
// NewDataplaneInsightSink.
type dataplaneInsightSink struct {
	// Ticker factories, called once at the beginning of Start.
	flushTicker       func() *time.Ticker // drives regular flushes
	forceUpdateTicker func() *time.Ticker // forces a write even if the status is unchanged

	dataplaneType core_model.ResourceType    // resource type of the tracked proxy
	accessor      SubscriptionStatusAccessor // source of the current subscription status
	store         DataplaneInsightStore      // destination of flushed statuses
	flushBackoff  time.Duration              // pause after each regular flush
}
// Start runs the sink loop and blocks until stop is closed.
//
// On every tick of the flush ticker, Start reads the proxy's current
// subscription status from the accessor. It upserts that status via the store
// only when it differs from the last successfully stored one, or when a force
// update is pending. A force update is pending once the force-update ticker
// has fired and stays pending until a write succeeds. After each regular flush
// Start sleeps for flushBackoff. When stop is closed it performs one final
// flush and returns. Both tickers are stopped on return.
func (s *dataplaneInsightSink) Start(stop <-chan struct{}) {
	flushTicker := s.flushTicker()
	defer flushTicker.Stop()
	forceUpdateTicker := s.forceUpdateTicker()
	defer forceUpdateTicker.Stop()

	// lastStoredState is the subscription most recently written successfully.
	// Comparing against it avoids redundant writes.
	var lastStoredState *mesh_proto.DiscoverySubscription
	// forceUpdate is latched when the force-update ticker fires and is cleared
	// only after a successful write. If a forced write fails, it is retried on
	// the next flush tick instead of waiting for the force-update ticker to
	// fire again.
	forceUpdate := false

	flush := func(closing bool) {
		dataplaneID, currentState := s.accessor.GetStatus()

		// Non-blocking poll: latch a pending force-update tick, if any.
		select {
		case <-forceUpdateTicker.C:
			forceUpdate = true
		default:
		}

		if !forceUpdate && proto.Equal(currentState, lastStoredState) {
			return
		}

		err := s.store.Upsert(s.dataplaneType, dataplaneID, currentState)
		if err == nil {
			sinkLog.V(1).Info("DataplaneInsight saved", "dataplaneid", dataplaneID, "subscription", currentState)
			lastStoredState = currentState
			forceUpdate = false
			return
		}

		switch {
		case closing:
			// When the XDS stream is closed, the Dataplane Status Tracker executes
			// OnStreamClose, which closes the stop channel. close() does not wait for
			// this sink to finish its final work. Meanwhile the Dataplane Lifecycle
			// executes OnStreamClose, which can remove the Dataplane entity (and its
			// Insights, due to ownership). Either scenario can therefore happen:
			// 1) the upsert fails because it retrieved the DataplaneInsight but cannot
			//    Update it, because by then the Insight is gone (ResourceConflict error);
			// 2) the upsert fails because it tries to create a new Insight, but there is
			//    no Dataplane, so ownership returns an error.
			// We could build a synchronous mechanism that waits for the Sink to stop
			// before moving on to the next Callbacks, but that is potentially dangerous:
			// we could block waiting for storage instead of executing the next callbacks.
			sinkLog.V(1).Info("failed to flush Dataplane status on stream close. It can happen when Dataplane is deleted at the same time",
				"dataplaneid", dataplaneID,
				"err", err)
		case store.IsResourceConflict(err):
			sinkLog.V(1).Info("failed to flush DataplaneInsight because it was updated in other place. Will retry in the next tick",
				"dataplaneid", dataplaneID)
		case store.IsResourcePreconditionFailed(err):
			sinkLog.V(1).Info("failed to flush DataplaneInsight for unsupported resource",
				"dataplaneid", dataplaneID,
				"err", err,
			)
		default:
			sinkLog.Error(err, "failed to flush DataplaneInsight", "dataplaneid", dataplaneID)
		}
	}

	for {
		select {
		case <-flushTicker.C:
			flush(false)
			// On Kubernetes, a Get followed by an Update can fail because the cache
			// is not strongly consistent. Resource Conflicts are logged at V(1), but
			// a backoff helps avoid them in the first place.
			time.Sleep(s.flushBackoff)
		case <-stop:
			flush(true)
			return
		}
	}
}
// NewDataplaneInsightStore returns a DataplaneInsightStore that persists
// insights through resManager.
func NewDataplaneInsightStore(resManager manager.ResourceManager) DataplaneInsightStore {
	s := &dataplaneInsightStore{}
	s.resManager = resManager
	return s
}
// Compile-time check that *dataplaneInsightStore implements DataplaneInsightStore.
var _ DataplaneInsightStore = (*dataplaneInsightStore)(nil)

// dataplaneInsightStore is the DataplaneInsightStore returned by
// NewDataplaneInsightStore.
type dataplaneInsightStore struct {
	resManager manager.ResourceManager // passed to manager.Upsert on every write
}
// Upsert stores subscription in the insight resource matching dataplaneType,
// keyed by dataplaneID: a DataplaneInsight for core_mesh.DataplaneType and a
// ZoneIngressInsight for core_mesh.ZoneIngressType. Any other dataplaneType
// yields a ResourcePreconditionFailed error from the store package.
func (s *dataplaneInsightStore) Upsert(
	dataplaneType core_model.ResourceType,
	dataplaneID core_model.ResourceKey,
	subscription *mesh_proto.DiscoverySubscription,
) error {
	switch dataplaneType {
	case core_mesh.DataplaneType:
		return manager.Upsert(s.resManager, dataplaneID, core_mesh.NewDataplaneInsightResource(), func(r core_model.Resource) {
			r.(*core_mesh.DataplaneInsightResource).Spec.UpdateSubscription(subscription)
		})
	case core_mesh.ZoneIngressType:
		return manager.Upsert(s.resManager, dataplaneID, core_mesh.NewZoneIngressInsightResource(), func(r core_model.Resource) {
			r.(*core_mesh.ZoneIngressInsightResource).Spec.UpdateSubscription(subscription)
		})
	}
	// No other proxy types are expected here. Report them with a designated
	// precondition error so callers can recognise the case.
	return store.ErrorResourcePreconditionFailed(dataplaneType, dataplaneID.Name, dataplaneID.Mesh)
}