diff --git a/.changelog/18068.txt b/.changelog/18068.txt new file mode 100644 index 000000000000..be55ad365877 --- /dev/null +++ b/.changelog/18068.txt @@ -0,0 +1,3 @@ +```release-note:bug +xds: Prevent partial application of non-Required Envoy extensions in the case of failure. +``` \ No newline at end of file diff --git a/agent/xds/delta.go b/agent/xds/delta.go index f84b633a852b..ef3218dbb58d 100644 --- a/agent/xds/delta.go +++ b/agent/xds/delta.go @@ -258,7 +258,7 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove s.ResourceMapMutateFn(newResourceMap) } - if err = s.applyEnvoyExtensions(newResourceMap, cfgSnap, node); err != nil { + if newResourceMap, err = s.applyEnvoyExtensions(newResourceMap, cfgSnap, node); err != nil { // err is already the result of calling status.Errorf return err } @@ -403,30 +403,30 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove } } -func (s *Server) applyEnvoyExtensions(resources *xdscommon.IndexedResources, cfgSnap *proxycfg.ConfigSnapshot, node *envoy_config_core_v3.Node) error { +func (s *Server) applyEnvoyExtensions(resources *xdscommon.IndexedResources, cfgSnap *proxycfg.ConfigSnapshot, node *envoy_config_core_v3.Node) (*xdscommon.IndexedResources, error) { var err error envoyVersion := xdscommon.DetermineEnvoyVersionFromNode(node) consulVersion, err := goversion.NewVersion(version.Version) if err != nil { - return status.Errorf(codes.InvalidArgument, "failed to parse Consul version") + return nil, status.Errorf(codes.InvalidArgument, "failed to parse Consul version") } serviceConfigs := extensionruntime.GetRuntimeConfigurations(cfgSnap) for _, cfgs := range serviceConfigs { for _, cfg := range cfgs { - err = applyEnvoyExtension(s.Logger, cfgSnap, resources, cfg, envoyVersion, consulVersion) + resources, err = validateAndApplyEnvoyExtension(s.Logger, cfgSnap, resources, cfg, envoyVersion, consulVersion) if err != nil { - return err + return nil, err } } } - return nil + return resources, nil } -func applyEnvoyExtension(logger hclog.Logger, cfgSnap *proxycfg.ConfigSnapshot, resources *xdscommon.IndexedResources, runtimeConfig extensioncommon.RuntimeConfig, envoyVersion, consulVersion *goversion.Version) error { +func validateAndApplyEnvoyExtension(logger hclog.Logger, cfgSnap *proxycfg.ConfigSnapshot, resources *xdscommon.IndexedResources, runtimeConfig extensioncommon.RuntimeConfig, envoyVersion, consulVersion *goversion.Version) (*xdscommon.IndexedResources, error) { logFn := logger.Warn if runtimeConfig.EnvoyExtension.Required { logFn = logger.Error @@ -460,14 +460,14 @@ func applyEnvoyExtension(logger hclog.Logger, cfgSnap *proxycfg.ConfigSnapshot, logFn("failed to parse Envoy extension version constraint", errorParams...) if ext.Required { - return status.Errorf(codes.InvalidArgument, "failed to parse Envoy version constraint for extension %q for service %q", ext.Name, svc.Name) + return nil, status.Errorf(codes.InvalidArgument, "failed to parse Envoy version constraint for extension %q for service %q", ext.Name, svc.Name) } - return nil + return resources, nil } if !c.Check(envoyVersion) { logger.Info("skipping envoy extension due to Envoy version constraint violation", errorParams...) - return nil + return resources, nil } } @@ -477,14 +477,14 @@ func applyEnvoyExtension(logger hclog.Logger, cfgSnap *proxycfg.ConfigSnapshot, logFn("failed to parse Consul extension version constraint", errorParams...) if ext.Required { - return status.Errorf(codes.InvalidArgument, "failed to parse Consul version constraint for extension %q for service %q", ext.Name, svc.Name) + return nil, status.Errorf(codes.InvalidArgument, "failed to parse Consul version constraint for extension %q for service %q", ext.Name, svc.Name) } - return nil + return resources, nil } if !c.Check(consulVersion) { logger.Info("skipping envoy extension due to Consul version constraint violation", errorParams...) - return nil + return resources, nil } } @@ -496,10 +496,10 @@ func applyEnvoyExtension(logger hclog.Logger, cfgSnap *proxycfg.ConfigSnapshot, logFn("failed to construct extension", errorParams...) if ext.Required { - return status.Errorf(codes.InvalidArgument, "failed to construct extension %q for service %q", ext.Name, svc.Name) + return nil, status.Errorf(codes.InvalidArgument, "failed to construct extension %q for service %q", ext.Name, svc.Name) } - return nil + return resources, nil } now = time.Now() @@ -510,25 +510,49 @@ func applyEnvoyExtension(logger hclog.Logger, cfgSnap *proxycfg.ConfigSnapshot, logFn("failed to validate extension arguments", errorParams...) if ext.Required { - return status.Errorf(codes.InvalidArgument, "failed to validate arguments for extension %q for service %q", ext.Name, svc.Name) + return nil, status.Errorf(codes.InvalidArgument, "failed to validate arguments for extension %q for service %q", ext.Name, svc.Name) } - return nil + return resources, nil } now = time.Now() - _, err = extender.Extend(resources, &runtimeConfig) + resources, err = applyEnvoyExtension(extender, resources, &runtimeConfig) metrics.MeasureSinceWithLabels([]string{"envoy_extension", "extend"}, now, getMetricLabels(err)) if err != nil { errorParams = append(errorParams, "error", err) logFn("failed to apply envoy extension", errorParams...) if ext.Required { - return status.Errorf(codes.InvalidArgument, "failed to patch xDS resources in the %q extension: %v", ext.Name, err) + return nil, status.Errorf(codes.InvalidArgument, "failed to patch xDS resources in the %q extension: %v", ext.Name, err) } } - return nil + return resources, nil +} + +// applyEnvoyExtension makes a copy of the provided IndexedResources, then applies the given extension to them. +// The copy ensures against partial application if a non-required extension modifies a resource then fails at a later +// stage; this is necessary because IndexedResources and its proto messages are all passed by reference, and +// non-required extensions do not lead to a terminal failure in xDS updates. +// +// If the application is successful, the modified copy is returned. If not, the original and an error is returned. +// Returning resources in either case allows for applying extensions in a loop and reporting on non-required extension +// failures simultaneously. +func applyEnvoyExtension(extender extensioncommon.EnvoyExtender, resources *xdscommon.IndexedResources, runtimeConfig *extensioncommon.RuntimeConfig) (*xdscommon.IndexedResources, error) { + // First check whether the extension is eligible for application in the current enviroment. + // Do this before copying indexed resources for the sake of efficiency. + if !extender.CanApply(runtimeConfig) { + return resources, nil + } + + resourcesCopy := xdscommon.Clone(resources) + newResources, err := extender.Extend(resourcesCopy, runtimeConfig) + if err != nil { + return resources, err + } + + return newResources, nil } // https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#eventual-consistency-considerations diff --git a/agent/xds/delta_envoy_extender_oss_test.go b/agent/xds/delta_envoy_extender_oss_test.go index 4da20dfb3153..10411e353cec 100644 --- a/agent/xds/delta_envoy_extender_oss_test.go +++ b/agent/xds/delta_envoy_extender_oss_test.go @@ -748,7 +748,7 @@ end`, cfgs := extensionruntime.GetRuntimeConfigurations(snap) for _, extensions := range cfgs { for _, ext := range extensions { - err := applyEnvoyExtension(hclog.NewNullLogger(), snap, indexedResources, ext, parsedEnvoyVersion, consulVersion) + indexedResources, err = validateAndApplyEnvoyExtension(hclog.NewNullLogger(), snap, indexedResources, ext, parsedEnvoyVersion, consulVersion) require.NoError(t, err) } } diff --git a/agent/xds/delta_test.go b/agent/xds/delta_test.go index f9c77835ad11..b6c1a8d1b9ea 100644 --- a/agent/xds/delta_test.go +++ b/agent/xds/delta_test.go @@ -5,6 +5,7 @@ package xds import ( "errors" + "fmt" "strconv" "strings" "sync" @@ -13,6 +14,8 @@ import ( "time" "github.com/armon/go-metrics" + envoy_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" + envoy_listener_v3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" envoy_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" "github.com/hashicorp/go-hclog" goversion "github.com/hashicorp/go-version" @@ -1613,7 +1616,7 @@ func requireExtensionMetrics( } } -func Test_applyEnvoyExtension_Validations(t *testing.T) { +func Test_validateAndApplyEnvoyExtension_Validations(t *testing.T) { type testCase struct { name string runtimeConfig extensioncommon.RuntimeConfig @@ -1707,7 +1710,7 @@ func Test_applyEnvoyExtension_Validations(t *testing.T) { ServiceID: structs.NewServiceID("s1", nil), }, } - err := applyEnvoyExtension(hclog.NewNullLogger(), &snap, nil, tc.runtimeConfig, envoyVersion, consulVersion) + _, err := validateAndApplyEnvoyExtension(hclog.NewNullLogger(), &snap, nil, tc.runtimeConfig, envoyVersion, consulVersion) if tc.err { require.Error(t, err) require.Contains(t, err.Error(), tc.errString) @@ -1717,3 +1720,211 @@ func Test_applyEnvoyExtension_Validations(t *testing.T) { }) } } + +func Test_applyEnvoyExtension_CanApply(t *testing.T) { + type testCase struct { + canApply bool + } + + cases := map[string]testCase{ + "cannot apply: is not applied": { + canApply: false, + }, + "can apply: is applied": { + canApply: true, + }, + } + + for n, tc := range cases { + t.Run(n, func(t *testing.T) { + extender := extensioncommon.BasicEnvoyExtender{ + Extension: &maybeCanApplyExtension{ + canApply: tc.canApply, + }, + } + config := &extensioncommon.RuntimeConfig{ + Kind: api.ServiceKindConnectProxy, + ServiceName: api.CompoundServiceName{Name: "api"}, + Upstreams: map[api.CompoundServiceName]*extensioncommon.UpstreamData{}, + IsSourcedFromUpstream: false, + EnvoyExtension: api.EnvoyExtension{ + Name: "maybeCanApplyExtension", + Required: false, + }, + } + listener := &envoy_listener_v3.Listener{ + Name: xdscommon.OutboundListenerName, + IgnoreGlobalConnLimit: false, + } + indexedResources := xdscommon.IndexResources(testutil.Logger(t), map[string][]proto.Message{ + xdscommon.ListenerType: { + listener, + }, + }) + + result, err := applyEnvoyExtension(&extender, indexedResources, config) + require.NoError(t, err) + resultListener := result.Index[xdscommon.ListenerType][xdscommon.OutboundListenerName].(*envoy_listener_v3.Listener) + require.Equal(t, tc.canApply, resultListener.IgnoreGlobalConnLimit) + }) + } +} + +func Test_applyEnvoyExtension_PartialApplicationDisallowed(t *testing.T) { + type testCase struct { + fail bool + returnOnFailure bool + expectModified bool + } + + cases := map[string]testCase{ + "failure: returns nothing": { + fail: true, + returnOnFailure: false, + expectModified: false, + }, + // Not expected, but cover to be sure. + "failure: returns values": { + fail: true, + returnOnFailure: true, + expectModified: false, + }, + // Ensure that under normal circumstances, the extension would succeed in + // modifying resources. + "success: resources modified": { + fail: false, + expectModified: true, + }, + } + + for n, tc := range cases { + for _, indexType := range []string{ + xdscommon.ListenerType, + xdscommon.ClusterType, + } { + typeShortName := indexType[strings.LastIndex(indexType, ".")+1:] + t.Run(fmt.Sprintf("%s: %s", n, typeShortName), func(t *testing.T) { + extender := extensioncommon.BasicEnvoyExtender{ + Extension: &partialFailureExtension{ + returnOnFailure: tc.returnOnFailure, + // Alternate which resource fails so that we can test for + // partial modification independent of patch order. + failListener: tc.fail && indexType == xdscommon.ListenerType, + failCluster: tc.fail && indexType == xdscommon.ClusterType, + }, + } + config := &extensioncommon.RuntimeConfig{ + Kind: api.ServiceKindConnectProxy, + ServiceName: api.CompoundServiceName{Name: "api"}, + Upstreams: map[api.CompoundServiceName]*extensioncommon.UpstreamData{}, + IsSourcedFromUpstream: false, + EnvoyExtension: api.EnvoyExtension{ + Name: "partialFailureExtension", + Required: false, + }, + } + cluster := &envoy_cluster_v3.Cluster{ + Name: xdscommon.LocalAppClusterName, + RespectDnsTtl: false, + } + listener := &envoy_listener_v3.Listener{ + Name: xdscommon.OutboundListenerName, + IgnoreGlobalConnLimit: false, + } + indexedResources := xdscommon.IndexResources(testutil.Logger(t), map[string][]proto.Message{ + xdscommon.ClusterType: { + cluster, + }, + xdscommon.ListenerType: { + listener, + }, + }) + + result, err := applyEnvoyExtension(&extender, indexedResources, config) + if tc.fail { + require.Error(t, err) + } else { + require.NoError(t, err) + } + + resultListener := result.Index[xdscommon.ListenerType][xdscommon.OutboundListenerName].(*envoy_listener_v3.Listener) + resultCluster := result.Index[xdscommon.ClusterType][xdscommon.LocalAppClusterName].(*envoy_cluster_v3.Cluster) + require.Equal(t, tc.expectModified, resultListener.IgnoreGlobalConnLimit) + require.Equal(t, tc.expectModified, resultCluster.RespectDnsTtl) + + // Regardless of success, original values should not be modified. + originalListener := indexedResources.Index[xdscommon.ListenerType][xdscommon.OutboundListenerName].(*envoy_listener_v3.Listener) + originalCluster := indexedResources.Index[xdscommon.ClusterType][xdscommon.LocalAppClusterName].(*envoy_cluster_v3.Cluster) + require.False(t, originalListener.IgnoreGlobalConnLimit) + require.False(t, originalCluster.RespectDnsTtl) + }) + } + } +} + +type maybeCanApplyExtension struct { + extensioncommon.BasicExtensionAdapter + canApply bool +} + +var _ extensioncommon.BasicExtension = (*maybeCanApplyExtension)(nil) + +func (m *maybeCanApplyExtension) CanApply(_ *extensioncommon.RuntimeConfig) bool { + return m.canApply +} + +func (m *maybeCanApplyExtension) PatchListener(payload extensioncommon.ListenerPayload) (*envoy_listener_v3.Listener, bool, error) { + payload.Message.IgnoreGlobalConnLimit = true + return payload.Message, true, nil +} + +type partialFailureExtension struct { + extensioncommon.BasicExtensionAdapter + returnOnFailure bool + failCluster bool + failListener bool +} + +var _ extensioncommon.BasicExtension = (*partialFailureExtension)(nil) + +func (p *partialFailureExtension) CanApply(_ *extensioncommon.RuntimeConfig) bool { + return true +} + +func (p *partialFailureExtension) PatchListener(payload extensioncommon.ListenerPayload) (*envoy_listener_v3.Listener, bool, error) { + // Modify original input message + payload.Message.IgnoreGlobalConnLimit = true + + err := fmt.Errorf("oops - listener patch failed") + if !p.failListener { + err = nil + } + + returnMsg := payload.Message + if err != nil && !p.returnOnFailure { + returnMsg = nil + } + + patched := err == nil || p.returnOnFailure + + return returnMsg, patched, err +} + +func (p *partialFailureExtension) PatchCluster(payload extensioncommon.ClusterPayload) (*envoy_cluster_v3.Cluster, bool, error) { + // Modify original input message + payload.Message.RespectDnsTtl = true + + err := fmt.Errorf("oops - cluster patch failed") + if !p.failCluster { + err = nil + } + + returnMsg := payload.Message + if err != nil && !p.returnOnFailure { + returnMsg = nil + } + + patched := err == nil || p.returnOnFailure + + return returnMsg, patched, err +} diff --git a/envoyextensions/extensioncommon/basic_envoy_extender.go b/envoyextensions/extensioncommon/basic_envoy_extender.go index eee8cc32fb8d..a99d7439fea8 100644 --- a/envoyextensions/extensioncommon/basic_envoy_extender.go +++ b/envoyextensions/extensioncommon/basic_envoy_extender.go @@ -103,6 +103,10 @@ type BasicEnvoyExtender struct { Extension BasicExtension } +func (b *BasicEnvoyExtender) CanApply(config *RuntimeConfig) bool { + return b.Extension.CanApply(config) +} + func (b *BasicEnvoyExtender) Validate(config *RuntimeConfig) error { return b.Extension.Validate(config) } @@ -123,10 +127,6 @@ func (b *BasicEnvoyExtender) Extend(resources *xdscommon.IndexedResources, confi return resources, nil } - if !b.Extension.CanApply(config) { - return resources, nil - } - clusters := make(ClusterMap) clusterLoadAssignments := make(ClusterLoadAssignmentMap) routes := make(RouteMap) diff --git a/envoyextensions/extensioncommon/envoy_extender.go b/envoyextensions/extensioncommon/envoy_extender.go index 8cf0bc289e12..0c8f9141c925 100644 --- a/envoyextensions/extensioncommon/envoy_extender.go +++ b/envoyextensions/extensioncommon/envoy_extender.go @@ -11,6 +11,10 @@ import ( // to be dynamically executed during runtime. type EnvoyExtender interface { + // CanApply checks whether the extension configured for this extender is eligible + // for application based on the specified RuntimeConfig. + CanApply(*RuntimeConfig) bool + // Validate ensures the data in config can successfuly be used // to apply the specified Envoy extension. Validate(*RuntimeConfig) error diff --git a/envoyextensions/extensioncommon/upstream_envoy_extender.go b/envoyextensions/extensioncommon/upstream_envoy_extender.go index 0df0d0049d75..135aca3f82e7 100644 --- a/envoyextensions/extensioncommon/upstream_envoy_extender.go +++ b/envoyextensions/extensioncommon/upstream_envoy_extender.go @@ -30,6 +30,10 @@ type UpstreamEnvoyExtender struct { var _ EnvoyExtender = (*UpstreamEnvoyExtender)(nil) +func (ext *UpstreamEnvoyExtender) CanApply(config *RuntimeConfig) bool { + return ext.Extension.CanApply(config) +} + func (ext *UpstreamEnvoyExtender) Validate(_ *RuntimeConfig) error { return nil } @@ -56,10 +60,6 @@ func (ext *UpstreamEnvoyExtender) Extend(resources *xdscommon.IndexedResources, return resources, nil } - if !ext.Extension.CanApply(config) { - return resources, nil - } - for _, indexType := range []string{ xdscommon.ListenerType, xdscommon.RouteType, diff --git a/envoyextensions/xdscommon/xdscommon.go b/envoyextensions/xdscommon/xdscommon.go index e4d293f8c9bc..637d452259f6 100644 --- a/envoyextensions/xdscommon/xdscommon.go +++ b/envoyextensions/xdscommon/xdscommon.go @@ -66,6 +66,25 @@ type IndexedResources struct { ChildIndex map[string]map[string][]string } +// Clone makes a deep copy of the IndexedResources value at the given pointer and +// returns a pointer to the copy. +func Clone(i *IndexedResources) *IndexedResources { + iCopy := EmptyIndexedResources() + + for typeURL, typeMap := range i.Index { + for name, msg := range typeMap { + iCopy.Index[typeURL][name] = proto.Clone(msg) + } + } + for typeURL, parentMap := range i.ChildIndex { + for name, childName := range parentMap { + iCopy.ChildIndex[typeURL][name] = childName + } + } + + return iCopy +} + func IndexResources(logger hclog.Logger, resources map[string][]proto.Message) *IndexedResources { data := EmptyIndexedResources()