diff --git a/pkg/stores/proxy/proxy_store_test.go b/pkg/stores/proxy/proxy_store_test.go new file mode 100644 index 000000000..1d498736e --- /dev/null +++ b/pkg/stores/proxy/proxy_store_test.go @@ -0,0 +1,87 @@ +package proxy + +import ( + "net/http" + "testing" + "time" + + "github.com/pkg/errors" + "github.com/rancher/apiserver/pkg/types" + "github.com/rancher/steve/pkg/client" + "github.com/rancher/wrangler/pkg/schemas" + "github.com/stretchr/testify/assert" + "golang.org/x/sync/errgroup" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + schema2 "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/dynamic/fake" + "k8s.io/client-go/rest" + clientgotesting "k8s.io/client-go/testing" +) + +var c *watch.FakeWatcher + +type testFactory struct { + *client.Factory + + fakeClient *fake.FakeDynamicClient +} + +func TestWatchNamesErrReceive(t *testing.T) { + testClientFactory, err := client.NewFactory(&rest.Config{}, false) + assert.Nil(t, err) + + fakeClient := fake.NewSimpleDynamicClient(runtime.NewScheme()) + c = watch.NewFakeWithChanSize(3, true) + defer c.Stop() + errMsgsToSend := []string{"err1", "err2", "err3"} + + for index := range errMsgsToSend { + c.Error(&metav1.Status{ + Message: errMsgsToSend[index], + }) + } + + fakeClient.PrependWatchReactor("*", func(action clientgotesting.Action) (handled bool, ret watch.Interface, err error) { + return true, c, nil + }) + testStore := Store{ + clientGetter: &testFactory{Factory: testClientFactory, + fakeClient: fakeClient, + }, + } + apiSchema := &types.APISchema{Schema: &schemas.Schema{Attributes: map[string]interface{}{"table": "something"}}} + wc, err := testStore.WatchNames(&types.APIRequest{Namespace: "", Schema: apiSchema, Request: &http.Request{}}, apiSchema, types.WatchRequest{}, nil) + assert.Nil(t, err) + + eg := errgroup.Group{} + eg.Go(func() error { return receiveUntil(wc, 5*time.Second) }) + + err = eg.Wait() + assert.Nil(t, err) + + assert.Equal(t, 0, len(c.ResultChan())) +} + +func (t *testFactory) TableAdminClientForWatch(ctx *types.APIRequest, schema *types.APISchema, namespace string, warningHandler rest.WarningHandler) (dynamic.ResourceInterface, error) { + return t.fakeClient.Resource(schema2.GroupVersionResource{}), nil +} + +func receiveUntil(wc chan watch.Event, d time.Duration) error { + timer := time.NewTicker(d) + defer timer.Stop() + for { + select { + case _, ok := <-wc: + if !ok { + c.Stop() + return errors.New("watch chan should not have been closed") + } + continue + case <-timer.C: + return nil + } + } +}