Skip to content

Commit

Permalink
Initial implmentation of warning headers
Browse files Browse the repository at this point in the history
Attempts to pass through warning headers which k8s returns.
Requires an update to rancher/apiserver.
  • Loading branch information
MbolotSuse authored and crobby committed Dec 19, 2022
1 parent 48efe7f commit 5679bad
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 59 deletions.
62 changes: 40 additions & 22 deletions pkg/client/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,52 +111,70 @@ func (p *Factory) AdminK8sInterface() (kubernetes.Interface, error) {
return kubernetes.NewForConfig(p.clientCfg)
}

func (p *Factory) DynamicClient(ctx *types.APIRequest) (dynamic.Interface, error) {
return newDynamicClient(ctx, p.clientCfg, p.impersonate)
func (p *Factory) DynamicClient(ctx *types.APIRequest, warningHandler rest.WarningHandler) (dynamic.Interface, error) {
config := *p.clientCfg
config.WarningHandler = warningHandler
return newDynamicClient(ctx, &config, p.impersonate)
}

func (p *Factory) Client(ctx *types.APIRequest, s *types.APISchema, namespace string) (dynamic.ResourceInterface, error) {
return newClient(ctx, p.clientCfg, s, namespace, p.impersonate)
func (p *Factory) Client(ctx *types.APIRequest, s *types.APISchema, namespace string, warningHandler rest.WarningHandler) (dynamic.ResourceInterface, error) {
config := *p.clientCfg
config.WarningHandler = warningHandler
return newClient(ctx, &config, s, namespace, p.impersonate)
}

func (p *Factory) AdminClient(ctx *types.APIRequest, s *types.APISchema, namespace string) (dynamic.ResourceInterface, error) {
return newClient(ctx, p.clientCfg, s, namespace, false)
func (p *Factory) AdminClient(ctx *types.APIRequest, s *types.APISchema, namespace string, warningHandler rest.WarningHandler) (dynamic.ResourceInterface, error) {
config := *p.clientCfg
config.WarningHandler = warningHandler
return newClient(ctx, &config, s, namespace, false)
}

func (p *Factory) ClientForWatch(ctx *types.APIRequest, s *types.APISchema, namespace string) (dynamic.ResourceInterface, error) {
return newClient(ctx, p.watchClientCfg, s, namespace, p.impersonate)
func (p *Factory) ClientForWatch(ctx *types.APIRequest, s *types.APISchema, namespace string, warningHandler rest.WarningHandler) (dynamic.ResourceInterface, error) {
config := *p.watchClientCfg
config.WarningHandler = warningHandler
return newClient(ctx, &config, s, namespace, p.impersonate)
}

func (p *Factory) AdminClientForWatch(ctx *types.APIRequest, s *types.APISchema, namespace string) (dynamic.ResourceInterface, error) {
return newClient(ctx, p.watchClientCfg, s, namespace, false)
func (p *Factory) AdminClientForWatch(ctx *types.APIRequest, s *types.APISchema, namespace string, warningHandler rest.WarningHandler) (dynamic.ResourceInterface, error) {
config := *p.watchClientCfg
config.WarningHandler = warningHandler
return newClient(ctx, &config, s, namespace, false)
}

func (p *Factory) TableClient(ctx *types.APIRequest, s *types.APISchema, namespace string) (dynamic.ResourceInterface, error) {
func (p *Factory) TableClient(ctx *types.APIRequest, s *types.APISchema, namespace string, warningHandler rest.WarningHandler) (dynamic.ResourceInterface, error) {
if attributes.Table(s) {
return newClient(ctx, p.tableClientCfg, s, namespace, p.impersonate)
config := *p.tableClientCfg
config.WarningHandler = warningHandler
return newClient(ctx, &config, s, namespace, p.impersonate)
}
return p.Client(ctx, s, namespace)
return p.Client(ctx, s, namespace, warningHandler)
}

func (p *Factory) TableAdminClient(ctx *types.APIRequest, s *types.APISchema, namespace string) (dynamic.ResourceInterface, error) {
func (p *Factory) TableAdminClient(ctx *types.APIRequest, s *types.APISchema, namespace string, warningHandler rest.WarningHandler) (dynamic.ResourceInterface, error) {
if attributes.Table(s) {
return newClient(ctx, p.tableClientCfg, s, namespace, false)
config := *p.tableClientCfg
config.WarningHandler = warningHandler
return newClient(ctx, &config, s, namespace, false)
}
return p.AdminClient(ctx, s, namespace)
return p.AdminClient(ctx, s, namespace, warningHandler)
}

func (p *Factory) TableClientForWatch(ctx *types.APIRequest, s *types.APISchema, namespace string) (dynamic.ResourceInterface, error) {
func (p *Factory) TableClientForWatch(ctx *types.APIRequest, s *types.APISchema, namespace string, warningHandler rest.WarningHandler) (dynamic.ResourceInterface, error) {
if attributes.Table(s) {
return newClient(ctx, p.tableWatchClientCfg, s, namespace, p.impersonate)
config := *p.tableWatchClientCfg
config.WarningHandler = warningHandler
return newClient(ctx, &config, s, namespace, p.impersonate)
}
return p.ClientForWatch(ctx, s, namespace)
return p.ClientForWatch(ctx, s, namespace, warningHandler)
}

func (p *Factory) TableAdminClientForWatch(ctx *types.APIRequest, s *types.APISchema, namespace string) (dynamic.ResourceInterface, error) {
func (p *Factory) TableAdminClientForWatch(ctx *types.APIRequest, s *types.APISchema, namespace string, warningHandler rest.WarningHandler) (dynamic.ResourceInterface, error) {
if attributes.Table(s) {
return newClient(ctx, p.tableWatchClientCfg, s, namespace, false)
config := *p.tableWatchClientCfg
config.WarningHandler = warningHandler
return newClient(ctx, &config, s, namespace, false)
}
return p.AdminClientForWatch(ctx, s, namespace)
return p.AdminClientForWatch(ctx, s, namespace, warningHandler)
}

func setupConfig(ctx *types.APIRequest, cfg *rest.Config, impersonate bool) (*rest.Config, error) {
Expand Down
4 changes: 3 additions & 1 deletion pkg/resources/cluster/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
)

type Apply struct {
Expand Down Expand Up @@ -110,7 +111,8 @@ func (a *Apply) createApply(apiContext *types.APIRequest) (apply.Apply, error) {
}

apply := apply.New(client.Discovery(), func(gvr schema.GroupVersionResource) (dynamic.NamespaceableResourceInterface, error) {
dynamicClient, err := a.cg.DynamicClient(apiContext)
// don't record warnings from apply
dynamicClient, err := a.cg.DynamicClient(apiContext, rest.NoWarnings{})
if err != nil {
return nil, err
}
Expand Down
97 changes: 61 additions & 36 deletions pkg/stores/proxy/proxy_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)

const watchTimeoutEnv = "CATTLE_WATCH_TIMEOUT_SECONDS"
Expand All @@ -51,13 +52,28 @@ type ClientGetter interface {
IsImpersonating() bool
K8sInterface(ctx *types.APIRequest) (kubernetes.Interface, error)
AdminK8sInterface() (kubernetes.Interface, error)
Client(ctx *types.APIRequest, schema *types.APISchema, namespace string) (dynamic.ResourceInterface, error)
DynamicClient(ctx *types.APIRequest) (dynamic.Interface, error)
AdminClient(ctx *types.APIRequest, schema *types.APISchema, namespace string) (dynamic.ResourceInterface, error)
TableClient(ctx *types.APIRequest, schema *types.APISchema, namespace string) (dynamic.ResourceInterface, error)
TableAdminClient(ctx *types.APIRequest, schema *types.APISchema, namespace string) (dynamic.ResourceInterface, error)
TableClientForWatch(ctx *types.APIRequest, schema *types.APISchema, namespace string) (dynamic.ResourceInterface, error)
TableAdminClientForWatch(ctx *types.APIRequest, schema *types.APISchema, namespace string) (dynamic.ResourceInterface, error)
Client(ctx *types.APIRequest, schema *types.APISchema, namespace string, warningHandler rest.WarningHandler) (dynamic.ResourceInterface, error)
DynamicClient(ctx *types.APIRequest, warningHandler rest.WarningHandler) (dynamic.Interface, error)
AdminClient(ctx *types.APIRequest, schema *types.APISchema, namespace string, warningHandler rest.WarningHandler) (dynamic.ResourceInterface, error)
TableClient(ctx *types.APIRequest, schema *types.APISchema, namespace string, warningHandler rest.WarningHandler) (dynamic.ResourceInterface, error)
TableAdminClient(ctx *types.APIRequest, schema *types.APISchema, namespace string, warningHandler rest.WarningHandler) (dynamic.ResourceInterface, error)
TableClientForWatch(ctx *types.APIRequest, schema *types.APISchema, namespace string, warningHandler rest.WarningHandler) (dynamic.ResourceInterface, error)
TableAdminClientForWatch(ctx *types.APIRequest, schema *types.APISchema, namespace string, warningHandler rest.WarningHandler) (dynamic.ResourceInterface, error)
}

type WarningBuffer struct {
Warnings []types.Warning
}

func (w *WarningBuffer) HandleWarningHeader(code int, agent string, text string) {
if w.Warnings == nil {
w.Warnings = []types.Warning{}
}
w.Warnings = append(w.Warnings, types.Warning{
Code: code,
Agent: agent,
Text: text,
})
}

// RelationshipNotifier is an interface for handling wrangler summary.Relationship events.
Expand Down Expand Up @@ -90,15 +106,15 @@ func NewProxyStore(clientGetter ClientGetter, notifier RelationshipNotifier, loo

// ByID looks up a single object by its ID.
func (s *Store) ByID(apiOp *types.APIRequest, schema *types.APISchema, id string) (types.APIObject, error) {
result, err := s.byID(apiOp, schema, apiOp.Namespace, id)
return toAPI(schema, result), err
result, warnings, err := s.byID(apiOp, schema, apiOp.Namespace, id)
return toAPI(schema, result, warnings), err
}

func decodeParams(apiOp *types.APIRequest, target runtime.Object) error {
return paramCodec.DecodeParameters(apiOp.Request.URL.Query(), metav1.SchemeGroupVersion, target)
}

func toAPI(schema *types.APISchema, obj runtime.Object) types.APIObject {
func toAPI(schema *types.APISchema, obj runtime.Object, warnings []types.Warning) types.APIObject {
if obj == nil || reflect.ValueOf(obj).IsNil() {
return types.APIObject{}
}
Expand All @@ -124,23 +140,25 @@ func toAPI(schema *types.APISchema, obj runtime.Object) types.APIObject {
}

apiObject.ID = id
apiObject.Warnings = warnings
return apiObject
}

func (s *Store) byID(apiOp *types.APIRequest, schema *types.APISchema, namespace, id string) (*unstructured.Unstructured, error) {
k8sClient, err := metricsStore.Wrap(s.clientGetter.TableClient(apiOp, schema, namespace))
func (s *Store) byID(apiOp *types.APIRequest, schema *types.APISchema, namespace, id string) (*unstructured.Unstructured, []types.Warning, error) {
buffer := &WarningBuffer{}
k8sClient, err := metricsStore.Wrap(s.clientGetter.TableClient(apiOp, schema, namespace, buffer))
if err != nil {
return nil, err
return nil, nil, err
}

opts := metav1.GetOptions{}
if err := decodeParams(apiOp, &opts); err != nil {
return nil, err
return nil, nil, err
}

obj, err := k8sClient.Get(apiOp, id, opts)
rowToObject(obj)
return obj, err
return obj, buffer.Warnings, err
}

func moveFromUnderscore(obj map[string]interface{}) map[string]interface{} {
Expand Down Expand Up @@ -237,12 +255,13 @@ func (s *Store) ByNames(apiOp *types.APIRequest, schema *types.APISchema, names
return types.APIObjectList{}, nil
}

adminClient, err := s.clientGetter.TableAdminClient(apiOp, schema, apiOp.Namespace)
buffer := &WarningBuffer{}
adminClient, err := s.clientGetter.TableAdminClient(apiOp, schema, apiOp.Namespace, buffer)
if err != nil {
return types.APIObjectList{}, err
}

objs, err := s.list(apiOp, schema, adminClient)
objs, err := s.list(apiOp, schema, adminClient, buffer)
if err != nil {
return types.APIObjectList{}, err
}
Expand All @@ -260,14 +279,15 @@ func (s *Store) ByNames(apiOp *types.APIRequest, schema *types.APISchema, names

// List returns a list of resources.
func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.APIObjectList, error) {
client, err := s.clientGetter.TableClient(apiOp, schema, apiOp.Namespace)
buffer := &WarningBuffer{}
client, err := s.clientGetter.TableClient(apiOp, schema, apiOp.Namespace, buffer)
if err != nil {
return types.APIObjectList{}, err
}
return s.list(apiOp, schema, client)
return s.list(apiOp, schema, client, buffer)
}

func (s *Store) list(apiOp *types.APIRequest, schema *types.APISchema, client dynamic.ResourceInterface) (types.APIObjectList, error) {
func (s *Store) list(apiOp *types.APIRequest, schema *types.APISchema, client dynamic.ResourceInterface, buffer *WarningBuffer) (types.APIObjectList, error) {
opts := metav1.ListOptions{}
if err := decodeParams(apiOp, &opts); err != nil {
return types.APIObjectList{}, nil
Expand All @@ -287,7 +307,7 @@ func (s *Store) list(apiOp *types.APIRequest, schema *types.APISchema, client dy
}

for i := range resultList.Items {
result.Objects = append(result.Objects, toAPI(schema, &resultList.Items[i]))
result.Objects = append(result.Objects, toAPI(schema, &resultList.Items[i], buffer.Warnings))
}

return result, nil
Expand Down Expand Up @@ -340,9 +360,9 @@ func (s *Store) listAndWatch(apiOp *types.APIRequest, client dynamic.ResourceInt
if s.notifier != nil {
eg.Go(func() error {
for rel := range s.notifier.OnInboundRelationshipChange(ctx, schema, apiOp.Namespace) {
obj, err := s.byID(apiOp, schema, rel.Namespace, rel.Name)
obj, warnings, err := s.byID(apiOp, schema, rel.Namespace, rel.Name)
if err == nil {
result <- s.toAPIEvent(apiOp, schema, watch.Modified, obj)
result <- s.toAPIEvent(apiOp, schema, watch.Modified, obj, warnings)
} else {
logrus.Debugf("notifier watch error: %v", err)
returnErr(errors.Wrapf(err, "notifier watch error: %v", err), result)
Expand All @@ -363,7 +383,7 @@ func (s *Store) listAndWatch(apiOp *types.APIRequest, client dynamic.ResourceInt
}
continue
}
result <- s.toAPIEvent(apiOp, schema, event.Type, event.Object)
result <- s.toAPIEvent(apiOp, schema, event.Type, event.Object, nil)
}
return fmt.Errorf("closed")
})
Expand All @@ -379,7 +399,8 @@ func (s *Store) listAndWatch(apiOp *types.APIRequest, client dynamic.ResourceInt
// With this filter, the request can be performed successfully, and only the allowed resources will
// be returned in watch.
func (s *Store) WatchNames(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest, names sets.String) (chan types.APIEvent, error) {
adminClient, err := s.clientGetter.TableAdminClientForWatch(apiOp, schema, apiOp.Namespace)
buffer := &WarningBuffer{}
adminClient, err := s.clientGetter.TableAdminClientForWatch(apiOp, schema, apiOp.Namespace, buffer)
if err != nil {
return nil, err
}
Expand All @@ -403,7 +424,8 @@ func (s *Store) WatchNames(apiOp *types.APIRequest, schema *types.APISchema, w t

// Watch returns a channel of events for a list or resource.
func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest) (chan types.APIEvent, error) {
client, err := s.clientGetter.TableClientForWatch(apiOp, schema, apiOp.Namespace)
buffer := &WarningBuffer{}
client, err := s.clientGetter.TableClientForWatch(apiOp, schema, apiOp.Namespace, buffer)
if err != nil {
return nil, err
}
Expand All @@ -420,7 +442,7 @@ func (s *Store) watch(apiOp *types.APIRequest, schema *types.APISchema, w types.
return result, nil
}

func (s *Store) toAPIEvent(apiOp *types.APIRequest, schema *types.APISchema, et watch.EventType, obj runtime.Object) types.APIEvent {
func (s *Store) toAPIEvent(apiOp *types.APIRequest, schema *types.APISchema, et watch.EventType, obj runtime.Object, warnings []types.Warning) types.APIEvent {
name := types.ChangeAPIEvent
switch et {
case watch.Deleted:
Expand All @@ -435,7 +457,7 @@ func (s *Store) toAPIEvent(apiOp *types.APIRequest, schema *types.APISchema, et

event := types.APIEvent{
Name: name,
Object: toAPI(schema, obj),
Object: toAPI(schema, obj, warnings),
}

m, err := meta.Accessor(obj)
Expand Down Expand Up @@ -472,7 +494,8 @@ func (s *Store) Create(apiOp *types.APIRequest, schema *types.APISchema, params
gvk := attributes.GVK(schema)
input["apiVersion"], input["kind"] = gvk.ToAPIVersionAndKind()

k8sClient, err := metricsStore.Wrap(s.clientGetter.TableClient(apiOp, schema, ns))
buffer := &WarningBuffer{}
k8sClient, err := metricsStore.Wrap(s.clientGetter.TableClient(apiOp, schema, ns, buffer))
if err != nil {
return types.APIObject{}, err
}
Expand All @@ -484,7 +507,7 @@ func (s *Store) Create(apiOp *types.APIRequest, schema *types.APISchema, params

resp, err = k8sClient.Create(apiOp, &unstructured.Unstructured{Object: input}, opts)
rowToObject(resp)
apiObject := toAPI(schema, resp)
apiObject := toAPI(schema, resp, buffer.Warnings)
return apiObject, err
}

Expand All @@ -496,7 +519,8 @@ func (s *Store) Update(apiOp *types.APIRequest, schema *types.APISchema, params
)

ns := types.Namespace(input)
k8sClient, err := metricsStore.Wrap(s.clientGetter.TableClient(apiOp, schema, ns))
buffer := &WarningBuffer{}
k8sClient, err := metricsStore.Wrap(s.clientGetter.TableClient(apiOp, schema, ns, buffer))
if err != nil {
return types.APIObject{}, err
}
Expand Down Expand Up @@ -534,7 +558,7 @@ func (s *Store) Update(apiOp *types.APIRequest, schema *types.APISchema, params
return types.APIObject{}, err
}

return toAPI(schema, resp), nil
return toAPI(schema, resp, buffer.Warnings), nil
}

resourceVersion := input.String("metadata", "resourceVersion")
Expand All @@ -553,7 +577,7 @@ func (s *Store) Update(apiOp *types.APIRequest, schema *types.APISchema, params
}

rowToObject(resp)
return toAPI(schema, resp), nil
return toAPI(schema, resp, buffer.Warnings), nil
}

// Delete deletes an object from a store.
Expand All @@ -563,7 +587,8 @@ func (s *Store) Delete(apiOp *types.APIRequest, schema *types.APISchema, id stri
return types.APIObject{}, nil
}

k8sClient, err := metricsStore.Wrap(s.clientGetter.TableClient(apiOp, schema, apiOp.Namespace))
buffer := &WarningBuffer{}
k8sClient, err := metricsStore.Wrap(s.clientGetter.TableClient(apiOp, schema, apiOp.Namespace, buffer))
if err != nil {
return types.APIObject{}, err
}
Expand All @@ -572,12 +597,12 @@ func (s *Store) Delete(apiOp *types.APIRequest, schema *types.APISchema, id stri
return types.APIObject{}, err
}

obj, err := s.byID(apiOp, schema, apiOp.Namespace, id)
obj, warnings, err := s.byID(apiOp, schema, apiOp.Namespace, id)
if err != nil {
// ignore lookup error
return types.APIObject{}, validation.ErrorCode{
Status: http.StatusNoContent,
}
}
return toAPI(schema, obj), nil
return toAPI(schema, obj, warnings), nil
}

0 comments on commit 5679bad

Please sign in to comment.