Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[m3admin] Add topic client #190

Merged
merged 2 commits into from
Jan 31, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,12 @@ test-all: clean-all install-tools verify-gen lint metalint test-all-gen bins tes
.PHONY: test
test: install-tools test-base
@echo "--- $@"
@PATH=$(combined_bin_paths):$(PATH) gocov convert $(coverfile) | gocov report
@$(tools_bin_path)/gocov convert $(coverfile) | $(tools_bin_path)/gocov report

.PHONY: test-no-deps
test-no-deps: test-base
@echo "--- $@"
@PATH=$(combined_bin_paths):$(PATH) gocov convert $(coverfile) | gocov report
@$(tools_bin_path)/gocov convert $(coverfile) | $(tools_bin_path)/gocov report

.PHONY: test-e2e
test-e2e:
Expand All @@ -139,7 +139,7 @@ test-ci-unit: install-tools test-base verify-gen
.PHONY: install-tools
install-tools:
@echo "--- $@"
GOBIN=$(tools_bin_path) go install github.com/axw/gocov
GOBIN=$(tools_bin_path) go install github.com/axw/gocov/gocov
GOBIN=$(tools_bin_path) go install github.com/garethr/kubeval
GOBIN=$(tools_bin_path) go install github.com/golang/mock/mockgen
GOBIN=$(tools_bin_path) go install github.com/m3db/build-tools/linters/badtime
Expand Down
1 change: 1 addition & 0 deletions generated/mocks/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
// mockgen rules for generating mocks using file mode
//go:generate sh -c "mockgen -package=placement -destination=$GOPATH/src/$PACKAGE/pkg/m3admin/placement/client_mock.go -source=$GOPATH/src/$PACKAGE/pkg/m3admin/placement/types.go"
//go:generate sh -c "mockgen -package=namespace -destination=$GOPATH/src/$PACKAGE/pkg/m3admin/namespace/client_mock.go -source=$GOPATH/src/$PACKAGE/pkg/m3admin/namespace/types.go"
//go:generate sh -c "mockgen -package=topic -destination=$GOPATH/src/$PACKAGE/pkg/m3admin/topic/client_mock.go -source=$GOPATH/src/$PACKAGE/pkg/m3admin/topic/types.go"
//go:generate sh -c "mockgen -package=m3admin -destination=$GOPATH/src/$PACKAGE/pkg/m3admin/client_mock.go -source=$GOPATH/src/$PACKAGE/pkg/m3admin/client.go"
//go:generate sh -c "mockgen -package=m3db -destination=$GOPATH/src/$PACKAGE/pkg/k8sops/m3db/k8sops_mock.go -source=$GOPATH/src/$PACKAGE/pkg/k8sops/m3db/types.go"
//go:generate sh -c "mockgen -package=podidentity -destination=$GOPATH/src/$PACKAGE/pkg/k8sops/podidentity/provider_mock.go -source=$GOPATH/src/$PACKAGE/pkg/k8sops/podidentity/provider.go"
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ require (
github.com/hashicorp/golang-lru v0.5.3 // indirect
github.com/imdario/mergo v0.3.7 // indirect
github.com/m3db/build-tools v0.0.0-20181013000606-edd1bdd1df8a
github.com/m3db/m3 v0.8.4
github.com/m3db/m3 v0.14.2
github.com/m3db/m3x v0.0.0-20190408051622-ebf3c7b94afd
github.com/m3db/prometheus_client_golang v0.8.1 // indirect
github.com/m3db/prometheus_client_model v0.0.0-20180517145114-8b2299a4bf7d // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -343,8 +343,8 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/logrusorgru/aurora v0.0.0-20181002194514-a7b3b318ed4e/go.mod h1:7rIyQOR62GCctdiQpZ/zOJlFyk6y+94wXzv6RNZgaR4=
github.com/m3db/build-tools v0.0.0-20181013000606-edd1bdd1df8a h1:CwsSHIJLeCESKdZ844jXg/3rQD3yA5azuVlJBp5w8U8=
github.com/m3db/build-tools v0.0.0-20181013000606-edd1bdd1df8a/go.mod h1:Pk9AtZeKuCO2xcAth0gxwzRNFv4lV26GPSx4I6A7DQ8=
github.com/m3db/m3 v0.8.4 h1:b7yOpeHsdr4WUt9ODYfnE/7pA8B/FwD6RQMJ/2dxMzQ=
github.com/m3db/m3 v0.8.4/go.mod h1:7izI0EeTws4qSNJ1mb1rF0c3PKbQbogUFDsfAgcH2jo=
github.com/m3db/m3 v0.14.2 h1:cdEsX+4f0eufXf/Cn7zibMLngK4ncZYHZlbGeSxIKDQ=
github.com/m3db/m3 v0.14.2/go.mod h1:7izI0EeTws4qSNJ1mb1rF0c3PKbQbogUFDsfAgcH2jo=
github.com/m3db/m3x v0.0.0-20190408051622-ebf3c7b94afd h1:wzLBtXzxZM9b6IXwLSRE5crynocLTCuRDpGDaOJzyuI=
github.com/m3db/m3x v0.0.0-20190408051622-ebf3c7b94afd/go.mod h1:zLbcVb352e3Jsg62A6zzEhZ1gumeFsiamTqDs9ZmZrs=
github.com/m3db/prometheus_client_golang v0.8.1 h1:t7w/tcFws81JL1j5sqmpqcOyQOpH4RDOmIe3A3fdN3w=
Expand Down
64 changes: 64 additions & 0 deletions pkg/controller/m3admin_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@ import (
"github.com/m3db/m3db-operator/pkg/m3admin"
"github.com/m3db/m3db-operator/pkg/m3admin/namespace"
"github.com/m3db/m3db-operator/pkg/m3admin/placement"
"github.com/m3db/m3db-operator/pkg/m3admin/topic"

"github.com/m3db/m3/src/cluster/generated/proto/placementpb"
m3placement "github.com/m3db/m3/src/cluster/placement"
"github.com/m3db/m3/src/msg/generated/proto/topicpb"
m3topic "github.com/m3db/m3/src/msg/topic"
"github.com/m3db/m3/src/query/generated/proto/admin"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -44,9 +47,11 @@ type multiAdminClient struct {
mu sync.RWMutex
nsClients map[string]namespace.Client
plClients map[string]placement.Client
tpClients map[string]topic.Client

nsClientFn func(...namespace.Option) (namespace.Client, error)
plClientFn func(...placement.Option) (placement.Client, error)
tpClientFn func(...topic.Option) (topic.Client, error)

clusterKeyFn func(metav1.ObjectMetaAccessor, string) string
clusterURLFn func(metav1.ObjectMetaAccessor) string
Expand Down Expand Up @@ -89,8 +94,10 @@ func newMultiAdminClient(adminOpts []m3admin.Option, logger *zap.Logger) *multiA
return &multiAdminClient{
nsClients: make(map[string]namespace.Client),
plClients: make(map[string]placement.Client),
tpClients: make(map[string]topic.Client),
nsClientFn: namespace.NewClient,
plClientFn: placement.NewClient,
tpClientFn: topic.NewClient,
clusterKeyFn: clusterKey,
clusterURLFn: clusterURL,
adminClientFn: newAdminClient,
Expand Down Expand Up @@ -172,6 +179,39 @@ func (m *multiAdminClient) placementClientForCluster(cluster metav1.ObjectMetaAc
return client
}

func (m *multiAdminClient) topicClientForCluster(cluster metav1.ObjectMetaAccessor) topic.Client {
url := m.clusterURLFn(cluster)
key := m.clusterKeyFn(cluster, url)

m.mu.RLock()
client, ok := m.tpClients[key]
m.mu.RUnlock()
if ok {
return client
}

adminClient := m.adminClientForCluster(cluster)
client, err := m.tpClientFn(
topic.WithClient(adminClient),
topic.WithLogger(m.logger),
topic.WithURL(url),
)
if err != nil {
return newErrorTopicClient(err)
}

m.mu.Lock()
mapClient, ok := m.tpClients[key]
if ok {
client = mapClient
} else {
m.tpClients[key] = client
}
m.mu.Unlock()

return client
}

// errorNamespaceClient implements namespace.Client by returning an error that a
// specified cluster couldn't be found, enabling easier ergonomics for the
// common pattern of looking up a client and returning an error if one is
Expand Down Expand Up @@ -229,3 +269,27 @@ func (c errorPlacementClient) Remove(string) error {
func (c errorPlacementClient) Replace(string, placementpb.Instance) error {
return c.err
}

type errorTopicClient struct {
err error
}

func newErrorTopicClient(err error) topic.Client {
return errorTopicClient{err: err}
}

func (c errorTopicClient) Init(name string, req *admin.TopicInitRequest) error {
return c.err
}

func (c errorTopicClient) Get(topicName string) (m3topic.Topic, error) {
return nil, c.err
}

func (c errorTopicClient) Delete(topicName string) error {
return c.err
}

func (c errorTopicClient) Add(topicName string, consumerSvc *topicpb.ConsumerService) error {
return c.err
}
55 changes: 55 additions & 0 deletions pkg/controller/m3admin_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ import (
"github.com/m3db/m3db-operator/pkg/m3admin"
"github.com/m3db/m3db-operator/pkg/m3admin/namespace"
"github.com/m3db/m3db-operator/pkg/m3admin/placement"
"github.com/m3db/m3db-operator/pkg/m3admin/topic"

"github.com/m3db/m3/src/cluster/generated/proto/placementpb"
"github.com/m3db/m3/src/msg/generated/proto/topicpb"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

Expand Down Expand Up @@ -168,6 +170,38 @@ func TestPlacementClientForCluster(t *testing.T) {
assert.Equal(t, testErr, cl3.Delete())
}

func TestTopicClientForCluster(t *testing.T) {
mc := gomock.NewController(t)
defer mc.Finish()

m3Client := m3admin.NewMockClient(mc)
tpClient := topic.NewMockClient(mc)

m := newTestAdminClient(m3Client, "http://foo")
m.tpClientFn = func(_ ...topic.Option) (topic.Client, error) {
return tpClient, nil
}

clusterA := newM3DBCluster("ns", "a")
clusterB := newM3DBCluster("ns", "b")
clusterC := newM3DBCluster("ns", "c")
testErr := errors.New("test")

cl := m.topicClientForCluster(clusterA)
assert.Equal(t, tpClient, cl)
_ = m.topicClientForCluster(clusterB)
assert.Equal(t, 2, len(m.tpClients))
m.tpClientFn = func(_ ...topic.Option) (topic.Client, error) {
return nil, testErr
}

cl2 := m.topicClientForCluster(clusterA)
assert.Equal(t, cl, cl2)

cl3 := m.topicClientForCluster(clusterC)
assert.Equal(t, testErr, cl3.Delete("topic"))
}

func TestErrorNamespaceClient(t *testing.T) {
clErr := errors.New("test")
cl := newErrorNamespaceClient(clErr)
Expand Down Expand Up @@ -206,3 +240,24 @@ func TestErrorPlacementClient(t *testing.T) {
err = cl.Replace("foo", placementpb.Instance{})
assert.Equal(t, clErr, err)
}

func TestErrorTopicClient(t *testing.T) {
clErr := errors.New("test")
cl := newErrorTopicClient(clErr)

err := cl.Init("topic", nil)
assert.Equal(t, clErr, err)

tp, err := cl.Get("topic")
assert.Nil(t, tp)
assert.Equal(t, clErr, err)

err = cl.Delete("topic")
assert.Equal(t, clErr, err)

err = cl.Add("topic", &topicpb.ConsumerService{})
assert.Equal(t, clErr, err)

err = cl.Delete("topic")
assert.Equal(t, clErr, err)
}
19 changes: 15 additions & 4 deletions pkg/m3admin/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ var (

// Client is an m3admin client.
type Client interface {
DoHTTPRequest(action, url string, data *bytes.Buffer) (*http.Response, error)
DoHTTPJSONPBRequest(action, url string, request, response proto.Message) error
DoHTTPRequest(action, url string, data *bytes.Buffer, opts ...RequestOption) (*http.Response, error)
DoHTTPJSONPBRequest(action, url string, request, response proto.Message, opts ...RequestOption) error
}

type client struct {
Expand Down Expand Up @@ -95,9 +95,13 @@ func NewClient(clientOpts ...Option) Client {
func (c *client) DoHTTPRequest(
action, url string,
data *bytes.Buffer,
options ...RequestOption,
) (*http.Response, error) {

l := c.logger.With(zap.String("action", action), zap.String("url", url))
opts := &reqOptions{}
for _, o := range options {
o.execute(opts)
}

var request *retryhttp.Request
var err error
Expand All @@ -116,6 +120,12 @@ func (c *client) DoHTTPRequest(
}
}

if opts.headers != nil {
for k, v := range opts.headers {
request.Header.Add(k, v)
}
}

request.Header.Add("Content-Type", "application/json")
if c.environment != "" {
request.Header.Add(m3EnvironmentHeader, c.environment)
Expand Down Expand Up @@ -177,6 +187,7 @@ func (c *client) DoHTTPJSONPBRequest(
action, url string,
request proto.Message,
response proto.Message,
opts ...RequestOption,
) error {
var data *bytes.Buffer
if request != nil {
Expand All @@ -186,7 +197,7 @@ func (c *client) DoHTTPJSONPBRequest(
}
}

r, err := c.DoHTTPRequest(action, url, data)
r, err := c.DoHTTPRequest(action, url, data, opts...)
if err != nil {
return err
}
Expand Down
26 changes: 18 additions & 8 deletions pkg/m3admin/client_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 26 additions & 0 deletions pkg/m3admin/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,32 @@ func TestClient_DoHTTPRequest_Header(t *testing.T) {
assert.Equal(t, []byte("hello"), readAll(resp.Body))
}

func TestClient_DoHTTPRequest_PerRequestHeader(t *testing.T) {
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Header.Get("test-header") != "test-val" {
w.WriteHeader(500)
return
}
w.Write([]byte("hello"))
}))
defer s.Close()

readAll := func(r io.Reader) []byte {
data, err := ioutil.ReadAll(r)
assert.NoError(t, err)
return data
}

cl := newTestClient(WithHTTPClient(devNullRetry()))
_, err := cl.DoHTTPRequest("GET", s.URL, nil)
assert.Error(t, err)

resp, err := cl.DoHTTPRequest("GET", s.URL, nil, WithHeader("test-header", "test-val"))
require.NoError(t, err)
assert.Equal(t, resp.StatusCode, http.StatusOK)
assert.Equal(t, []byte("hello"), readAll(resp.Body))
}

func TestClient_DoHTTPRequest_Err(t *testing.T) {
for _, test := range []struct {
code int
Expand Down
25 changes: 25 additions & 0 deletions pkg/m3admin/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,28 @@ func WithEnvironment(e string) Option {
o.environment = e
})
}

// RequestOption defines a per-request option.
type RequestOption interface {
execute(*reqOptions)
}

type reqOptionFn func(o *reqOptions)

func (f reqOptionFn) execute(o *reqOptions) {
f(o)
}

type reqOptions struct {
headers map[string]string
}

// WithHeader adds a header to a request. It can be specified multiple times.
func WithHeader(name, value string) RequestOption {
return reqOptionFn(func(o *reqOptions) {
if o.headers == nil {
o.headers = make(map[string]string)
}
o.headers[name] = value
})
}
Loading