Skip to content

Commit

Permalink
Initial implmentation of warning headers
Browse files Browse the repository at this point in the history
Attempts to pass through warning headers which k8s returns.
Requires an update to rancher/apiserver.
  • Loading branch information
MbolotSuse authored and crobby committed Dec 29, 2022
1 parent 7565dba commit a04cbb6
Show file tree
Hide file tree
Showing 9 changed files with 148 additions and 118 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ require (
github.com/pborman/uuid v1.2.0
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.12.1
github.com/rancher/apiserver v0.0.0-20221220225852-94cba4f28cfd
github.com/rancher/apiserver v0.0.0-20221229135954-26bed53611c4
github.com/rancher/dynamiclistener v0.3.5
github.com/rancher/kubernetes-provider-detector v0.1.2
github.com/rancher/norman v0.0.0-20221205184727-32ef2e185b99
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -502,8 +502,8 @@ github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1
github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU=
github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/rancher/apiserver v0.0.0-20221220225852-94cba4f28cfd h1:g0hNrbONfmY4lxvrD2q9KkueYYY4wKUYscm6Ih0QfQ0=
github.com/rancher/apiserver v0.0.0-20221220225852-94cba4f28cfd/go.mod h1:xwQhXv3XFxWfA6tLa4ZeaERu8ldNbyKv2sF+mT+c5WA=
github.com/rancher/apiserver v0.0.0-20221229135954-26bed53611c4 h1:eOq/tiwMCzcwexrbUQ9Agd9PHhwwtH9/G4usE0MoN8s=
github.com/rancher/apiserver v0.0.0-20221229135954-26bed53611c4/go.mod h1:xwQhXv3XFxWfA6tLa4ZeaERu8ldNbyKv2sF+mT+c5WA=
github.com/rancher/client-go v1.25.4-rancher1 h1:9MlBC8QbgngUkhNzMR8rZmmCIj6WNRHFOnYiwC2Kty4=
github.com/rancher/client-go v1.25.4-rancher1/go.mod h1:8trHCAC83XKY0wsBIpbirZU4NTUpbuhc2JnI7OruGZw=
github.com/rancher/dynamiclistener v0.3.5 h1:5TaIHvkDGmZKvc96Huur16zfTKOiLhDtK4S+WV0JA6A=
Expand Down
51 changes: 26 additions & 25 deletions pkg/client/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,52 +111,52 @@ func (p *Factory) AdminK8sInterface() (kubernetes.Interface, error) {
return kubernetes.NewForConfig(p.clientCfg)
}

func (p *Factory) DynamicClient(ctx *types.APIRequest) (dynamic.Interface, error) {
return newDynamicClient(ctx, p.clientCfg, p.impersonate)
func (p *Factory) DynamicClient(ctx *types.APIRequest, warningHandler rest.WarningHandler) (dynamic.Interface, error) {
return newDynamicClient(ctx, p.clientCfg, p.impersonate, warningHandler)
}

func (p *Factory) Client(ctx *types.APIRequest, s *types.APISchema, namespace string) (dynamic.ResourceInterface, error) {
return newClient(ctx, p.clientCfg, s, namespace, p.impersonate)
func (p *Factory) Client(ctx *types.APIRequest, s *types.APISchema, namespace string, warningHandler rest.WarningHandler) (dynamic.ResourceInterface, error) {
return newClient(ctx, p.clientCfg, s, namespace, p.impersonate, warningHandler)
}

func (p *Factory) AdminClient(ctx *types.APIRequest, s *types.APISchema, namespace string) (dynamic.ResourceInterface, error) {
return newClient(ctx, p.clientCfg, s, namespace, false)
func (p *Factory) AdminClient(ctx *types.APIRequest, s *types.APISchema, namespace string, warningHandler rest.WarningHandler) (dynamic.ResourceInterface, error) {
return newClient(ctx, p.clientCfg, s, namespace, false, warningHandler)
}

func (p *Factory) ClientForWatch(ctx *types.APIRequest, s *types.APISchema, namespace string) (dynamic.ResourceInterface, error) {
return newClient(ctx, p.watchClientCfg, s, namespace, p.impersonate)
func (p *Factory) ClientForWatch(ctx *types.APIRequest, s *types.APISchema, namespace string, warningHandler rest.WarningHandler) (dynamic.ResourceInterface, error) {
return newClient(ctx, p.clientCfg, s, namespace, p.impersonate, warningHandler)
}

func (p *Factory) AdminClientForWatch(ctx *types.APIRequest, s *types.APISchema, namespace string) (dynamic.ResourceInterface, error) {
return newClient(ctx, p.watchClientCfg, s, namespace, false)
func (p *Factory) AdminClientForWatch(ctx *types.APIRequest, s *types.APISchema, namespace string, warningHandler rest.WarningHandler) (dynamic.ResourceInterface, error) {
return newClient(ctx, p.clientCfg, s, namespace, false, warningHandler)
}

func (p *Factory) TableClient(ctx *types.APIRequest, s *types.APISchema, namespace string) (dynamic.ResourceInterface, error) {
func (p *Factory) TableClient(ctx *types.APIRequest, s *types.APISchema, namespace string, warningHandler rest.WarningHandler) (dynamic.ResourceInterface, error) {
if attributes.Table(s) {
return newClient(ctx, p.tableClientCfg, s, namespace, p.impersonate)
return newClient(ctx, p.clientCfg, s, namespace, p.impersonate, warningHandler)
}
return p.Client(ctx, s, namespace)
return p.Client(ctx, s, namespace, warningHandler)
}

func (p *Factory) TableAdminClient(ctx *types.APIRequest, s *types.APISchema, namespace string) (dynamic.ResourceInterface, error) {
func (p *Factory) TableAdminClient(ctx *types.APIRequest, s *types.APISchema, namespace string, warningHandler rest.WarningHandler) (dynamic.ResourceInterface, error) {
if attributes.Table(s) {
return newClient(ctx, p.tableClientCfg, s, namespace, false)
return newClient(ctx, p.clientCfg, s, namespace, false, warningHandler)
}
return p.AdminClient(ctx, s, namespace)
return p.AdminClient(ctx, s, namespace, warningHandler)
}

func (p *Factory) TableClientForWatch(ctx *types.APIRequest, s *types.APISchema, namespace string) (dynamic.ResourceInterface, error) {
func (p *Factory) TableClientForWatch(ctx *types.APIRequest, s *types.APISchema, namespace string, warningHandler rest.WarningHandler) (dynamic.ResourceInterface, error) {
if attributes.Table(s) {
return newClient(ctx, p.tableWatchClientCfg, s, namespace, p.impersonate)
return newClient(ctx, p.clientCfg, s, namespace, p.impersonate, warningHandler)
}
return p.ClientForWatch(ctx, s, namespace)
return p.ClientForWatch(ctx, s, namespace, warningHandler)
}

func (p *Factory) TableAdminClientForWatch(ctx *types.APIRequest, s *types.APISchema, namespace string) (dynamic.ResourceInterface, error) {
func (p *Factory) TableAdminClientForWatch(ctx *types.APIRequest, s *types.APISchema, namespace string, warningHandler rest.WarningHandler) (dynamic.ResourceInterface, error) {
if attributes.Table(s) {
return newClient(ctx, p.tableWatchClientCfg, s, namespace, false)
return newClient(ctx, p.clientCfg, s, namespace, false, warningHandler)
}
return p.AdminClientForWatch(ctx, s, namespace)
return p.AdminClientForWatch(ctx, s, namespace, warningHandler)
}

func setupConfig(ctx *types.APIRequest, cfg *rest.Config, impersonate bool) (*rest.Config, error) {
Expand All @@ -173,17 +173,18 @@ func setupConfig(ctx *types.APIRequest, cfg *rest.Config, impersonate bool) (*re
return cfg, nil
}

func newDynamicClient(ctx *types.APIRequest, cfg *rest.Config, impersonate bool) (dynamic.Interface, error) {
func newDynamicClient(ctx *types.APIRequest, cfg *rest.Config, impersonate bool, warningHandler rest.WarningHandler) (dynamic.Interface, error) {
cfg, err := setupConfig(ctx, cfg, impersonate)
cfg.WarningHandler = warningHandler
if err != nil {
return nil, err
}

return dynamic.NewForConfig(cfg)
}

func newClient(ctx *types.APIRequest, cfg *rest.Config, s *types.APISchema, namespace string, impersonate bool) (dynamic.ResourceInterface, error) {
client, err := newDynamicClient(ctx, cfg, impersonate)
func newClient(ctx *types.APIRequest, cfg *rest.Config, s *types.APISchema, namespace string, impersonate bool, warningHandler rest.WarningHandler) (dynamic.ResourceInterface, error) {
client, err := newDynamicClient(ctx, cfg, impersonate, warningHandler)
if err != nil {
return nil, err
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/resources/cluster/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
)

type Apply struct {
Expand Down Expand Up @@ -110,7 +111,8 @@ func (a *Apply) createApply(apiContext *types.APIRequest) (apply.Apply, error) {
}

apply := apply.New(client.Discovery(), func(gvr schema.GroupVersionResource) (dynamic.NamespaceableResourceInterface, error) {
dynamicClient, err := a.cg.DynamicClient(apiContext)
// don't record warnings from apply
dynamicClient, err := a.cg.DynamicClient(apiContext, rest.NoWarnings{})
if err != nil {
return nil, err
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/stores/partition/parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"encoding/base64"
"encoding/json"

"github.com/rancher/apiserver/pkg/types"

"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand Down Expand Up @@ -33,7 +35,7 @@ type ParallelPartitionLister struct {
}

// PartitionLister lists objects for one partition.
type PartitionLister func(ctx context.Context, partition Partition, cont string, revision string, limit int) (*unstructured.UnstructuredList, error)
type PartitionLister func(ctx context.Context, partition Partition, cont string, revision string, limit int) (*unstructured.UnstructuredList, []types.Warning, error)

// Err returns the latest error encountered.
func (p *ParallelPartitionLister) Err() error {
Expand Down Expand Up @@ -174,7 +176,7 @@ func (p *ParallelPartitionLister) feeder(ctx context.Context, state listState, l
if partition.Name() == state.PartitionName {
cont = state.Continue
}
list, err := p.Lister(ctx, partition, cont, state.Revision, limit)
list, _, err := p.Lister(ctx, partition, cont, state.Revision, limit)
if err != nil {
return err
}
Expand Down
39 changes: 20 additions & 19 deletions pkg/stores/partition/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,11 @@ type cacheKey struct {

// UnstructuredStore is like types.Store but deals in k8s unstructured objects instead of apiserver types.
type UnstructuredStore interface {
ByID(apiOp *types.APIRequest, schema *types.APISchema, id string) (*unstructured.Unstructured, error)
List(apiOp *types.APIRequest, schema *types.APISchema) (*unstructured.UnstructuredList, error)
Create(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject) (*unstructured.Unstructured, error)
Update(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject, id string) (*unstructured.Unstructured, error)
Delete(apiOp *types.APIRequest, schema *types.APISchema, id string) (*unstructured.Unstructured, error)
ByID(apiOp *types.APIRequest, schema *types.APISchema, id string) (*unstructured.Unstructured, []types.Warning, error)
List(apiOp *types.APIRequest, schema *types.APISchema) (*unstructured.UnstructuredList, []types.Warning, error)
Create(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject) (*unstructured.Unstructured, []types.Warning, error)
Update(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject, id string) (*unstructured.Unstructured, []types.Warning, error)
Delete(apiOp *types.APIRequest, schema *types.APISchema, id string) (*unstructured.Unstructured, []types.Warning, error)
Watch(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest) (chan watch.Event, error)
}

Expand All @@ -103,11 +103,11 @@ func (s *Store) Delete(apiOp *types.APIRequest, schema *types.APISchema, id stri
return types.APIObject{}, err
}

obj, err := target.Delete(apiOp, schema, id)
obj, warnings, err := target.Delete(apiOp, schema, id)
if err != nil {
return types.APIObject{}, err
}
return toAPI(schema, obj), nil
return toAPI(schema, obj, warnings), nil
}

// ByID looks up a single object by its ID.
Expand All @@ -117,18 +117,18 @@ func (s *Store) ByID(apiOp *types.APIRequest, schema *types.APISchema, id string
return types.APIObject{}, err
}

obj, err := target.ByID(apiOp, schema, id)
obj, warnings, err := target.ByID(apiOp, schema, id)
if err != nil {
return types.APIObject{}, err
}
return toAPI(schema, obj), nil
return toAPI(schema, obj, warnings), nil
}

func (s *Store) listPartition(ctx context.Context, apiOp *types.APIRequest, schema *types.APISchema, partition Partition,
cont string, revision string, limit int) (*unstructured.UnstructuredList, error) {
cont string, revision string, limit int) (*unstructured.UnstructuredList, []types.Warning, error) {
store, err := s.Partitioner.Store(apiOp, partition)
if err != nil {
return nil, err
return nil, nil, err
}

req := apiOp.Clone()
Expand Down Expand Up @@ -163,7 +163,7 @@ func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.AP
}

lister := ParallelPartitionLister{
Lister: func(ctx context.Context, partition Partition, cont string, revision string, limit int) (*unstructured.UnstructuredList, error) {
Lister: func(ctx context.Context, partition Partition, cont string, revision string, limit int) (*unstructured.UnstructuredList, []types.Warning, error) {
return s.listPartition(ctx, apiOp, schema, partition, cont, revision, limit)
},
Concurrency: 3,
Expand Down Expand Up @@ -217,7 +217,7 @@ func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.AP

for _, item := range list {
item := item
result.Objects = append(result.Objects, toAPI(schema, &item))
result.Objects = append(result.Objects, toAPI(schema, &item, nil))
}

result.Revision = key.revision
Expand Down Expand Up @@ -254,11 +254,11 @@ func (s *Store) Create(apiOp *types.APIRequest, schema *types.APISchema, data ty
return types.APIObject{}, err
}

obj, err := target.Create(apiOp, schema, data)
obj, warnings, err := target.Create(apiOp, schema, data)
if err != nil {
return types.APIObject{}, err
}
return toAPI(schema, obj), nil
return toAPI(schema, obj, warnings), nil
}

// Update updates a single object in the store.
Expand All @@ -268,11 +268,11 @@ func (s *Store) Update(apiOp *types.APIRequest, schema *types.APISchema, data ty
return types.APIObject{}, err
}

obj, err := target.Update(apiOp, schema, data, id)
obj, warnings, err := target.Update(apiOp, schema, data, id)
if err != nil {
return types.APIObject{}, err
}
return toAPI(schema, obj), nil
return toAPI(schema, obj, warnings), nil
}

// Watch returns a channel of events for a list or resource.
Expand Down Expand Up @@ -318,7 +318,7 @@ func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, wr types
return response, nil
}

func toAPI(schema *types.APISchema, obj runtime.Object) types.APIObject {
func toAPI(schema *types.APISchema, obj runtime.Object, warnings []types.Warning) types.APIObject {
if obj == nil || reflect.ValueOf(obj).IsNil() {
return types.APIObject{}
}
Expand All @@ -344,6 +344,7 @@ func toAPI(schema *types.APISchema, obj runtime.Object) types.APIObject {
}

apiObject.ID = id
apiObject.Warnings = warnings
return apiObject
}

Expand Down Expand Up @@ -384,7 +385,7 @@ func toAPIEvent(apiOp *types.APIRequest, schema *types.APISchema, event watch.Ev
return apiEvent
}

apiEvent.Object = toAPI(schema, event.Object)
apiEvent.Object = toAPI(schema, event.Object, nil)

m, err := meta.Accessor(event.Object)
if err != nil {
Expand Down
28 changes: 14 additions & 14 deletions pkg/stores/partition/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1881,12 +1881,12 @@ type mockStore struct {
called int
}

func (m *mockStore) List(apiOp *types.APIRequest, schema *types.APISchema) (*unstructured.UnstructuredList, error) {
func (m *mockStore) List(apiOp *types.APIRequest, schema *types.APISchema) (*unstructured.UnstructuredList, []types.Warning, error) {
m.called++
query, _ := url.ParseQuery(apiOp.Request.URL.RawQuery)
l := query.Get("limit")
if l == "" {
return m.contents, nil
return m.contents, nil, nil
}
i := 0
if c := query.Get("continue"); c != "" {
Expand All @@ -1904,29 +1904,29 @@ func (m *mockStore) List(apiOp *types.APIRequest, schema *types.APISchema) (*uns
contents.SetContinue(base64.StdEncoding.EncodeToString([]byte(contents.Items[i+lInt].GetName())))
}
if i > len(contents.Items) {
return contents, nil
return contents, nil, nil
}
if i+lInt > len(contents.Items) {
contents.Items = contents.Items[i:]
return contents, nil
return contents, nil, nil
}
contents.Items = contents.Items[i : i+lInt]
return contents, nil
return contents, nil, nil
}

func (m *mockStore) ByID(apiOp *types.APIRequest, schema *types.APISchema, id string) (*unstructured.Unstructured, error) {
func (m *mockStore) ByID(apiOp *types.APIRequest, schema *types.APISchema, id string) (*unstructured.Unstructured, []types.Warning, error) {
panic("not implemented")
}

func (m *mockStore) Create(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject) (*unstructured.Unstructured, error) {
func (m *mockStore) Create(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject) (*unstructured.Unstructured, []types.Warning, error) {
panic("not implemented")
}

func (m *mockStore) Update(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject, id string) (*unstructured.Unstructured, error) {
func (m *mockStore) Update(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject, id string) (*unstructured.Unstructured, []types.Warning, error) {
panic("not implemented")
}

func (m *mockStore) Delete(apiOp *types.APIRequest, schema *types.APISchema, id string) (*unstructured.Unstructured, error) {
func (m *mockStore) Delete(apiOp *types.APIRequest, schema *types.APISchema, id string) (*unstructured.Unstructured, []types.Warning, error) {
panic("not implemented")
}

Expand All @@ -1939,7 +1939,7 @@ type mockVersionedStore struct {
versions []mockStore
}

func (m *mockVersionedStore) List(apiOp *types.APIRequest, schema *types.APISchema) (*unstructured.UnstructuredList, error) {
func (m *mockVersionedStore) List(apiOp *types.APIRequest, schema *types.APISchema) (*unstructured.UnstructuredList, []types.Warning, error) {
m.called++
query, _ := url.ParseQuery(apiOp.Request.URL.RawQuery)
rv := len(m.versions) - 1
Expand All @@ -1949,7 +1949,7 @@ func (m *mockVersionedStore) List(apiOp *types.APIRequest, schema *types.APISche
}
l := query.Get("limit")
if l == "" {
return m.versions[rv].contents, nil
return m.versions[rv].contents, nil, nil
}
i := 0
if c := query.Get("continue"); c != "" {
Expand All @@ -1967,14 +1967,14 @@ func (m *mockVersionedStore) List(apiOp *types.APIRequest, schema *types.APISche
contents.SetContinue(base64.StdEncoding.EncodeToString([]byte(contents.Items[i+lInt].GetName())))
}
if i > len(contents.Items) {
return contents, nil
return contents, nil, nil
}
if i+lInt > len(contents.Items) {
contents.Items = contents.Items[i:]
return contents, nil
return contents, nil, nil
}
contents.Items = contents.Items[i : i+lInt]
return contents, nil
return contents, nil, nil
}

type mockCache struct {
Expand Down
Loading

0 comments on commit a04cbb6

Please sign in to comment.