diff --git a/Makefile b/Makefile index 4f5bdef4..151505ea 100644 --- a/Makefile +++ b/Makefile @@ -113,12 +113,12 @@ test-all: clean-all install-tools verify-gen lint metalint test-all-gen bins tes .PHONY: test test: install-tools test-base @echo "--- $@" - @PATH=$(combined_bin_paths):$(PATH) gocov convert $(coverfile) | gocov report + @$(tools_bin_path)/gocov convert $(coverfile) | $(tools_bin_path)/gocov report .PHONY: test-no-deps test-no-deps: test-base @echo "--- $@" - @PATH=$(combined_bin_paths):$(PATH) gocov convert $(coverfile) | gocov report + @$(tools_bin_path)/gocov convert $(coverfile) | $(tools_bin_path)/gocov report .PHONY: test-e2e test-e2e: @@ -139,7 +139,7 @@ test-ci-unit: install-tools test-base verify-gen .PHONY: install-tools install-tools: @echo "--- $@" - GOBIN=$(tools_bin_path) go install github.com/axw/gocov + GOBIN=$(tools_bin_path) go install github.com/axw/gocov/gocov GOBIN=$(tools_bin_path) go install github.com/garethr/kubeval GOBIN=$(tools_bin_path) go install github.com/golang/mock/mockgen GOBIN=$(tools_bin_path) go install github.com/m3db/build-tools/linters/badtime diff --git a/generated/mocks/generate.go b/generated/mocks/generate.go index d95d86fd..7e5b0cc0 100644 --- a/generated/mocks/generate.go +++ b/generated/mocks/generate.go @@ -21,6 +21,7 @@ // mockgen rules for generating mocks using file mode //go:generate sh -c "mockgen -package=placement -destination=$GOPATH/src/$PACKAGE/pkg/m3admin/placement/client_mock.go -source=$GOPATH/src/$PACKAGE/pkg/m3admin/placement/types.go" //go:generate sh -c "mockgen -package=namespace -destination=$GOPATH/src/$PACKAGE/pkg/m3admin/namespace/client_mock.go -source=$GOPATH/src/$PACKAGE/pkg/m3admin/namespace/types.go" +//go:generate sh -c "mockgen -package=topic -destination=$GOPATH/src/$PACKAGE/pkg/m3admin/topic/client_mock.go -source=$GOPATH/src/$PACKAGE/pkg/m3admin/topic/types.go" //go:generate sh -c "mockgen -package=m3admin -destination=$GOPATH/src/$PACKAGE/pkg/m3admin/client_mock.go -source=$GOPATH/src/$PACKAGE/pkg/m3admin/client.go" //go:generate sh -c "mockgen -package=m3db -destination=$GOPATH/src/$PACKAGE/pkg/k8sops/m3db/k8sops_mock.go -source=$GOPATH/src/$PACKAGE/pkg/k8sops/m3db/types.go" //go:generate sh -c "mockgen -package=podidentity -destination=$GOPATH/src/$PACKAGE/pkg/k8sops/podidentity/provider_mock.go -source=$GOPATH/src/$PACKAGE/pkg/k8sops/podidentity/provider.go" diff --git a/go.mod b/go.mod index adc4699c..a89f8a7d 100644 --- a/go.mod +++ b/go.mod @@ -41,7 +41,7 @@ require ( github.com/hashicorp/golang-lru v0.5.3 // indirect github.com/imdario/mergo v0.3.7 // indirect github.com/m3db/build-tools v0.0.0-20181013000606-edd1bdd1df8a - github.com/m3db/m3 v0.8.4 + github.com/m3db/m3 v0.14.2 github.com/m3db/m3x v0.0.0-20190408051622-ebf3c7b94afd github.com/m3db/prometheus_client_golang v0.8.1 // indirect github.com/m3db/prometheus_client_model v0.0.0-20180517145114-8b2299a4bf7d // indirect diff --git a/go.sum b/go.sum index 4673e43e..6ef52e4b 100644 --- a/go.sum +++ b/go.sum @@ -343,8 +343,8 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/logrusorgru/aurora v0.0.0-20181002194514-a7b3b318ed4e/go.mod h1:7rIyQOR62GCctdiQpZ/zOJlFyk6y+94wXzv6RNZgaR4= github.com/m3db/build-tools v0.0.0-20181013000606-edd1bdd1df8a h1:CwsSHIJLeCESKdZ844jXg/3rQD3yA5azuVlJBp5w8U8= github.com/m3db/build-tools v0.0.0-20181013000606-edd1bdd1df8a/go.mod h1:Pk9AtZeKuCO2xcAth0gxwzRNFv4lV26GPSx4I6A7DQ8= -github.com/m3db/m3 v0.8.4 h1:b7yOpeHsdr4WUt9ODYfnE/7pA8B/FwD6RQMJ/2dxMzQ= -github.com/m3db/m3 v0.8.4/go.mod h1:7izI0EeTws4qSNJ1mb1rF0c3PKbQbogUFDsfAgcH2jo= +github.com/m3db/m3 v0.14.2 h1:cdEsX+4f0eufXf/Cn7zibMLngK4ncZYHZlbGeSxIKDQ= +github.com/m3db/m3 v0.14.2/go.mod h1:7izI0EeTws4qSNJ1mb1rF0c3PKbQbogUFDsfAgcH2jo= github.com/m3db/m3x v0.0.0-20190408051622-ebf3c7b94afd h1:wzLBtXzxZM9b6IXwLSRE5crynocLTCuRDpGDaOJzyuI= github.com/m3db/m3x v0.0.0-20190408051622-ebf3c7b94afd/go.mod h1:zLbcVb352e3Jsg62A6zzEhZ1gumeFsiamTqDs9ZmZrs= github.com/m3db/prometheus_client_golang v0.8.1 h1:t7w/tcFws81JL1j5sqmpqcOyQOpH4RDOmIe3A3fdN3w= diff --git a/pkg/controller/m3admin_client.go b/pkg/controller/m3admin_client.go index a14c4b8f..1c0da577 100644 --- a/pkg/controller/m3admin_client.go +++ b/pkg/controller/m3admin_client.go @@ -28,9 +28,12 @@ import ( "github.com/m3db/m3db-operator/pkg/m3admin" "github.com/m3db/m3db-operator/pkg/m3admin/namespace" "github.com/m3db/m3db-operator/pkg/m3admin/placement" + "github.com/m3db/m3db-operator/pkg/m3admin/topic" "github.com/m3db/m3/src/cluster/generated/proto/placementpb" m3placement "github.com/m3db/m3/src/cluster/placement" + "github.com/m3db/m3/src/msg/generated/proto/topicpb" + m3topic "github.com/m3db/m3/src/msg/topic" "github.com/m3db/m3/src/query/generated/proto/admin" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -44,9 +47,11 @@ type multiAdminClient struct { mu sync.RWMutex nsClients map[string]namespace.Client plClients map[string]placement.Client + tpClients map[string]topic.Client nsClientFn func(...namespace.Option) (namespace.Client, error) plClientFn func(...placement.Option) (placement.Client, error) + tpClientFn func(...topic.Option) (topic.Client, error) clusterKeyFn func(metav1.ObjectMetaAccessor, string) string clusterURLFn func(metav1.ObjectMetaAccessor) string @@ -89,8 +94,10 @@ func newMultiAdminClient(adminOpts []m3admin.Option, logger *zap.Logger) *multiA return &multiAdminClient{ nsClients: make(map[string]namespace.Client), plClients: make(map[string]placement.Client), + tpClients: make(map[string]topic.Client), nsClientFn: namespace.NewClient, plClientFn: placement.NewClient, + tpClientFn: topic.NewClient, clusterKeyFn: clusterKey, clusterURLFn: clusterURL, adminClientFn: newAdminClient, @@ -172,6 +179,39 @@ func (m *multiAdminClient) placementClientForCluster(cluster metav1.ObjectMetaAc return client } +func (m *multiAdminClient) topicClientForCluster(cluster metav1.ObjectMetaAccessor) topic.Client { + url := m.clusterURLFn(cluster) + key := m.clusterKeyFn(cluster, url) + + m.mu.RLock() + client, ok := m.tpClients[key] + m.mu.RUnlock() + if ok { + return client + } + + adminClient := m.adminClientForCluster(cluster) + client, err := m.tpClientFn( + topic.WithClient(adminClient), + topic.WithLogger(m.logger), + topic.WithURL(url), + ) + if err != nil { + return newErrorTopicClient(err) + } + + m.mu.Lock() + mapClient, ok := m.tpClients[key] + if ok { + client = mapClient + } else { + m.tpClients[key] = client + } + m.mu.Unlock() + + return client +} + // errorNamespaceClient implements namespace.Client by returning an error that a // specified cluster couldn't be found, enabling easier ergonomics for the // common pattern of looking up a client and returning an error if one is @@ -229,3 +269,27 @@ func (c errorPlacementClient) Remove(string) error { func (c errorPlacementClient) Replace(string, placementpb.Instance) error { return c.err } + +type errorTopicClient struct { + err error +} + +func newErrorTopicClient(err error) topic.Client { + return errorTopicClient{err: err} +} + +func (c errorTopicClient) Init(name string, req *admin.TopicInitRequest) error { + return c.err +} + +func (c errorTopicClient) Get(topicName string) (m3topic.Topic, error) { + return nil, c.err +} + +func (c errorTopicClient) Delete(topicName string) error { + return c.err +} + +func (c errorTopicClient) Add(topicName string, consumerSvc *topicpb.ConsumerService) error { + return c.err +} diff --git a/pkg/controller/m3admin_client_test.go b/pkg/controller/m3admin_client_test.go index 0323de64..45bcc3c6 100644 --- a/pkg/controller/m3admin_client_test.go +++ b/pkg/controller/m3admin_client_test.go @@ -28,8 +28,10 @@ import ( "github.com/m3db/m3db-operator/pkg/m3admin" "github.com/m3db/m3db-operator/pkg/m3admin/namespace" "github.com/m3db/m3db-operator/pkg/m3admin/placement" + "github.com/m3db/m3db-operator/pkg/m3admin/topic" "github.com/m3db/m3/src/cluster/generated/proto/placementpb" + "github.com/m3db/m3/src/msg/generated/proto/topicpb" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -168,6 +170,38 @@ func TestPlacementClientForCluster(t *testing.T) { assert.Equal(t, testErr, cl3.Delete()) } +func TestTopicClientForCluster(t *testing.T) { + mc := gomock.NewController(t) + defer mc.Finish() + + m3Client := m3admin.NewMockClient(mc) + tpClient := topic.NewMockClient(mc) + + m := newTestAdminClient(m3Client, "http://foo") + m.tpClientFn = func(_ ...topic.Option) (topic.Client, error) { + return tpClient, nil + } + + clusterA := newM3DBCluster("ns", "a") + clusterB := newM3DBCluster("ns", "b") + clusterC := newM3DBCluster("ns", "c") + testErr := errors.New("test") + + cl := m.topicClientForCluster(clusterA) + assert.Equal(t, tpClient, cl) + _ = m.topicClientForCluster(clusterB) + assert.Equal(t, 2, len(m.tpClients)) + m.tpClientFn = func(_ ...topic.Option) (topic.Client, error) { + return nil, testErr + } + + cl2 := m.topicClientForCluster(clusterA) + assert.Equal(t, cl, cl2) + + cl3 := m.topicClientForCluster(clusterC) + assert.Equal(t, testErr, cl3.Delete("topic")) +} + func TestErrorNamespaceClient(t *testing.T) { clErr := errors.New("test") cl := newErrorNamespaceClient(clErr) @@ -206,3 +240,24 @@ func TestErrorPlacementClient(t *testing.T) { err = cl.Replace("foo", placementpb.Instance{}) assert.Equal(t, clErr, err) } + +func TestErrorTopicClient(t *testing.T) { + clErr := errors.New("test") + cl := newErrorTopicClient(clErr) + + err := cl.Init("topic", nil) + assert.Equal(t, clErr, err) + + tp, err := cl.Get("topic") + assert.Nil(t, tp) + assert.Equal(t, clErr, err) + + err = cl.Delete("topic") + assert.Equal(t, clErr, err) + + err = cl.Add("topic", &topicpb.ConsumerService{}) + assert.Equal(t, clErr, err) + + err = cl.Delete("topic") + assert.Equal(t, clErr, err) +} diff --git a/pkg/m3admin/client.go b/pkg/m3admin/client.go index 34b17df1..6660eeb3 100644 --- a/pkg/m3admin/client.go +++ b/pkg/m3admin/client.go @@ -50,8 +50,8 @@ var ( // Client is an m3admin client. type Client interface { - DoHTTPRequest(action, url string, data *bytes.Buffer) (*http.Response, error) - DoHTTPJSONPBRequest(action, url string, request, response proto.Message) error + DoHTTPRequest(action, url string, data *bytes.Buffer, opts ...RequestOption) (*http.Response, error) + DoHTTPJSONPBRequest(action, url string, request, response proto.Message, opts ...RequestOption) error } type client struct { @@ -95,9 +95,13 @@ func NewClient(clientOpts ...Option) Client { func (c *client) DoHTTPRequest( action, url string, data *bytes.Buffer, + options ...RequestOption, ) (*http.Response, error) { - l := c.logger.With(zap.String("action", action), zap.String("url", url)) + opts := &reqOptions{} + for _, o := range options { + o.execute(opts) + } var request *retryhttp.Request var err error @@ -116,6 +120,12 @@ func (c *client) DoHTTPRequest( } } + if opts.headers != nil { + for k, v := range opts.headers { + request.Header.Add(k, v) + } + } + request.Header.Add("Content-Type", "application/json") if c.environment != "" { request.Header.Add(m3EnvironmentHeader, c.environment) @@ -177,6 +187,7 @@ func (c *client) DoHTTPJSONPBRequest( action, url string, request proto.Message, response proto.Message, + opts ...RequestOption, ) error { var data *bytes.Buffer if request != nil { @@ -186,7 +197,7 @@ func (c *client) DoHTTPJSONPBRequest( } } - r, err := c.DoHTTPRequest(action, url, data) + r, err := c.DoHTTPRequest(action, url, data, opts...) if err != nil { return err } diff --git a/pkg/m3admin/client_mock.go b/pkg/m3admin/client_mock.go index 1a68c758..af8b1aee 100644 --- a/pkg/m3admin/client_mock.go +++ b/pkg/m3admin/client_mock.go @@ -57,30 +57,40 @@ func (m *MockClient) EXPECT() *MockClientMockRecorder { } // DoHTTPRequest mocks base method -func (m *MockClient) DoHTTPRequest(action, url string, data *bytes.Buffer) (*http.Response, error) { +func (m *MockClient) DoHTTPRequest(action, url string, data *bytes.Buffer, opts ...RequestOption) (*http.Response, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "DoHTTPRequest", action, url, data) + varargs := []interface{}{action, url, data} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "DoHTTPRequest", varargs...) ret0, _ := ret[0].(*http.Response) ret1, _ := ret[1].(error) return ret0, ret1 } // DoHTTPRequest indicates an expected call of DoHTTPRequest -func (mr *MockClientMockRecorder) DoHTTPRequest(action, url, data interface{}) *gomock.Call { +func (mr *MockClientMockRecorder) DoHTTPRequest(action, url, data interface{}, opts ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DoHTTPRequest", reflect.TypeOf((*MockClient)(nil).DoHTTPRequest), action, url, data) + varargs := append([]interface{}{action, url, data}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DoHTTPRequest", reflect.TypeOf((*MockClient)(nil).DoHTTPRequest), varargs...) } // DoHTTPJSONPBRequest mocks base method -func (m *MockClient) DoHTTPJSONPBRequest(action, url string, request, response proto.Message) error { +func (m *MockClient) DoHTTPJSONPBRequest(action, url string, request, response proto.Message, opts ...RequestOption) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "DoHTTPJSONPBRequest", action, url, request, response) + varargs := []interface{}{action, url, request, response} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "DoHTTPJSONPBRequest", varargs...) ret0, _ := ret[0].(error) return ret0 } // DoHTTPJSONPBRequest indicates an expected call of DoHTTPJSONPBRequest -func (mr *MockClientMockRecorder) DoHTTPJSONPBRequest(action, url, request, response interface{}) *gomock.Call { +func (mr *MockClientMockRecorder) DoHTTPJSONPBRequest(action, url, request, response interface{}, opts ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DoHTTPJSONPBRequest", reflect.TypeOf((*MockClient)(nil).DoHTTPJSONPBRequest), action, url, request, response) + varargs := append([]interface{}{action, url, request, response}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DoHTTPJSONPBRequest", reflect.TypeOf((*MockClient)(nil).DoHTTPJSONPBRequest), varargs...) } diff --git a/pkg/m3admin/client_test.go b/pkg/m3admin/client_test.go index 7744ea44..ed1cf94a 100644 --- a/pkg/m3admin/client_test.go +++ b/pkg/m3admin/client_test.go @@ -113,6 +113,32 @@ func TestClient_DoHTTPRequest_Header(t *testing.T) { assert.Equal(t, []byte("hello"), readAll(resp.Body)) } +func TestClient_DoHTTPRequest_PerRequestHeader(t *testing.T) { + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Header.Get("test-header") != "test-val" { + w.WriteHeader(500) + return + } + w.Write([]byte("hello")) + })) + defer s.Close() + + readAll := func(r io.Reader) []byte { + data, err := ioutil.ReadAll(r) + assert.NoError(t, err) + return data + } + + cl := newTestClient(WithHTTPClient(devNullRetry())) + _, err := cl.DoHTTPRequest("GET", s.URL, nil) + assert.Error(t, err) + + resp, err := cl.DoHTTPRequest("GET", s.URL, nil, WithHeader("test-header", "test-val")) + require.NoError(t, err) + assert.Equal(t, resp.StatusCode, http.StatusOK) + assert.Equal(t, []byte("hello"), readAll(resp.Body)) +} + func TestClient_DoHTTPRequest_Err(t *testing.T) { for _, test := range []struct { code int diff --git a/pkg/m3admin/options.go b/pkg/m3admin/options.go index 49444559..7e309485 100644 --- a/pkg/m3admin/options.go +++ b/pkg/m3admin/options.go @@ -66,3 +66,28 @@ func WithEnvironment(e string) Option { o.environment = e }) } + +// RequestOption defines a per-request option. +type RequestOption interface { + execute(*reqOptions) +} + +type reqOptionFn func(o *reqOptions) + +func (f reqOptionFn) execute(o *reqOptions) { + f(o) +} + +type reqOptions struct { + headers map[string]string +} + +// WithHeader adds a header to a request. It can be specified multiple times. +func WithHeader(name, value string) RequestOption { + return reqOptionFn(func(o *reqOptions) { + if o.headers == nil { + o.headers = make(map[string]string) + } + o.headers[name] = value + }) +} diff --git a/pkg/m3admin/topic/client.go b/pkg/m3admin/topic/client.go new file mode 100644 index 00000000..95e8c322 --- /dev/null +++ b/pkg/m3admin/topic/client.go @@ -0,0 +1,123 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package topic + +import ( + "errors" + "net/http" + + "github.com/m3db/m3db-operator/pkg/m3admin" + + "github.com/m3db/m3/src/msg/generated/proto/topicpb" + m3topic "github.com/m3db/m3/src/msg/topic" + "github.com/m3db/m3/src/query/generated/proto/admin" + + "go.uber.org/zap" +) + +const ( + headerTopicName = "topic-name" + + topicBaseURL = "/api/v1/topic" + topicInitURL = topicBaseURL + "/init" + topicAddURL = topicBaseURL +) + +type topicClient struct { + url string + client m3admin.Client + logger *zap.Logger +} + +// NewClient creates a new topic client. +func NewClient(opts ...Option) (Client, error) { + logger := zap.NewNop() + tc := &topicClient{ + client: m3admin.NewClient(), + logger: logger, + } + + for _, o := range opts { + if err := o.execute(tc); err != nil { + return nil, err + } + } + return tc, nil +} + +// Init creates a given topic. +func (t *topicClient) Init(name string, req *admin.TopicInitRequest) error { + url := t.url + topicInitURL + err := t.client.DoHTTPJSONPBRequest(http.MethodPost, url, req, nil, withTopicName(name)) + if err != nil { + return err + } + t.logger.Info("successfully created topic") + return nil +} + +// Delete deletes a specific topic. +func (t *topicClient) Delete(topicName string) error { + url := t.url + topicBaseURL + err := t.client.DoHTTPJSONPBRequest(http.MethodDelete, url, nil, nil, withTopicName(topicName)) + if err != nil { + return err + } + t.logger.Info("successfully deleted topic") + return nil +} + +// Get will get given topic. +func (t *topicClient) Get(topicName string) (m3topic.Topic, error) { + var ( + url = t.url + topicBaseURL + resp = &admin.TopicGetResponse{} + ) + err := t.client.DoHTTPJSONPBRequest(http.MethodGet, url, nil, resp, withTopicName(topicName)) + if err != nil { + return nil, err + } + + if resp.Topic == nil { + return nil, errors.New("nil topic fetch") + } + t.logger.Debug("topic retrieved") + return m3topic.NewTopicFromProto(resp.Topic) +} + +// Add adds a consumer service to a given topic. +func (t *topicClient) Add(topicName string, consumerSvc *topicpb.ConsumerService) error { + url := t.url + topicBaseURL + req := &admin.TopicAddRequest{ + ConsumerService: consumerSvc, + } + + err := t.client.DoHTTPJSONPBRequest(http.MethodPost, url, req, nil, withTopicName(topicName)) + if err != nil { + return err + } + t.logger.Info("successfully added consumer to topic") + return nil +} + +func withTopicName(name string) m3admin.RequestOption { + return m3admin.WithHeader(headerTopicName, name) +} diff --git a/pkg/m3admin/topic/client_mock.go b/pkg/m3admin/topic/client_mock.go new file mode 100644 index 00000000..fd49110c --- /dev/null +++ b/pkg/m3admin/topic/client_mock.go @@ -0,0 +1,115 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/m3db/m3db-operator/pkg/m3admin/topic/types.go + +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// Package topic is a generated GoMock package. +package topic + +import ( + "reflect" + + "github.com/m3db/m3/src/msg/generated/proto/topicpb" + "github.com/m3db/m3/src/msg/topic" + "github.com/m3db/m3/src/query/generated/proto/admin" + + "github.com/golang/mock/gomock" +) + +// MockClient is a mock of Client interface +type MockClient struct { + ctrl *gomock.Controller + recorder *MockClientMockRecorder +} + +// MockClientMockRecorder is the mock recorder for MockClient +type MockClientMockRecorder struct { + mock *MockClient +} + +// NewMockClient creates a new mock instance +func NewMockClient(ctrl *gomock.Controller) *MockClient { + mock := &MockClient{ctrl: ctrl} + mock.recorder = &MockClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockClient) EXPECT() *MockClientMockRecorder { + return m.recorder +} + +// Init mocks base method +func (m *MockClient) Init(name string, req *admin.TopicInitRequest) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Init", name, req) + ret0, _ := ret[0].(error) + return ret0 +} + +// Init indicates an expected call of Init +func (mr *MockClientMockRecorder) Init(name, req interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Init", reflect.TypeOf((*MockClient)(nil).Init), name, req) +} + +// Get mocks base method +func (m *MockClient) Get(topicName string) (topic.Topic, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Get", topicName) + ret0, _ := ret[0].(topic.Topic) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Get indicates an expected call of Get +func (mr *MockClientMockRecorder) Get(topicName interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockClient)(nil).Get), topicName) +} + +// Delete mocks base method +func (m *MockClient) Delete(topicName string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Delete", topicName) + ret0, _ := ret[0].(error) + return ret0 +} + +// Delete indicates an expected call of Delete +func (mr *MockClientMockRecorder) Delete(topicName interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockClient)(nil).Delete), topicName) +} + +// Add mocks base method +func (m *MockClient) Add(topicName string, consumerSvc *topicpb.ConsumerService) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Add", topicName, consumerSvc) + ret0, _ := ret[0].(error) + return ret0 +} + +// Add indicates an expected call of Add +func (mr *MockClientMockRecorder) Add(topicName, consumerSvc interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Add", reflect.TypeOf((*MockClient)(nil).Add), topicName, consumerSvc) +} diff --git a/pkg/m3admin/topic/client_test.go b/pkg/m3admin/topic/client_test.go new file mode 100644 index 00000000..6d6b65b4 --- /dev/null +++ b/pkg/m3admin/topic/client_test.go @@ -0,0 +1,203 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package topic + +import ( + "io/ioutil" + "net/http" + "net/http/httptest" + "testing" + + "github.com/m3db/m3db-operator/pkg/m3admin" + + "github.com/m3db/m3/src/msg/generated/proto/topicpb" + "github.com/m3db/m3/src/query/generated/proto/admin" + + retryhttp "github.com/hashicorp/go-retryablehttp" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const ( + testTopicName = "someTestTopic" +) + +func checkTopicNameHeader(t *testing.T, r *http.Request) { + if r.Header.Get(headerTopicName) != testTopicName { + t.Error("expected topic name header") + } +} + +// Client to avoid waiting many seconds in tests. +func newM3adminClient() m3admin.Client { + retry := retryhttp.NewClient() + retry.RetryMax = 0 + + return m3admin.NewClient(m3admin.WithHTTPClient(retry)) +} + +func newTopicClient(t *testing.T, url string) Client { + cl, err := NewClient( + WithURL(url), + WithClient(newM3adminClient()), + ) + + require.NoError(t, err) + require.NotNil(t, cl) + + return cl +} + +func TestDelete(t *testing.T) { + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + checkTopicNameHeader(t, r) + w.WriteHeader(200) + w.Write([]byte("{}")) + })) + + defer s.Close() + client := newTopicClient(t, s.URL) + + err := client.Delete(testTopicName) + require.Nil(t, err) +} + +func TestDeleteErr(t *testing.T) { + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + checkTopicNameHeader(t, r) + w.WriteHeader(500) + w.Write([]byte("{}")) + })) + + defer s.Close() + client := newTopicClient(t, s.URL) + + err := client.Delete(testTopicName) + require.NotNil(t, err) +} + +func TestAdd(t *testing.T) { + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + checkTopicNameHeader(t, r) + bytes, err := ioutil.ReadAll(r.Body) + if err != nil { + w.WriteHeader(400) + return + } + + const expected = `{"consumerService":{"serviceId":{"name":"foo","environment":"bar"}}}` + assert.Equal(t, expected, string(bytes)) + + w.WriteHeader(200) + w.Write([]byte("{}")) + })) + + defer s.Close() + client := newTopicClient(t, s.URL) + + err := client.Add(testTopicName, &topicpb.ConsumerService{ + ServiceId: &topicpb.ServiceID{ + Name: "foo", + Environment: "bar", + }, + }) + require.Nil(t, err) +} + +func TestAddErr(t *testing.T) { + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + checkTopicNameHeader(t, r) + w.WriteHeader(500) + w.Write([]byte("{}")) + })) + + defer s.Close() + client := newTopicClient(t, s.URL) + + err := client.Add(testTopicName, &topicpb.ConsumerService{ + ServiceId: &topicpb.ServiceID{ + Name: "foo", + Environment: "bar", + }, + }) + require.NotNil(t, err) +} + +func TestInit(t *testing.T) { + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + checkTopicNameHeader(t, r) + w.WriteHeader(200) + w.Write([]byte("{}")) + })) + + defer s.Close() + client := newTopicClient(t, s.URL) + + err := client.Init(testTopicName, &admin.TopicInitRequest{ + NumberOfShards: 64, + }) + require.Nil(t, err) +} + +func TestInitErr(t *testing.T) { + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + checkTopicNameHeader(t, r) + w.WriteHeader(500) + w.Write([]byte("{}")) + })) + + defer s.Close() + client := newTopicClient(t, s.URL) + + err := client.Init(testTopicName, &admin.TopicInitRequest{ + NumberOfShards: 64, + }) + require.NotNil(t, err) +} + +func TestGet(t *testing.T) { + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + checkTopicNameHeader(t, r) + w.WriteHeader(200) + w.Write([]byte(`{"topic": {}}`)) + })) + defer s.Close() + client := newTopicClient(t, s.URL) + + topic, err := client.Get(testTopicName) + require.NotNil(t, topic) + require.NoError(t, err) +} + +func TestGetErr(t *testing.T) { + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + checkTopicNameHeader(t, r) + w.WriteHeader(404) + w.Write([]byte("{}")) + })) + defer s.Close() + + client := newTopicClient(t, s.URL) + + topic, err := client.Get(testTopicName) + require.Nil(t, topic) + require.Error(t, err) +} diff --git a/pkg/m3admin/topic/options.go b/pkg/m3admin/topic/options.go new file mode 100644 index 00000000..a101aba3 --- /dev/null +++ b/pkg/m3admin/topic/options.go @@ -0,0 +1,68 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package topic + +import ( + "net/url" + + "github.com/m3db/m3db-operator/pkg/m3admin" + + "go.uber.org/zap" +) + +// Option provides an interface that can be used for setter options with the +// constructor +type Option interface { + execute(*topicClient) error +} + +type optionFn func(p *topicClient) error + +func (fn optionFn) execute(p *topicClient) error { + return fn(p) +} + +// WithURL is a setter to override the default URL +func WithURL(u string) Option { + return optionFn(func(p *topicClient) error { + if _, err := url.ParseRequestURI(u); err != nil { + return err + } + p.url = u + return nil + }) +} + +// WithLogger is a setter to override the default logger +func WithLogger(logger *zap.Logger) Option { + return optionFn(func(p *topicClient) error { + p.logger = logger + return nil + }) +} + +// WithClient configures an m3admin client. +func WithClient(cl m3admin.Client) Option { + return optionFn(func(p *topicClient) error { + p.client = cl + return nil + }) +} diff --git a/pkg/m3admin/topic/options_test.go b/pkg/m3admin/topic/options_test.go new file mode 100644 index 00000000..01655dc7 --- /dev/null +++ b/pkg/m3admin/topic/options_test.go @@ -0,0 +1,54 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package topic + +import ( + "testing" + + "github.com/m3db/m3db-operator/pkg/m3admin" + + "github.com/stretchr/testify/assert" + "go.uber.org/zap" +) + +func TestOptions(t *testing.T) { + client := &topicClient{} + + opt := WithURL("...") + assert.Error(t, opt.execute(client)) + + const url = "http://localhost:1234" + l := zap.NewNop() + m3cl := m3admin.NewClient() + opts := []Option{ + WithURL(url), + WithLogger(l), + WithClient(m3cl), + } + + for _, opt := range opts { + assert.NoError(t, opt.execute(client)) + } + + assert.Equal(t, l, client.logger) + assert.Equal(t, m3cl, client.client) + assert.Equal(t, url, client.url) +} diff --git a/pkg/m3admin/topic/types.go b/pkg/m3admin/topic/types.go new file mode 100644 index 00000000..ddbd8bff --- /dev/null +++ b/pkg/m3admin/topic/types.go @@ -0,0 +1,35 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package topic + +import ( + "github.com/m3db/m3/src/msg/generated/proto/topicpb" + m3topic "github.com/m3db/m3/src/msg/topic" + "github.com/m3db/m3/src/query/generated/proto/admin" +) + +// Client provides the interface to interact with the topic APIs. +type Client interface { + Init(name string, req *admin.TopicInitRequest) error + Get(topicName string) (m3topic.Topic, error) + Delete(topicName string) error + Add(topicName string, consumerSvc *topicpb.ConsumerService) error +}