From 956b7351aaebbcaae8811fb11a1c725c407719fc Mon Sep 17 00:00:00 2001 From: Michael Bolot Date: Fri, 16 Dec 2022 10:12:57 -0600 Subject: [PATCH] Initial implmentation of warning headers Attempts to pass through warning headers which k8s returns. Requires an update to rancher/apiserver. --- go.mod | 2 +- go.sum | 4 +- pkg/client/factory.go | 51 +++++------ pkg/resources/cluster/apply.go | 4 +- pkg/stores/partition/parallel.go | 6 +- pkg/stores/partition/store.go | 39 ++++----- pkg/stores/partition/store_test.go | 28 +++---- pkg/stores/proxy/proxy_store.go | 130 +++++++++++++++++------------ pkg/stores/proxy/rbac_store.go | 2 +- 9 files changed, 148 insertions(+), 118 deletions(-) diff --git a/go.mod b/go.mod index 52b7f069..171be8e1 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/pborman/uuid v1.2.0 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.12.1 - github.com/rancher/apiserver v0.0.0-20221220225852-94cba4f28cfd + github.com/rancher/apiserver v0.0.0-20221229135954-26bed53611c4 github.com/rancher/dynamiclistener v0.3.5 github.com/rancher/kubernetes-provider-detector v0.1.2 github.com/rancher/norman v0.0.0-20221205184727-32ef2e185b99 diff --git a/go.sum b/go.sum index 6c0994d4..dd01376e 100644 --- a/go.sum +++ b/go.sum @@ -502,8 +502,8 @@ github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1 github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU= github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= -github.com/rancher/apiserver v0.0.0-20221220225852-94cba4f28cfd h1:g0hNrbONfmY4lxvrD2q9KkueYYY4wKUYscm6Ih0QfQ0= -github.com/rancher/apiserver v0.0.0-20221220225852-94cba4f28cfd/go.mod h1:xwQhXv3XFxWfA6tLa4ZeaERu8ldNbyKv2sF+mT+c5WA= +github.com/rancher/apiserver v0.0.0-20221229135954-26bed53611c4 h1:eOq/tiwMCzcwexrbUQ9Agd9PHhwwtH9/G4usE0MoN8s= +github.com/rancher/apiserver v0.0.0-20221229135954-26bed53611c4/go.mod h1:xwQhXv3XFxWfA6tLa4ZeaERu8ldNbyKv2sF+mT+c5WA= github.com/rancher/client-go v1.25.4-rancher1 h1:9MlBC8QbgngUkhNzMR8rZmmCIj6WNRHFOnYiwC2Kty4= github.com/rancher/client-go v1.25.4-rancher1/go.mod h1:8trHCAC83XKY0wsBIpbirZU4NTUpbuhc2JnI7OruGZw= github.com/rancher/dynamiclistener v0.3.5 h1:5TaIHvkDGmZKvc96Huur16zfTKOiLhDtK4S+WV0JA6A= diff --git a/pkg/client/factory.go b/pkg/client/factory.go index 064d2b10..09c3f8f4 100644 --- a/pkg/client/factory.go +++ b/pkg/client/factory.go @@ -111,52 +111,52 @@ func (p *Factory) AdminK8sInterface() (kubernetes.Interface, error) { return kubernetes.NewForConfig(p.clientCfg) } -func (p *Factory) DynamicClient(ctx *types.APIRequest) (dynamic.Interface, error) { - return newDynamicClient(ctx, p.clientCfg, p.impersonate) +func (p *Factory) DynamicClient(ctx *types.APIRequest, warningHandler rest.WarningHandler) (dynamic.Interface, error) { + return newDynamicClient(ctx, p.clientCfg, p.impersonate, warningHandler) } -func (p *Factory) Client(ctx *types.APIRequest, s *types.APISchema, namespace string) (dynamic.ResourceInterface, error) { - return newClient(ctx, p.clientCfg, s, namespace, p.impersonate) +func (p *Factory) Client(ctx *types.APIRequest, s *types.APISchema, namespace string, warningHandler rest.WarningHandler) (dynamic.ResourceInterface, error) { + return newClient(ctx, p.clientCfg, s, namespace, p.impersonate, warningHandler) } -func (p *Factory) AdminClient(ctx *types.APIRequest, s *types.APISchema, namespace string) (dynamic.ResourceInterface, error) { - return newClient(ctx, p.clientCfg, s, namespace, false) +func (p *Factory) AdminClient(ctx *types.APIRequest, s *types.APISchema, namespace string, warningHandler rest.WarningHandler) (dynamic.ResourceInterface, error) { + return newClient(ctx, p.clientCfg, s, namespace, false, warningHandler) } -func (p *Factory) ClientForWatch(ctx *types.APIRequest, s *types.APISchema, namespace string) (dynamic.ResourceInterface, error) { - return newClient(ctx, p.watchClientCfg, s, namespace, p.impersonate) +func (p *Factory) ClientForWatch(ctx *types.APIRequest, s *types.APISchema, namespace string, warningHandler rest.WarningHandler) (dynamic.ResourceInterface, error) { + return newClient(ctx, p.clientCfg, s, namespace, p.impersonate, warningHandler) } -func (p *Factory) AdminClientForWatch(ctx *types.APIRequest, s *types.APISchema, namespace string) (dynamic.ResourceInterface, error) { - return newClient(ctx, p.watchClientCfg, s, namespace, false) +func (p *Factory) AdminClientForWatch(ctx *types.APIRequest, s *types.APISchema, namespace string, warningHandler rest.WarningHandler) (dynamic.ResourceInterface, error) { + return newClient(ctx, p.clientCfg, s, namespace, false, warningHandler) } -func (p *Factory) TableClient(ctx *types.APIRequest, s *types.APISchema, namespace string) (dynamic.ResourceInterface, error) { +func (p *Factory) TableClient(ctx *types.APIRequest, s *types.APISchema, namespace string, warningHandler rest.WarningHandler) (dynamic.ResourceInterface, error) { if attributes.Table(s) { - return newClient(ctx, p.tableClientCfg, s, namespace, p.impersonate) + return newClient(ctx, p.clientCfg, s, namespace, p.impersonate, warningHandler) } - return p.Client(ctx, s, namespace) + return p.Client(ctx, s, namespace, warningHandler) } -func (p *Factory) TableAdminClient(ctx *types.APIRequest, s *types.APISchema, namespace string) (dynamic.ResourceInterface, error) { +func (p *Factory) TableAdminClient(ctx *types.APIRequest, s *types.APISchema, namespace string, warningHandler rest.WarningHandler) (dynamic.ResourceInterface, error) { if attributes.Table(s) { - return newClient(ctx, p.tableClientCfg, s, namespace, false) + return newClient(ctx, p.clientCfg, s, namespace, false, warningHandler) } - return p.AdminClient(ctx, s, namespace) + return p.AdminClient(ctx, s, namespace, warningHandler) } -func (p *Factory) TableClientForWatch(ctx *types.APIRequest, s *types.APISchema, namespace string) (dynamic.ResourceInterface, error) { +func (p *Factory) TableClientForWatch(ctx *types.APIRequest, s *types.APISchema, namespace string, warningHandler rest.WarningHandler) (dynamic.ResourceInterface, error) { if attributes.Table(s) { - return newClient(ctx, p.tableWatchClientCfg, s, namespace, p.impersonate) + return newClient(ctx, p.clientCfg, s, namespace, p.impersonate, warningHandler) } - return p.ClientForWatch(ctx, s, namespace) + return p.ClientForWatch(ctx, s, namespace, warningHandler) } -func (p *Factory) TableAdminClientForWatch(ctx *types.APIRequest, s *types.APISchema, namespace string) (dynamic.ResourceInterface, error) { +func (p *Factory) TableAdminClientForWatch(ctx *types.APIRequest, s *types.APISchema, namespace string, warningHandler rest.WarningHandler) (dynamic.ResourceInterface, error) { if attributes.Table(s) { - return newClient(ctx, p.tableWatchClientCfg, s, namespace, false) + return newClient(ctx, p.clientCfg, s, namespace, false, warningHandler) } - return p.AdminClientForWatch(ctx, s, namespace) + return p.AdminClientForWatch(ctx, s, namespace, warningHandler) } func setupConfig(ctx *types.APIRequest, cfg *rest.Config, impersonate bool) (*rest.Config, error) { @@ -173,8 +173,9 @@ func setupConfig(ctx *types.APIRequest, cfg *rest.Config, impersonate bool) (*re return cfg, nil } -func newDynamicClient(ctx *types.APIRequest, cfg *rest.Config, impersonate bool) (dynamic.Interface, error) { +func newDynamicClient(ctx *types.APIRequest, cfg *rest.Config, impersonate bool, warningHandler rest.WarningHandler) (dynamic.Interface, error) { cfg, err := setupConfig(ctx, cfg, impersonate) + cfg.WarningHandler = warningHandler if err != nil { return nil, err } @@ -182,8 +183,8 @@ func newDynamicClient(ctx *types.APIRequest, cfg *rest.Config, impersonate bool) return dynamic.NewForConfig(cfg) } -func newClient(ctx *types.APIRequest, cfg *rest.Config, s *types.APISchema, namespace string, impersonate bool) (dynamic.ResourceInterface, error) { - client, err := newDynamicClient(ctx, cfg, impersonate) +func newClient(ctx *types.APIRequest, cfg *rest.Config, s *types.APISchema, namespace string, impersonate bool, warningHandler rest.WarningHandler) (dynamic.ResourceInterface, error) { + client, err := newDynamicClient(ctx, cfg, impersonate, warningHandler) if err != nil { return nil, err } diff --git a/pkg/resources/cluster/apply.go b/pkg/resources/cluster/apply.go index c36eb989..9e35257c 100644 --- a/pkg/resources/cluster/apply.go +++ b/pkg/resources/cluster/apply.go @@ -17,6 +17,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" + "k8s.io/client-go/rest" ) type Apply struct { @@ -110,7 +111,8 @@ func (a *Apply) createApply(apiContext *types.APIRequest) (apply.Apply, error) { } apply := apply.New(client.Discovery(), func(gvr schema.GroupVersionResource) (dynamic.NamespaceableResourceInterface, error) { - dynamicClient, err := a.cg.DynamicClient(apiContext) + // don't record warnings from apply + dynamicClient, err := a.cg.DynamicClient(apiContext, rest.NoWarnings{}) if err != nil { return nil, err } diff --git a/pkg/stores/partition/parallel.go b/pkg/stores/partition/parallel.go index 35403aae..5cf97b56 100644 --- a/pkg/stores/partition/parallel.go +++ b/pkg/stores/partition/parallel.go @@ -5,6 +5,8 @@ import ( "encoding/base64" "encoding/json" + "github.com/rancher/apiserver/pkg/types" + "golang.org/x/sync/errgroup" "golang.org/x/sync/semaphore" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -33,7 +35,7 @@ type ParallelPartitionLister struct { } // PartitionLister lists objects for one partition. -type PartitionLister func(ctx context.Context, partition Partition, cont string, revision string, limit int) (*unstructured.UnstructuredList, error) +type PartitionLister func(ctx context.Context, partition Partition, cont string, revision string, limit int) (*unstructured.UnstructuredList, []types.Warning, error) // Err returns the latest error encountered. func (p *ParallelPartitionLister) Err() error { @@ -174,7 +176,7 @@ func (p *ParallelPartitionLister) feeder(ctx context.Context, state listState, l if partition.Name() == state.PartitionName { cont = state.Continue } - list, err := p.Lister(ctx, partition, cont, state.Revision, limit) + list, _, err := p.Lister(ctx, partition, cont, state.Revision, limit) if err != nil { return err } diff --git a/pkg/stores/partition/store.go b/pkg/stores/partition/store.go index 1efbda70..185a6cb5 100644 --- a/pkg/stores/partition/store.go +++ b/pkg/stores/partition/store.go @@ -79,11 +79,11 @@ type cacheKey struct { // UnstructuredStore is like types.Store but deals in k8s unstructured objects instead of apiserver types. type UnstructuredStore interface { - ByID(apiOp *types.APIRequest, schema *types.APISchema, id string) (*unstructured.Unstructured, error) - List(apiOp *types.APIRequest, schema *types.APISchema) (*unstructured.UnstructuredList, error) - Create(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject) (*unstructured.Unstructured, error) - Update(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject, id string) (*unstructured.Unstructured, error) - Delete(apiOp *types.APIRequest, schema *types.APISchema, id string) (*unstructured.Unstructured, error) + ByID(apiOp *types.APIRequest, schema *types.APISchema, id string) (*unstructured.Unstructured, []types.Warning, error) + List(apiOp *types.APIRequest, schema *types.APISchema) (*unstructured.UnstructuredList, []types.Warning, error) + Create(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject) (*unstructured.Unstructured, []types.Warning, error) + Update(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject, id string) (*unstructured.Unstructured, []types.Warning, error) + Delete(apiOp *types.APIRequest, schema *types.APISchema, id string) (*unstructured.Unstructured, []types.Warning, error) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest) (chan watch.Event, error) } @@ -103,11 +103,11 @@ func (s *Store) Delete(apiOp *types.APIRequest, schema *types.APISchema, id stri return types.APIObject{}, err } - obj, err := target.Delete(apiOp, schema, id) + obj, warnings, err := target.Delete(apiOp, schema, id) if err != nil { return types.APIObject{}, err } - return toAPI(schema, obj), nil + return toAPI(schema, obj, warnings), nil } // ByID looks up a single object by its ID. @@ -117,18 +117,18 @@ func (s *Store) ByID(apiOp *types.APIRequest, schema *types.APISchema, id string return types.APIObject{}, err } - obj, err := target.ByID(apiOp, schema, id) + obj, warnings, err := target.ByID(apiOp, schema, id) if err != nil { return types.APIObject{}, err } - return toAPI(schema, obj), nil + return toAPI(schema, obj, warnings), nil } func (s *Store) listPartition(ctx context.Context, apiOp *types.APIRequest, schema *types.APISchema, partition Partition, - cont string, revision string, limit int) (*unstructured.UnstructuredList, error) { + cont string, revision string, limit int) (*unstructured.UnstructuredList, []types.Warning, error) { store, err := s.Partitioner.Store(apiOp, partition) if err != nil { - return nil, err + return nil, nil, err } req := apiOp.Clone() @@ -163,7 +163,7 @@ func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.AP } lister := ParallelPartitionLister{ - Lister: func(ctx context.Context, partition Partition, cont string, revision string, limit int) (*unstructured.UnstructuredList, error) { + Lister: func(ctx context.Context, partition Partition, cont string, revision string, limit int) (*unstructured.UnstructuredList, []types.Warning, error) { return s.listPartition(ctx, apiOp, schema, partition, cont, revision, limit) }, Concurrency: 3, @@ -217,7 +217,7 @@ func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.AP for _, item := range list { item := item - result.Objects = append(result.Objects, toAPI(schema, &item)) + result.Objects = append(result.Objects, toAPI(schema, &item, nil)) } result.Revision = key.revision @@ -254,11 +254,11 @@ func (s *Store) Create(apiOp *types.APIRequest, schema *types.APISchema, data ty return types.APIObject{}, err } - obj, err := target.Create(apiOp, schema, data) + obj, warnings, err := target.Create(apiOp, schema, data) if err != nil { return types.APIObject{}, err } - return toAPI(schema, obj), nil + return toAPI(schema, obj, warnings), nil } // Update updates a single object in the store. @@ -268,11 +268,11 @@ func (s *Store) Update(apiOp *types.APIRequest, schema *types.APISchema, data ty return types.APIObject{}, err } - obj, err := target.Update(apiOp, schema, data, id) + obj, warnings, err := target.Update(apiOp, schema, data, id) if err != nil { return types.APIObject{}, err } - return toAPI(schema, obj), nil + return toAPI(schema, obj, warnings), nil } // Watch returns a channel of events for a list or resource. @@ -318,7 +318,7 @@ func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, wr types return response, nil } -func toAPI(schema *types.APISchema, obj runtime.Object) types.APIObject { +func toAPI(schema *types.APISchema, obj runtime.Object, warnings []types.Warning) types.APIObject { if obj == nil || reflect.ValueOf(obj).IsNil() { return types.APIObject{} } @@ -344,6 +344,7 @@ func toAPI(schema *types.APISchema, obj runtime.Object) types.APIObject { } apiObject.ID = id + apiObject.Warnings = warnings return apiObject } @@ -384,7 +385,7 @@ func toAPIEvent(apiOp *types.APIRequest, schema *types.APISchema, event watch.Ev return apiEvent } - apiEvent.Object = toAPI(schema, event.Object) + apiEvent.Object = toAPI(schema, event.Object, nil) m, err := meta.Accessor(event.Object) if err != nil { diff --git a/pkg/stores/partition/store_test.go b/pkg/stores/partition/store_test.go index 42c8b271..72d668fd 100644 --- a/pkg/stores/partition/store_test.go +++ b/pkg/stores/partition/store_test.go @@ -1881,12 +1881,12 @@ type mockStore struct { called int } -func (m *mockStore) List(apiOp *types.APIRequest, schema *types.APISchema) (*unstructured.UnstructuredList, error) { +func (m *mockStore) List(apiOp *types.APIRequest, schema *types.APISchema) (*unstructured.UnstructuredList, []types.Warning, error) { m.called++ query, _ := url.ParseQuery(apiOp.Request.URL.RawQuery) l := query.Get("limit") if l == "" { - return m.contents, nil + return m.contents, nil, nil } i := 0 if c := query.Get("continue"); c != "" { @@ -1904,29 +1904,29 @@ func (m *mockStore) List(apiOp *types.APIRequest, schema *types.APISchema) (*uns contents.SetContinue(base64.StdEncoding.EncodeToString([]byte(contents.Items[i+lInt].GetName()))) } if i > len(contents.Items) { - return contents, nil + return contents, nil, nil } if i+lInt > len(contents.Items) { contents.Items = contents.Items[i:] - return contents, nil + return contents, nil, nil } contents.Items = contents.Items[i : i+lInt] - return contents, nil + return contents, nil, nil } -func (m *mockStore) ByID(apiOp *types.APIRequest, schema *types.APISchema, id string) (*unstructured.Unstructured, error) { +func (m *mockStore) ByID(apiOp *types.APIRequest, schema *types.APISchema, id string) (*unstructured.Unstructured, []types.Warning, error) { panic("not implemented") } -func (m *mockStore) Create(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject) (*unstructured.Unstructured, error) { +func (m *mockStore) Create(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject) (*unstructured.Unstructured, []types.Warning, error) { panic("not implemented") } -func (m *mockStore) Update(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject, id string) (*unstructured.Unstructured, error) { +func (m *mockStore) Update(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject, id string) (*unstructured.Unstructured, []types.Warning, error) { panic("not implemented") } -func (m *mockStore) Delete(apiOp *types.APIRequest, schema *types.APISchema, id string) (*unstructured.Unstructured, error) { +func (m *mockStore) Delete(apiOp *types.APIRequest, schema *types.APISchema, id string) (*unstructured.Unstructured, []types.Warning, error) { panic("not implemented") } @@ -1939,7 +1939,7 @@ type mockVersionedStore struct { versions []mockStore } -func (m *mockVersionedStore) List(apiOp *types.APIRequest, schema *types.APISchema) (*unstructured.UnstructuredList, error) { +func (m *mockVersionedStore) List(apiOp *types.APIRequest, schema *types.APISchema) (*unstructured.UnstructuredList, []types.Warning, error) { m.called++ query, _ := url.ParseQuery(apiOp.Request.URL.RawQuery) rv := len(m.versions) - 1 @@ -1949,7 +1949,7 @@ func (m *mockVersionedStore) List(apiOp *types.APIRequest, schema *types.APISche } l := query.Get("limit") if l == "" { - return m.versions[rv].contents, nil + return m.versions[rv].contents, nil, nil } i := 0 if c := query.Get("continue"); c != "" { @@ -1967,14 +1967,14 @@ func (m *mockVersionedStore) List(apiOp *types.APIRequest, schema *types.APISche contents.SetContinue(base64.StdEncoding.EncodeToString([]byte(contents.Items[i+lInt].GetName()))) } if i > len(contents.Items) { - return contents, nil + return contents, nil, nil } if i+lInt > len(contents.Items) { contents.Items = contents.Items[i:] - return contents, nil + return contents, nil, nil } contents.Items = contents.Items[i : i+lInt] - return contents, nil + return contents, nil, nil } type mockCache struct { diff --git a/pkg/stores/proxy/proxy_store.go b/pkg/stores/proxy/proxy_store.go index 21aeba69..fc5f5745 100644 --- a/pkg/stores/proxy/proxy_store.go +++ b/pkg/stores/proxy/proxy_store.go @@ -32,6 +32,7 @@ import ( "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" ) const watchTimeoutEnv = "CATTLE_WATCH_TIMEOUT_SECONDS" @@ -51,13 +52,28 @@ type ClientGetter interface { IsImpersonating() bool K8sInterface(ctx *types.APIRequest) (kubernetes.Interface, error) AdminK8sInterface() (kubernetes.Interface, error) - Client(ctx *types.APIRequest, schema *types.APISchema, namespace string) (dynamic.ResourceInterface, error) - DynamicClient(ctx *types.APIRequest) (dynamic.Interface, error) - AdminClient(ctx *types.APIRequest, schema *types.APISchema, namespace string) (dynamic.ResourceInterface, error) - TableClient(ctx *types.APIRequest, schema *types.APISchema, namespace string) (dynamic.ResourceInterface, error) - TableAdminClient(ctx *types.APIRequest, schema *types.APISchema, namespace string) (dynamic.ResourceInterface, error) - TableClientForWatch(ctx *types.APIRequest, schema *types.APISchema, namespace string) (dynamic.ResourceInterface, error) - TableAdminClientForWatch(ctx *types.APIRequest, schema *types.APISchema, namespace string) (dynamic.ResourceInterface, error) + Client(ctx *types.APIRequest, schema *types.APISchema, namespace string, warningHandler rest.WarningHandler) (dynamic.ResourceInterface, error) + DynamicClient(ctx *types.APIRequest, warningHandler rest.WarningHandler) (dynamic.Interface, error) + AdminClient(ctx *types.APIRequest, schema *types.APISchema, namespace string, warningHandler rest.WarningHandler) (dynamic.ResourceInterface, error) + TableClient(ctx *types.APIRequest, schema *types.APISchema, namespace string, warningHandler rest.WarningHandler) (dynamic.ResourceInterface, error) + TableAdminClient(ctx *types.APIRequest, schema *types.APISchema, namespace string, warningHandler rest.WarningHandler) (dynamic.ResourceInterface, error) + TableClientForWatch(ctx *types.APIRequest, schema *types.APISchema, namespace string, warningHandler rest.WarningHandler) (dynamic.ResourceInterface, error) + TableAdminClientForWatch(ctx *types.APIRequest, schema *types.APISchema, namespace string, warningHandler rest.WarningHandler) (dynamic.ResourceInterface, error) +} + +type warningBuffer struct { + Warnings []types.Warning +} + +func (w *warningBuffer) HandleWarningHeader(code int, agent string, text string) { + if w.Warnings == nil { + w.Warnings = []types.Warning{} + } + w.Warnings = append(w.Warnings, types.Warning{ + Code: code, + Agent: agent, + Text: text, + }) } // RelationshipNotifier is an interface for handling wrangler summary.Relationship events. @@ -90,7 +106,7 @@ func NewProxyStore(clientGetter ClientGetter, notifier RelationshipNotifier, loo } // ByID looks up a single object by its ID. -func (s *Store) ByID(apiOp *types.APIRequest, schema *types.APISchema, id string) (*unstructured.Unstructured, error) { +func (s *Store) ByID(apiOp *types.APIRequest, schema *types.APISchema, id string) (*unstructured.Unstructured, []types.Warning, error) { return s.byID(apiOp, schema, apiOp.Namespace, id) } @@ -98,20 +114,21 @@ func decodeParams(apiOp *types.APIRequest, target runtime.Object) error { return paramCodec.DecodeParameters(apiOp.Request.URL.Query(), metav1.SchemeGroupVersion, target) } -func (s *Store) byID(apiOp *types.APIRequest, schema *types.APISchema, namespace, id string) (*unstructured.Unstructured, error) { - k8sClient, err := metricsStore.Wrap(s.clientGetter.TableClient(apiOp, schema, namespace)) +func (s *Store) byID(apiOp *types.APIRequest, schema *types.APISchema, namespace, id string) (*unstructured.Unstructured, []types.Warning, error) { + buffer := &warningBuffer{} + k8sClient, err := metricsStore.Wrap(s.clientGetter.TableClient(apiOp, schema, namespace, buffer)) if err != nil { - return nil, err + return nil, nil, err } opts := metav1.GetOptions{} if err := decodeParams(apiOp, &opts); err != nil { - return nil, err + return nil, nil, err } obj, err := k8sClient.Get(apiOp, id, opts) rowToObject(obj) - return obj, err + return obj, buffer.Warnings, err } func moveFromUnderscore(obj map[string]interface{}) map[string]interface{} { @@ -185,21 +202,21 @@ func tableToObjects(obj map[string]interface{}) []unstructured.Unstructured { // to list *all* resources. // With this filter, the request can be performed successfully, and only the allowed resources will // be returned in the list. -func (s *Store) ByNames(apiOp *types.APIRequest, schema *types.APISchema, names sets.String) (*unstructured.UnstructuredList, error) { +func (s *Store) ByNames(apiOp *types.APIRequest, schema *types.APISchema, names sets.String) (*unstructured.UnstructuredList, []types.Warning, error) { if apiOp.Namespace == "*" { // This happens when you grant namespaced objects with "get" by name in a clusterrolebinding. We will treat // this as an invalid situation instead of listing all objects in the cluster and filtering by name. - return nil, nil + return nil, nil, nil } - - adminClient, err := s.clientGetter.TableAdminClient(apiOp, schema, apiOp.Namespace) + buffer := &warningBuffer{} + adminClient, err := s.clientGetter.TableAdminClient(apiOp, schema, apiOp.Namespace, buffer) if err != nil { - return nil, err + return nil, nil, err } objs, err := s.list(apiOp, schema, adminClient) if err != nil { - return nil, err + return nil, nil, err } var filtered []unstructured.Unstructured @@ -210,16 +227,18 @@ func (s *Store) ByNames(apiOp *types.APIRequest, schema *types.APISchema, names } objs.Items = filtered - return objs, nil + return objs, buffer.Warnings, nil } // List returns an unstructured list of resources. -func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (*unstructured.UnstructuredList, error) { - client, err := s.clientGetter.TableClient(apiOp, schema, apiOp.Namespace) +func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (*unstructured.UnstructuredList, []types.Warning, error) { + buffer := &warningBuffer{} + client, err := s.clientGetter.TableClient(apiOp, schema, apiOp.Namespace, buffer) if err != nil { - return nil, err + return nil, nil, err } - return s.list(apiOp, schema, client) + result, err := s.list(apiOp, schema, client) + return result, buffer.Warnings, err } func (s *Store) list(apiOp *types.APIRequest, schema *types.APISchema, client dynamic.ResourceInterface) (*unstructured.UnstructuredList, error) { @@ -288,7 +307,7 @@ func (s *Store) listAndWatch(apiOp *types.APIRequest, client dynamic.ResourceInt if s.notifier != nil { eg.Go(func() error { for rel := range s.notifier.OnInboundRelationshipChange(ctx, schema, apiOp.Namespace) { - obj, err := s.byID(apiOp, schema, rel.Namespace, rel.Name) + obj, _, err := s.byID(apiOp, schema, rel.Namespace, rel.Name) if err == nil { rowToObject(obj) result <- watch.Event{Type: watch.Modified, Object: obj} @@ -331,7 +350,8 @@ func (s *Store) listAndWatch(apiOp *types.APIRequest, client dynamic.ResourceInt // With this filter, the request can be performed successfully, and only the allowed resources will // be returned in watch. func (s *Store) WatchNames(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest, names sets.String) (chan watch.Event, error) { - adminClient, err := s.clientGetter.TableAdminClientForWatch(apiOp, schema, apiOp.Namespace) + buffer := &warningBuffer{} + adminClient, err := s.clientGetter.TableAdminClientForWatch(apiOp, schema, apiOp.Namespace, buffer) if err != nil { return nil, err } @@ -360,7 +380,8 @@ func (s *Store) WatchNames(apiOp *types.APIRequest, schema *types.APISchema, w t // Watch returns a channel of events for a list or resource. func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest) (chan watch.Event, error) { - client, err := s.clientGetter.TableClientForWatch(apiOp, schema, apiOp.Namespace) + buffer := &warningBuffer{} + client, err := s.clientGetter.TableClientForWatch(apiOp, schema, apiOp.Namespace, buffer) if err != nil { return nil, err } @@ -378,7 +399,7 @@ func (s *Store) watch(apiOp *types.APIRequest, schema *types.APISchema, w types. } // Create creates a single object in the store. -func (s *Store) Create(apiOp *types.APIRequest, schema *types.APISchema, params types.APIObject) (*unstructured.Unstructured, error) { +func (s *Store) Create(apiOp *types.APIRequest, schema *types.APISchema, params types.APIObject) (*unstructured.Unstructured, []types.Warning, error) { var ( resp *unstructured.Unstructured ) @@ -402,38 +423,40 @@ func (s *Store) Create(apiOp *types.APIRequest, schema *types.APISchema, params gvk := attributes.GVK(schema) input["apiVersion"], input["kind"] = gvk.ToAPIVersionAndKind() - k8sClient, err := metricsStore.Wrap(s.clientGetter.TableClient(apiOp, schema, ns)) + buffer := &warningBuffer{} + k8sClient, err := metricsStore.Wrap(s.clientGetter.TableClient(apiOp, schema, ns, buffer)) if err != nil { - return nil, err + return nil, nil, err } opts := metav1.CreateOptions{} if err := decodeParams(apiOp, &opts); err != nil { - return nil, err + return nil, nil, err } resp, err = k8sClient.Create(apiOp, &unstructured.Unstructured{Object: input}, opts) rowToObject(resp) - return resp, err + return resp, buffer.Warnings, err } // Update updates a single object in the store. -func (s *Store) Update(apiOp *types.APIRequest, schema *types.APISchema, params types.APIObject, id string) (*unstructured.Unstructured, error) { +func (s *Store) Update(apiOp *types.APIRequest, schema *types.APISchema, params types.APIObject, id string) (*unstructured.Unstructured, []types.Warning, error) { var ( err error input = params.Data() ) ns := types.Namespace(input) - k8sClient, err := metricsStore.Wrap(s.clientGetter.TableClient(apiOp, schema, ns)) + buffer := &warningBuffer{} + k8sClient, err := metricsStore.Wrap(s.clientGetter.TableClient(apiOp, schema, ns, buffer)) if err != nil { - return nil, err + return nil, nil, err } if apiOp.Method == http.MethodPatch { bytes, err := ioutil.ReadAll(io.LimitReader(apiOp.Request.Body, 2<<20)) if err != nil { - return nil, err + return nil, nil, err } pType := apitypes.StrategicMergePatchType @@ -443,70 +466,71 @@ func (s *Store) Update(apiOp *types.APIRequest, schema *types.APISchema, params opts := metav1.PatchOptions{} if err := decodeParams(apiOp, &opts); err != nil { - return nil, err + return nil, nil, err } if pType == apitypes.StrategicMergePatchType { data := map[string]interface{}{} if err := json.Unmarshal(bytes, &data); err != nil { - return nil, err + return nil, nil, err } data = moveFromUnderscore(data) bytes, err = json.Marshal(data) if err != nil { - return nil, err + return nil, nil, err } } resp, err := k8sClient.Patch(apiOp, id, pType, bytes, opts) if err != nil { - return nil, err + return nil, nil, err } - return resp, nil + return resp, buffer.Warnings, nil } resourceVersion := input.String("metadata", "resourceVersion") if resourceVersion == "" { - return nil, fmt.Errorf("metadata.resourceVersion is required for update") + return nil, nil, fmt.Errorf("metadata.resourceVersion is required for update") } opts := metav1.UpdateOptions{} if err := decodeParams(apiOp, &opts); err != nil { - return nil, err + return nil, nil, err } resp, err := k8sClient.Update(apiOp, &unstructured.Unstructured{Object: moveFromUnderscore(input)}, metav1.UpdateOptions{}) if err != nil { - return nil, err + return nil, nil, err } rowToObject(resp) - return resp, nil + return resp, buffer.Warnings, nil } // Delete deletes an object from a store. -func (s *Store) Delete(apiOp *types.APIRequest, schema *types.APISchema, id string) (*unstructured.Unstructured, error) { +func (s *Store) Delete(apiOp *types.APIRequest, schema *types.APISchema, id string) (*unstructured.Unstructured, []types.Warning, error) { opts := metav1.DeleteOptions{} if err := decodeParams(apiOp, &opts); err != nil { - return nil, nil + return nil, nil, nil } - k8sClient, err := metricsStore.Wrap(s.clientGetter.TableClient(apiOp, schema, apiOp.Namespace)) + buffer := &warningBuffer{} + k8sClient, err := metricsStore.Wrap(s.clientGetter.TableClient(apiOp, schema, apiOp.Namespace, buffer)) if err != nil { - return nil, err + return nil, nil, err } if err := k8sClient.Delete(apiOp, id, opts); err != nil { - return nil, err + return nil, nil, err } - obj, err := s.byID(apiOp, schema, apiOp.Namespace, id) + obj, _, err := s.byID(apiOp, schema, apiOp.Namespace, id) if err != nil { // ignore lookup error - return nil, validation.ErrorCode{ + return nil, nil, validation.ErrorCode{ Status: http.StatusNoContent, } } - return obj, nil + return obj, buffer.Warnings, nil } diff --git a/pkg/stores/proxy/rbac_store.go b/pkg/stores/proxy/rbac_store.go index f67089d7..8835f698 100644 --- a/pkg/stores/proxy/rbac_store.go +++ b/pkg/stores/proxy/rbac_store.go @@ -101,7 +101,7 @@ type byNameOrNamespaceStore struct { } // List returns a list of resources by partition. -func (b *byNameOrNamespaceStore) List(apiOp *types.APIRequest, schema *types.APISchema) (*unstructured.UnstructuredList, error) { +func (b *byNameOrNamespaceStore) List(apiOp *types.APIRequest, schema *types.APISchema) (*unstructured.UnstructuredList, []types.Warning, error) { if b.partition.Passthrough { return b.Store.List(apiOp, schema) }