-
Notifications
You must be signed in to change notification settings - Fork 351
/
runner.go
132 lines (113 loc) · 4.07 KB
/
runner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
// Copyright Envoy Gateway Authors
// SPDX-License-Identifier: Apache-2.0
// The full text of the Apache license is available in the LICENSE file at
// the root of the repo.
package runner
import (
"context"
"reflect"
ktypes "k8s.io/apimachinery/pkg/types"
egv1a1 "github.com/envoyproxy/gateway/api/v1alpha1"
"github.com/envoyproxy/gateway/internal/envoygateway/config"
extension "github.com/envoyproxy/gateway/internal/extension/types"
"github.com/envoyproxy/gateway/internal/infrastructure/kubernetes/ratelimit"
"github.com/envoyproxy/gateway/internal/ir"
"github.com/envoyproxy/gateway/internal/message"
"github.com/envoyproxy/gateway/internal/xds/translator"
)
type Config struct {
config.Server
XdsIR *message.XdsIR
Xds *message.Xds
ExtensionManager extension.Manager
ProviderResources *message.ProviderResources
}
type Runner struct {
Config
}
func New(cfg *Config) *Runner {
return &Runner{Config: *cfg}
}
func (r *Runner) Name() string {
return string(egv1a1.LogComponentXdsTranslatorRunner)
}
// Start starts the xds-translator runner
func (r *Runner) Start(ctx context.Context) (err error) {
r.Logger = r.Logger.WithName(r.Name()).WithValues("runner", r.Name())
go r.subscribeAndTranslate(ctx)
r.Logger.Info("started")
return
}
func (r *Runner) subscribeAndTranslate(ctx context.Context) {
// Subscribe to resources
message.HandleSubscription(message.Metadata{Runner: string(egv1a1.LogComponentXdsTranslatorRunner), Message: "xds-ir"}, r.XdsIR.Subscribe(ctx),
func(update message.Update[string, *ir.Xds], errChan chan error) {
r.Logger.Info("received an update")
key := update.Key
val := update.Value
if update.Delete {
r.Xds.Delete(key)
} else {
// Translate to xds resources
t := &translator.Translator{
FilterOrder: val.FilterOrder,
}
// Set the extension manager if an extension is loaded
if r.ExtensionManager != nil {
t.ExtensionManager = &r.ExtensionManager
}
// Set the rate limit service URL if global rate limiting is enabled.
if r.EnvoyGateway.RateLimit != nil {
t.GlobalRateLimit = &translator.GlobalRateLimitSettings{
ServiceURL: ratelimit.GetServiceURL(r.Namespace, r.DNSDomain),
FailClosed: r.EnvoyGateway.RateLimit.FailClosed,
}
if r.EnvoyGateway.RateLimit.Timeout != nil {
t.GlobalRateLimit.Timeout = r.EnvoyGateway.RateLimit.Timeout.Duration
}
}
result, err := t.Translate(val)
if err != nil {
r.Logger.Error(err, "failed to translate xds ir")
errChan <- err
}
// xDS translation is done in a best-effort manner, so the result
// may contain partial resources even if there are errors.
if result == nil {
r.Logger.Info("no xds resources to publish")
return
}
// Get all status keys from watchable and save them in the map statusesToDelete.
// Iterating through result.EnvoyPatchPolicyStatuses, any valid keys will be removed from statusesToDelete.
// Remaining keys will be deleted from watchable before we exit this function.
statusesToDelete := make(map[ktypes.NamespacedName]bool)
for key := range r.ProviderResources.EnvoyPatchPolicyStatuses.LoadAll() {
statusesToDelete[key] = true
}
// Publish EnvoyPatchPolicyStatus
for _, e := range result.EnvoyPatchPolicyStatuses {
key := ktypes.NamespacedName{
Name: e.Name,
Namespace: e.Namespace,
}
// Skip updating status for policies with empty status
// They may have been skipped in this translation because
// their target is not found (not relevant)
if !(reflect.ValueOf(e.Status).IsZero()) {
r.ProviderResources.EnvoyPatchPolicyStatuses.Store(key, e.Status)
}
delete(statusesToDelete, key)
}
// Discard the EnvoyPatchPolicyStatuses to reduce memory footprint
result.EnvoyPatchPolicyStatuses = nil
// Publish
r.Xds.Store(key, result)
// Delete all the deletable status keys
for key := range statusesToDelete {
r.ProviderResources.EnvoyPatchPolicyStatuses.Delete(key)
}
}
},
)
r.Logger.Info("subscriber shutting down")
}