diff --git a/bill-of-materials.json b/bill-of-materials.json index e72b0bd89d6..19d63aca21f 100644 --- a/bill-of-materials.json +++ b/bill-of-materials.json @@ -98,15 +98,6 @@ } ] }, - { - "project": "github.com/google/uuid", - "licenses": [ - { - "type": "BSD 3-clause \"New\" or \"Revised\" License", - "confidence": 0.9663865546218487 - } - ] - }, { "project": "github.com/gorilla/websocket", "licenses": [ diff --git a/clientv3/balancer/balancer.go b/clientv3/balancer/balancer.go deleted file mode 100644 index d02a7eec7c3..00000000000 --- a/clientv3/balancer/balancer.go +++ /dev/null @@ -1,293 +0,0 @@ -// Copyright 2018 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package balancer implements client balancer. -package balancer - -import ( - "strconv" - "sync" - "time" - - "go.etcd.io/etcd/clientv3/balancer/connectivity" - "go.etcd.io/etcd/clientv3/balancer/picker" - - "go.uber.org/zap" - "google.golang.org/grpc/balancer" - grpcconnectivity "google.golang.org/grpc/connectivity" - "google.golang.org/grpc/resolver" - _ "google.golang.org/grpc/resolver/dns" // register DNS resolver - _ "google.golang.org/grpc/resolver/passthrough" // register passthrough resolver -) - -// Config defines balancer configurations. -type Config struct { - // Policy configures balancer policy. - Policy picker.Policy - - // Picker implements gRPC picker. - // Leave empty if "Policy" field is not custom. - // TODO: currently custom policy is not supported. - // Picker picker.Picker - - // Name defines an additional name for balancer. - // Useful for balancer testing to avoid register conflicts. - // If empty, defaults to policy name. - Name string - - // Logger configures balancer logging. - // If nil, logs are discarded. - Logger *zap.Logger -} - -// RegisterBuilder creates and registers a builder. Since this function calls balancer.Register, it -// must be invoked at initialization time. -func RegisterBuilder(cfg Config) { - bb := &builder{cfg} - balancer.Register(bb) - - bb.cfg.Logger.Debug( - "registered balancer", - zap.String("policy", bb.cfg.Policy.String()), - zap.String("name", bb.cfg.Name), - ) -} - -type builder struct { - cfg Config -} - -// Build is called initially when creating "ccBalancerWrapper". -// "grpc.Dial" is called to this client connection. -// Then, resolved addresses will be handled via "HandleResolvedAddrs". -func (b *builder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer { - bb := &baseBalancer{ - id: strconv.FormatInt(time.Now().UnixNano(), 36), - policy: b.cfg.Policy, - name: b.cfg.Name, - lg: b.cfg.Logger, - - addrToSc: make(map[resolver.Address]balancer.SubConn), - scToAddr: make(map[balancer.SubConn]resolver.Address), - scToSt: make(map[balancer.SubConn]grpcconnectivity.State), - - currentConn: nil, - connectivityRecorder: connectivity.New(b.cfg.Logger), - - // initialize picker always returns "ErrNoSubConnAvailable" - picker: picker.NewErr(balancer.ErrNoSubConnAvailable), - } - - // TODO: support multiple connections - bb.mu.Lock() - bb.currentConn = cc - bb.mu.Unlock() - - bb.lg.Info( - "built balancer", - zap.String("balancer-id", bb.id), - zap.String("policy", bb.policy.String()), - zap.String("resolver-target", cc.Target()), - ) - return bb -} - -// Name implements "grpc/balancer.Builder" interface. -func (b *builder) Name() string { return b.cfg.Name } - -// Balancer defines client balancer interface. -type Balancer interface { - // Balancer is called on specified client connection. Client initiates gRPC - // connection with "grpc.Dial(addr, grpc.WithBalancerName)", and then those resolved - // addresses are passed to "grpc/balancer.Balancer.HandleResolvedAddrs". - // For each resolved address, balancer calls "balancer.ClientConn.NewSubConn". - // "grpc/balancer.Balancer.HandleSubConnStateChange" is called when connectivity state - // changes, thus requires failover logic in this method. - balancer.Balancer - - // Picker calls "Pick" for every client request. - picker.Picker -} - -type baseBalancer struct { - id string - policy picker.Policy - name string - lg *zap.Logger - - mu sync.RWMutex - - addrToSc map[resolver.Address]balancer.SubConn - scToAddr map[balancer.SubConn]resolver.Address - scToSt map[balancer.SubConn]grpcconnectivity.State - - currentConn balancer.ClientConn - connectivityRecorder connectivity.Recorder - - picker picker.Picker -} - -// HandleResolvedAddrs implements "grpc/balancer.Balancer" interface. -// gRPC sends initial or updated resolved addresses from "Build". -func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) { - if err != nil { - bb.lg.Warn("HandleResolvedAddrs called with error", zap.String("balancer-id", bb.id), zap.Error(err)) - return - } - bb.lg.Info("resolved", - zap.String("picker", bb.picker.String()), - zap.String("balancer-id", bb.id), - zap.Strings("addresses", addrsToStrings(addrs)), - ) - - bb.mu.Lock() - defer bb.mu.Unlock() - - resolved := make(map[resolver.Address]struct{}) - for _, addr := range addrs { - resolved[addr] = struct{}{} - if _, ok := bb.addrToSc[addr]; !ok { - sc, err := bb.currentConn.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{}) - if err != nil { - bb.lg.Warn("NewSubConn failed", zap.String("picker", bb.picker.String()), zap.String("balancer-id", bb.id), zap.Error(err), zap.String("address", addr.Addr)) - continue - } - bb.lg.Info("created subconn", zap.String("address", addr.Addr)) - bb.addrToSc[addr] = sc - bb.scToAddr[sc] = addr - bb.scToSt[sc] = grpcconnectivity.Idle - sc.Connect() - } - } - - for addr, sc := range bb.addrToSc { - if _, ok := resolved[addr]; !ok { - // was removed by resolver or failed to create subconn - bb.currentConn.RemoveSubConn(sc) - delete(bb.addrToSc, addr) - - bb.lg.Info( - "removed subconn", - zap.String("picker", bb.picker.String()), - zap.String("balancer-id", bb.id), - zap.String("address", addr.Addr), - zap.String("subconn", scToString(sc)), - ) - - // Keep the state of this sc in bb.scToSt until sc's state becomes Shutdown. - // The entry will be deleted in HandleSubConnStateChange. - // (DO NOT) delete(bb.scToAddr, sc) - // (DO NOT) delete(bb.scToSt, sc) - } - } -} - -// HandleSubConnStateChange implements "grpc/balancer.Balancer" interface. -func (bb *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s grpcconnectivity.State) { - bb.mu.Lock() - defer bb.mu.Unlock() - - old, ok := bb.scToSt[sc] - if !ok { - bb.lg.Warn( - "state change for an unknown subconn", - zap.String("picker", bb.picker.String()), - zap.String("balancer-id", bb.id), - zap.String("subconn", scToString(sc)), - zap.Int("subconn-size", len(bb.scToAddr)), - zap.String("state", s.String()), - ) - return - } - - bb.lg.Info( - "state changed", - zap.String("picker", bb.picker.String()), - zap.String("balancer-id", bb.id), - zap.Bool("connected", s == grpcconnectivity.Ready), - zap.String("subconn", scToString(sc)), - zap.Int("subconn-size", len(bb.scToAddr)), - zap.String("address", bb.scToAddr[sc].Addr), - zap.String("old-state", old.String()), - zap.String("new-state", s.String()), - ) - - bb.scToSt[sc] = s - switch s { - case grpcconnectivity.Idle: - sc.Connect() - case grpcconnectivity.Shutdown: - // When an address was removed by resolver, b called RemoveSubConn but - // kept the sc's state in scToSt. Remove state for this sc here. - delete(bb.scToAddr, sc) - delete(bb.scToSt, sc) - } - - oldAggrState := bb.connectivityRecorder.GetCurrentState() - bb.connectivityRecorder.RecordTransition(old, s) - - // Update balancer picker when one of the following happens: - // - this sc became ready from not-ready - // - this sc became not-ready from ready - // - the aggregated state of balancer became TransientFailure from non-TransientFailure - // - the aggregated state of balancer became non-TransientFailure from TransientFailure - if (s == grpcconnectivity.Ready) != (old == grpcconnectivity.Ready) || - (bb.connectivityRecorder.GetCurrentState() == grpcconnectivity.TransientFailure) != (oldAggrState == grpcconnectivity.TransientFailure) { - bb.updatePicker() - } - - bb.currentConn.UpdateBalancerState(bb.connectivityRecorder.GetCurrentState(), bb.picker) -} - -func (bb *baseBalancer) updatePicker() { - if bb.connectivityRecorder.GetCurrentState() == grpcconnectivity.TransientFailure { - bb.picker = picker.NewErr(balancer.ErrTransientFailure) - bb.lg.Info( - "updated picker to transient error picker", - zap.String("picker", bb.picker.String()), - zap.String("balancer-id", bb.id), - zap.String("policy", bb.policy.String()), - ) - return - } - - // only pass ready subconns to picker - scToAddr := make(map[balancer.SubConn]resolver.Address) - for addr, sc := range bb.addrToSc { - if st, ok := bb.scToSt[sc]; ok && st == grpcconnectivity.Ready { - scToAddr[sc] = addr - } - } - - bb.picker = picker.New(picker.Config{ - Policy: bb.policy, - Logger: bb.lg, - SubConnToResolverAddress: scToAddr, - }) - bb.lg.Info( - "updated picker", - zap.String("picker", bb.picker.String()), - zap.String("balancer-id", bb.id), - zap.String("policy", bb.policy.String()), - zap.Strings("subconn-ready", scsToStrings(scToAddr)), - zap.Int("subconn-size", len(scToAddr)), - ) -} - -// Close implements "grpc/balancer.Balancer" interface. -// Close is a nop because base balancer doesn't have internal state to clean up, -// and it doesn't need to call RemoveSubConn for the SubConns. -func (bb *baseBalancer) Close() { - // TODO -} diff --git a/clientv3/balancer/balancer_test.go b/clientv3/balancer/balancer_test.go deleted file mode 100644 index 3eea2b77956..00000000000 --- a/clientv3/balancer/balancer_test.go +++ /dev/null @@ -1,323 +0,0 @@ -// Copyright 2018 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package balancer - -import ( - "context" - "fmt" - "strings" - "testing" - - "go.etcd.io/etcd/clientv3/balancer/picker" - "go.etcd.io/etcd/clientv3/balancer/resolver/endpoint" - pb "go.etcd.io/etcd/etcdserver/etcdserverpb" - "go.etcd.io/etcd/pkg/mock/mockserver" - - "go.uber.org/zap" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/peer" - "google.golang.org/grpc/status" -) - -// TestRoundRobinBalancedResolvableNoFailover ensures that -// requests to a resolvable endpoint can be balanced between -// multiple, if any, nodes. And there needs be no failover. -func TestRoundRobinBalancedResolvableNoFailover(t *testing.T) { - testCases := []struct { - name string - serverCount int - reqN int - network string - }{ - {name: "rrBalanced_1", serverCount: 1, reqN: 5, network: "tcp"}, - {name: "rrBalanced_1_unix_sockets", serverCount: 1, reqN: 5, network: "unix"}, - {name: "rrBalanced_3", serverCount: 3, reqN: 7, network: "tcp"}, - {name: "rrBalanced_5", serverCount: 5, reqN: 10, network: "tcp"}, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - ms, err := mockserver.StartMockServersOnNetwork(tc.serverCount, tc.network) - if err != nil { - t.Fatalf("failed to start mock servers: %v", err) - } - defer ms.Stop() - - var eps []string - for _, svr := range ms.Servers { - eps = append(eps, svr.ResolverAddress().Addr) - } - - rsv, err := endpoint.NewResolverGroup("nofailover") - if err != nil { - t.Fatal(err) - } - defer rsv.Close() - rsv.SetEndpoints(eps) - - name := genName() - cfg := Config{ - Policy: picker.RoundrobinBalanced, - Name: name, - Logger: zap.NewExample(), - } - RegisterBuilder(cfg) - conn, err := grpc.Dial(fmt.Sprintf("endpoint://nofailover/*"), grpc.WithInsecure(), grpc.WithBalancerName(name)) - if err != nil { - t.Fatalf("failed to dial mock server: %v", err) - } - defer conn.Close() - cli := pb.NewKVClient(conn) - - reqFunc := func(ctx context.Context) (picked string, err error) { - var p peer.Peer - _, err = cli.Range(ctx, &pb.RangeRequest{Key: []byte("/x")}, grpc.Peer(&p)) - if p.Addr != nil { - picked = p.Addr.String() - } - return picked, err - } - - _, picked, err := warmupConnections(reqFunc, tc.serverCount, "") - if err != nil { - t.Fatalf("Unexpected failure %v", err) - } - - // verify that we round robin - prev, switches := picked, 0 - for i := 0; i < tc.reqN; i++ { - picked, err = reqFunc(context.Background()) - if err != nil { - t.Fatalf("#%d: unexpected failure %v", i, err) - } - if prev != picked { - switches++ - } - prev = picked - } - if tc.serverCount > 1 && switches != tc.reqN { - t.Fatalf("expected balanced loads for %d requests, got switches %d", tc.reqN, switches) - } - }) - } -} - -// TestRoundRobinBalancedResolvableFailoverFromServerFail ensures that -// loads be rebalanced while one server goes down and comes back. -func TestRoundRobinBalancedResolvableFailoverFromServerFail(t *testing.T) { - serverCount := 5 - ms, err := mockserver.StartMockServers(serverCount) - if err != nil { - t.Fatalf("failed to start mock servers: %s", err) - } - defer ms.Stop() - var eps []string - for _, svr := range ms.Servers { - eps = append(eps, svr.ResolverAddress().Addr) - } - - rsv, err := endpoint.NewResolverGroup("serverfail") - if err != nil { - t.Fatal(err) - } - defer rsv.Close() - rsv.SetEndpoints(eps) - - name := genName() - cfg := Config{ - Policy: picker.RoundrobinBalanced, - Name: name, - Logger: zap.NewExample(), - } - RegisterBuilder(cfg) - conn, err := grpc.Dial(fmt.Sprintf("endpoint://serverfail/mock.server"), grpc.WithInsecure(), grpc.WithBalancerName(name)) - if err != nil { - t.Fatalf("failed to dial mock server: %s", err) - } - defer conn.Close() - cli := pb.NewKVClient(conn) - - reqFunc := func(ctx context.Context) (picked string, err error) { - var p peer.Peer - _, err = cli.Range(ctx, &pb.RangeRequest{Key: []byte("/x")}, grpc.Peer(&p)) - if p.Addr != nil { - picked = p.Addr.String() - } - return picked, err - } - - // stop first server, loads should be redistributed - ms.StopAt(0) - // stopped server will be transitioned into TRANSIENT_FAILURE state - // but it doesn't happen instantaneously and it can still be picked for a short period of time - // we ignore "transport is closing" in such case - available, picked, err := warmupConnections(reqFunc, serverCount-1, "transport is closing") - if err != nil { - t.Fatalf("Unexpected failure %v", err) - } - - reqN := 10 - prev, switches := picked, 0 - for i := 0; i < reqN; i++ { - picked, err = reqFunc(context.Background()) - if err != nil { - t.Fatalf("#%d: unexpected failure %v", i, err) - } - if _, ok := available[picked]; !ok { - t.Fatalf("picked unavailable address %q (available %v)", picked, available) - } - if prev != picked { - switches++ - } - prev = picked - } - if switches != reqN { - t.Fatalf("expected balanced loads for %d requests, got switches %d", reqN, switches) - } - - // now failed server comes back - ms.StartAt(0) - available, picked, err = warmupConnections(reqFunc, serverCount, "") - if err != nil { - t.Fatalf("Unexpected failure %v", err) - } - - prev, switches = picked, 0 - recoveredAddr, recovered := eps[0], 0 - available[recoveredAddr] = struct{}{} - - for i := 0; i < 2*reqN; i++ { - picked, err := reqFunc(context.Background()) - if err != nil { - t.Fatalf("#%d: unexpected failure %v", i, err) - } - if _, ok := available[picked]; !ok { - t.Fatalf("#%d: picked unavailable address %q (available %v)", i, picked, available) - } - if prev != picked { - switches++ - } - if picked == recoveredAddr { - recovered++ - } - prev = picked - } - if switches != 2*reqN { - t.Fatalf("expected balanced loads for %d requests, got switches %d", reqN, switches) - } - if recovered != 2*reqN/serverCount { - t.Fatalf("recovered server %q got only %d requests", recoveredAddr, recovered) - } -} - -// TestRoundRobinBalancedResolvableFailoverFromRequestFail ensures that -// loads be rebalanced while some requests are failed. -func TestRoundRobinBalancedResolvableFailoverFromRequestFail(t *testing.T) { - serverCount := 5 - ms, err := mockserver.StartMockServers(serverCount) - if err != nil { - t.Fatalf("failed to start mock servers: %s", err) - } - defer ms.Stop() - var eps []string - for _, svr := range ms.Servers { - eps = append(eps, svr.ResolverAddress().Addr) - } - - rsv, err := endpoint.NewResolverGroup("requestfail") - if err != nil { - t.Fatal(err) - } - defer rsv.Close() - rsv.SetEndpoints(eps) - - name := genName() - cfg := Config{ - Policy: picker.RoundrobinBalanced, - Name: name, - Logger: zap.NewExample(), - } - RegisterBuilder(cfg) - conn, err := grpc.Dial(fmt.Sprintf("endpoint://requestfail/mock.server"), grpc.WithInsecure(), grpc.WithBalancerName(name)) - if err != nil { - t.Fatalf("failed to dial mock server: %s", err) - } - defer conn.Close() - cli := pb.NewKVClient(conn) - - reqFunc := func(ctx context.Context) (picked string, err error) { - var p peer.Peer - _, err = cli.Range(ctx, &pb.RangeRequest{Key: []byte("/x")}, grpc.Peer(&p)) - if p.Addr != nil { - picked = p.Addr.String() - } - return picked, err - } - - available, picked, err := warmupConnections(reqFunc, serverCount, "") - if err != nil { - t.Fatalf("Unexpected failure %v", err) - } - - reqN := 20 - prev, switches := "", 0 - for i := 0; i < reqN; i++ { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - if i%2 == 0 { - cancel() - } - picked, err = reqFunc(ctx) - if i%2 == 0 { - if s, ok := status.FromError(err); ok && s.Code() != codes.Canceled { - t.Fatalf("#%d: expected %v, got %v", i, context.Canceled, err) - } - continue - } - if _, ok := available[picked]; !ok { - t.Fatalf("#%d: picked unavailable address %q (available %v)", i, picked, available) - } - if prev != picked { - switches++ - } - prev = picked - } - if switches != reqN/2 { - t.Fatalf("expected balanced loads for %d requests, got switches %d", reqN, switches) - } -} - -type reqFuncT = func(ctx context.Context) (picked string, err error) - -func warmupConnections(reqFunc reqFuncT, serverCount int, ignoreErr string) (map[string]struct{}, string, error) { - var picked string - var err error - available := make(map[string]struct{}) - // cycle through all peers to indirectly verify that balancer subconn list is fully loaded - // otherwise we can't reliably count switches between 'picked' peers in the test assert phase - for len(available) < serverCount { - picked, err = reqFunc(context.Background()) - if err != nil { - if ignoreErr != "" && strings.Contains(err.Error(), ignoreErr) { - // skip ignored errors - continue - } - return available, picked, err - } - available[picked] = struct{}{} - } - return available, picked, err -} diff --git a/clientv3/balancer/connectivity/connectivity.go b/clientv3/balancer/connectivity/connectivity.go deleted file mode 100644 index 5b03e0c112c..00000000000 --- a/clientv3/balancer/connectivity/connectivity.go +++ /dev/null @@ -1,93 +0,0 @@ -// Copyright 2019 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package connectivity implements client connectivity operations. -package connectivity - -import ( - "sync" - - "go.uber.org/zap" - "google.golang.org/grpc/connectivity" -) - -// Recorder records gRPC connectivity. -type Recorder interface { - GetCurrentState() connectivity.State - RecordTransition(oldState, newState connectivity.State) -} - -// New returns a new Recorder. -func New(lg *zap.Logger) Recorder { - return &recorder{lg: lg} -} - -// recorder takes the connectivity states of multiple SubConns -// and returns one aggregated connectivity state. -// ref. https://github.com/grpc/grpc-go/blob/master/balancer/balancer.go -type recorder struct { - lg *zap.Logger - - mu sync.RWMutex - - cur connectivity.State - - numReady uint64 // Number of addrConns in ready state. - numConnecting uint64 // Number of addrConns in connecting state. - numTransientFailure uint64 // Number of addrConns in transientFailure. -} - -func (rc *recorder) GetCurrentState() (state connectivity.State) { - rc.mu.RLock() - defer rc.mu.RUnlock() - return rc.cur -} - -// RecordTransition records state change happening in subConn and based on that -// it evaluates what aggregated state should be. -// -// - If at least one SubConn in Ready, the aggregated state is Ready; -// - Else if at least one SubConn in Connecting, the aggregated state is Connecting; -// - Else the aggregated state is TransientFailure. -// -// Idle and Shutdown are not considered. -// -// ref. https://github.com/grpc/grpc-go/blob/master/balancer/balancer.go -func (rc *recorder) RecordTransition(oldState, newState connectivity.State) { - rc.mu.Lock() - defer rc.mu.Unlock() - - for idx, state := range []connectivity.State{oldState, newState} { - updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new. - switch state { - case connectivity.Ready: - rc.numReady += updateVal - case connectivity.Connecting: - rc.numConnecting += updateVal - case connectivity.TransientFailure: - rc.numTransientFailure += updateVal - default: - rc.lg.Warn("connectivity recorder received unknown state", zap.String("connectivity-state", state.String())) - } - } - - switch { // must be exclusive, no overlap - case rc.numReady > 0: - rc.cur = connectivity.Ready - case rc.numConnecting > 0: - rc.cur = connectivity.Connecting - default: - rc.cur = connectivity.TransientFailure - } -} diff --git a/clientv3/balancer/picker/doc.go b/clientv3/balancer/picker/doc.go deleted file mode 100644 index 35dabf5532f..00000000000 --- a/clientv3/balancer/picker/doc.go +++ /dev/null @@ -1,16 +0,0 @@ -// Copyright 2018 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package picker defines/implements client balancer picker policy. -package picker diff --git a/clientv3/balancer/picker/err.go b/clientv3/balancer/picker/err.go deleted file mode 100644 index f4b941d6529..00000000000 --- a/clientv3/balancer/picker/err.go +++ /dev/null @@ -1,39 +0,0 @@ -// Copyright 2018 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package picker - -import ( - "context" - - "google.golang.org/grpc/balancer" -) - -// NewErr returns a picker that always returns err on "Pick". -func NewErr(err error) Picker { - return &errPicker{p: Error, err: err} -} - -type errPicker struct { - p Policy - err error -} - -func (ep *errPicker) String() string { - return ep.p.String() -} - -func (ep *errPicker) Pick(context.Context, balancer.PickInfo) (balancer.SubConn, func(balancer.DoneInfo), error) { - return nil, nil, ep.err -} diff --git a/clientv3/balancer/picker/picker.go b/clientv3/balancer/picker/picker.go deleted file mode 100644 index bd1a5d25e8b..00000000000 --- a/clientv3/balancer/picker/picker.go +++ /dev/null @@ -1,91 +0,0 @@ -// Copyright 2018 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package picker - -import ( - "fmt" - - "go.uber.org/zap" - "google.golang.org/grpc/balancer" - "google.golang.org/grpc/resolver" -) - -// Picker defines balancer Picker methods. -type Picker interface { - balancer.Picker - String() string -} - -// Config defines picker configuration. -type Config struct { - // Policy specifies etcd clientv3's built in balancer policy. - Policy Policy - - // Logger defines picker logging object. - Logger *zap.Logger - - // SubConnToResolverAddress maps each gRPC sub-connection to an address. - // Basically, it is a list of addresses that the Picker can pick from. - SubConnToResolverAddress map[balancer.SubConn]resolver.Address -} - -// Policy defines balancer picker policy. -type Policy uint8 - -const ( - // Error is error picker policy. - Error Policy = iota - - // RoundrobinBalanced balances loads over multiple endpoints - // and implements failover in roundrobin fashion. - RoundrobinBalanced - - // Custom defines custom balancer picker. - // TODO: custom picker is not supported yet. - Custom -) - -func (p Policy) String() string { - switch p { - case Error: - return "picker-error" - - case RoundrobinBalanced: - return "picker-roundrobin-balanced" - - case Custom: - panic("'custom' picker policy is not supported yet") - - default: - panic(fmt.Errorf("invalid balancer picker policy (%d)", p)) - } -} - -// New creates a new Picker. -func New(cfg Config) Picker { - switch cfg.Policy { - case Error: - panic("'error' picker policy is not supported here; use 'picker.NewErr'") - - case RoundrobinBalanced: - return newRoundrobinBalanced(cfg) - - case Custom: - panic("'custom' picker policy is not supported yet") - - default: - panic(fmt.Errorf("invalid balancer picker policy (%d)", cfg.Policy)) - } -} diff --git a/clientv3/balancer/picker/roundrobin_balanced.go b/clientv3/balancer/picker/roundrobin_balanced.go deleted file mode 100644 index e3971ecc421..00000000000 --- a/clientv3/balancer/picker/roundrobin_balanced.go +++ /dev/null @@ -1,95 +0,0 @@ -// Copyright 2018 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package picker - -import ( - "context" - "sync" - - "go.uber.org/zap" - "go.uber.org/zap/zapcore" - "google.golang.org/grpc/balancer" - "google.golang.org/grpc/resolver" -) - -// newRoundrobinBalanced returns a new roundrobin balanced picker. -func newRoundrobinBalanced(cfg Config) Picker { - scs := make([]balancer.SubConn, 0, len(cfg.SubConnToResolverAddress)) - for sc := range cfg.SubConnToResolverAddress { - scs = append(scs, sc) - } - return &rrBalanced{ - p: RoundrobinBalanced, - lg: cfg.Logger, - scs: scs, - scToAddr: cfg.SubConnToResolverAddress, - } -} - -type rrBalanced struct { - p Policy - - lg *zap.Logger - - mu sync.RWMutex - next int - scs []balancer.SubConn - scToAddr map[balancer.SubConn]resolver.Address -} - -func (rb *rrBalanced) String() string { return rb.p.String() } - -// Pick is called for every client request. -func (rb *rrBalanced) Pick(ctx context.Context, opts balancer.PickInfo) (balancer.SubConn, func(balancer.DoneInfo), error) { - rb.mu.RLock() - n := len(rb.scs) - rb.mu.RUnlock() - if n == 0 { - return nil, nil, balancer.ErrNoSubConnAvailable - } - - rb.mu.Lock() - cur := rb.next - sc := rb.scs[cur] - picked := rb.scToAddr[sc].Addr - rb.next = (rb.next + 1) % len(rb.scs) - rb.mu.Unlock() - - rb.lg.Debug( - "picked", - zap.String("picker", rb.p.String()), - zap.String("address", picked), - zap.Int("subconn-index", cur), - zap.Int("subconn-size", n), - ) - - doneFunc := func(info balancer.DoneInfo) { - // TODO: error handling? - fss := []zapcore.Field{ - zap.Error(info.Err), - zap.String("picker", rb.p.String()), - zap.String("address", picked), - zap.Bool("success", info.Err == nil), - zap.Bool("bytes-sent", info.BytesSent), - zap.Bool("bytes-received", info.BytesReceived), - } - if info.Err == nil { - rb.lg.Debug("balancer done", fss...) - } else { - rb.lg.Warn("balancer failed", fss...) - } - } - return sc, doneFunc, nil -} diff --git a/clientv3/balancer/resolver/endpoint/endpoint.go b/clientv3/balancer/resolver/endpoint/endpoint.go deleted file mode 100644 index 2837bd4180b..00000000000 --- a/clientv3/balancer/resolver/endpoint/endpoint.go +++ /dev/null @@ -1,247 +0,0 @@ -// Copyright 2018 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package endpoint resolves etcd entpoints using grpc targets of the form 'endpoint:///'. -package endpoint - -import ( - "context" - "fmt" - "net" - "net/url" - "strings" - "sync" - - "google.golang.org/grpc/resolver" -) - -const scheme = "endpoint" - -var ( - targetPrefix = fmt.Sprintf("%s://", scheme) - - bldr *builder -) - -func init() { - bldr = &builder{ - resolverGroups: make(map[string]*ResolverGroup), - } - resolver.Register(bldr) -} - -type builder struct { - mu sync.RWMutex - resolverGroups map[string]*ResolverGroup -} - -// NewResolverGroup creates a new ResolverGroup with the given id. -func NewResolverGroup(id string) (*ResolverGroup, error) { - return bldr.newResolverGroup(id) -} - -// ResolverGroup keeps all endpoints of resolvers using a common endpoint:/// target -// up-to-date. -type ResolverGroup struct { - mu sync.RWMutex - id string - endpoints []string - resolvers []*Resolver -} - -func (e *ResolverGroup) addResolver(r *Resolver) { - e.mu.Lock() - addrs := epsToAddrs(e.endpoints...) - e.resolvers = append(e.resolvers, r) - e.mu.Unlock() - r.cc.NewAddress(addrs) -} - -func (e *ResolverGroup) removeResolver(r *Resolver) { - e.mu.Lock() - for i, er := range e.resolvers { - if er == r { - e.resolvers = append(e.resolvers[:i], e.resolvers[i+1:]...) - break - } - } - e.mu.Unlock() -} - -// SetEndpoints updates the endpoints for ResolverGroup. All registered resolver are updated -// immediately with the new endpoints. -func (e *ResolverGroup) SetEndpoints(endpoints []string) { - addrs := epsToAddrs(endpoints...) - e.mu.Lock() - e.endpoints = endpoints - for _, r := range e.resolvers { - r.cc.NewAddress(addrs) - } - e.mu.Unlock() -} - -// Target constructs a endpoint target using the endpoint id of the ResolverGroup. -func (e *ResolverGroup) Target(endpoint string) string { - return Target(e.id, endpoint) -} - -// Target constructs a endpoint resolver target. -func Target(id, endpoint string) string { - return fmt.Sprintf("%s://%s/%s", scheme, id, endpoint) -} - -// IsTarget checks if a given target string in an endpoint resolver target. -func IsTarget(target string) bool { - return strings.HasPrefix(target, "endpoint://") -} - -func (e *ResolverGroup) Close() { - bldr.close(e.id) -} - -// Build creates or reuses an etcd resolver for the etcd cluster name identified by the authority part of the target. -func (b *builder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { - if len(target.Authority) < 1 { - return nil, fmt.Errorf("'etcd' target scheme requires non-empty authority identifying etcd cluster being routed to") - } - id := target.Authority - es, err := b.getResolverGroup(id) - if err != nil { - return nil, fmt.Errorf("failed to build resolver: %v", err) - } - r := &Resolver{ - endpointID: id, - cc: cc, - } - es.addResolver(r) - return r, nil -} - -func (b *builder) newResolverGroup(id string) (*ResolverGroup, error) { - b.mu.RLock() - _, ok := b.resolverGroups[id] - b.mu.RUnlock() - if ok { - return nil, fmt.Errorf("Endpoint already exists for id: %s", id) - } - - es := &ResolverGroup{id: id} - b.mu.Lock() - b.resolverGroups[id] = es - b.mu.Unlock() - return es, nil -} - -func (b *builder) getResolverGroup(id string) (*ResolverGroup, error) { - b.mu.RLock() - es, ok := b.resolverGroups[id] - b.mu.RUnlock() - if !ok { - return nil, fmt.Errorf("ResolverGroup not found for id: %s", id) - } - return es, nil -} - -func (b *builder) close(id string) { - b.mu.Lock() - delete(b.resolverGroups, id) - b.mu.Unlock() -} - -func (b *builder) Scheme() string { - return scheme -} - -// Resolver provides a resolver for a single etcd cluster, identified by name. -type Resolver struct { - endpointID string - cc resolver.ClientConn - sync.RWMutex -} - -// TODO: use balancer.epsToAddrs -func epsToAddrs(eps ...string) (addrs []resolver.Address) { - addrs = make([]resolver.Address, 0, len(eps)) - for _, ep := range eps { - addrs = append(addrs, resolver.Address{Addr: ep}) - } - return addrs -} - -func (*Resolver) ResolveNow(o resolver.ResolveNowOptions) {} - -func (r *Resolver) Close() { - es, err := bldr.getResolverGroup(r.endpointID) - if err != nil { - return - } - es.removeResolver(r) -} - -// ParseEndpoint endpoint parses an endpoint of the form -// (http|https)://*|(unix|unixs)://) -// and returns a protocol ('tcp' or 'unix'), -// host (or filepath if a unix socket), -// scheme (http, https, unix, unixs). -func ParseEndpoint(endpoint string) (proto string, host string, scheme string) { - proto = "tcp" - host = endpoint - url, uerr := url.Parse(endpoint) - if uerr != nil || !strings.Contains(endpoint, "://") { - return proto, host, scheme - } - scheme = url.Scheme - - // strip scheme:// prefix since grpc dials by host - host = url.Host - switch url.Scheme { - case "http", "https": - case "unix", "unixs": - proto = "unix" - host = url.Host + url.Path - default: - proto, host = "", "" - } - return proto, host, scheme -} - -// ParseTarget parses a endpoint:/// string and returns the parsed id and endpoint. -// If the target is malformed, an error is returned. -func ParseTarget(target string) (string, string, error) { - noPrefix := strings.TrimPrefix(target, targetPrefix) - if noPrefix == target { - return "", "", fmt.Errorf("malformed target, %s prefix is required: %s", targetPrefix, target) - } - parts := strings.SplitN(noPrefix, "/", 2) - if len(parts) != 2 { - return "", "", fmt.Errorf("malformed target, expected %s:///, but got %s", scheme, target) - } - return parts[0], parts[1], nil -} - -// Dialer dials a endpoint using net.Dialer. -// Context cancelation and timeout are supported. -func Dialer(ctx context.Context, dialEp string) (net.Conn, error) { - proto, host, _ := ParseEndpoint(dialEp) - select { - case <-ctx.Done(): - return nil, ctx.Err() - default: - } - dialer := &net.Dialer{} - if deadline, ok := ctx.Deadline(); ok { - dialer.Deadline = deadline - } - return dialer.DialContext(ctx, proto, host) -} diff --git a/clientv3/balancer/utils.go b/clientv3/balancer/utils.go deleted file mode 100644 index 48eb8750740..00000000000 --- a/clientv3/balancer/utils.go +++ /dev/null @@ -1,68 +0,0 @@ -// Copyright 2018 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package balancer - -import ( - "fmt" - "net/url" - "sort" - "sync/atomic" - "time" - - "google.golang.org/grpc/balancer" - "google.golang.org/grpc/resolver" -) - -func scToString(sc balancer.SubConn) string { - return fmt.Sprintf("%p", sc) -} - -func scsToStrings(scs map[balancer.SubConn]resolver.Address) (ss []string) { - ss = make([]string, 0, len(scs)) - for sc, a := range scs { - ss = append(ss, fmt.Sprintf("%s (%s)", a.Addr, scToString(sc))) - } - sort.Strings(ss) - return ss -} - -func addrsToStrings(addrs []resolver.Address) (ss []string) { - ss = make([]string, len(addrs)) - for i := range addrs { - ss[i] = addrs[i].Addr - } - sort.Strings(ss) - return ss -} - -func epsToAddrs(eps ...string) (addrs []resolver.Address) { - addrs = make([]resolver.Address, 0, len(eps)) - for _, ep := range eps { - u, err := url.Parse(ep) - if err != nil { - addrs = append(addrs, resolver.Address{Addr: ep, Type: resolver.Backend}) - continue - } - addrs = append(addrs, resolver.Address{Addr: u.Host, Type: resolver.Backend}) - } - return addrs -} - -var genN = new(uint32) - -func genName() string { - now := time.Now().UnixNano() - return fmt.Sprintf("%X%X", now, atomic.AddUint32(genN, 1)) -} diff --git a/clientv3/balancer/utils_test.go b/clientv3/balancer/utils_test.go deleted file mode 100644 index e58cd349576..00000000000 --- a/clientv3/balancer/utils_test.go +++ /dev/null @@ -1,34 +0,0 @@ -// Copyright 2018 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package balancer - -import ( - "reflect" - "testing" - - "google.golang.org/grpc/resolver" -) - -func Test_epsToAddrs(t *testing.T) { - eps := []string{"https://example.com:2379", "127.0.0.1:2379"} - exp := []resolver.Address{ - {Addr: "example.com:2379", Type: resolver.Backend}, - {Addr: "127.0.0.1:2379", Type: resolver.Backend}, - } - rs := epsToAddrs(eps...) - if !reflect.DeepEqual(rs, exp) { - t.Fatalf("expected %v, got %v", exp, rs) - } -} diff --git a/clientv3/client.go b/clientv3/client.go index 8dbfc8808a2..ac7afe84c6c 100644 --- a/clientv3/client.go +++ b/clientv3/client.go @@ -18,14 +18,11 @@ import ( "context" "errors" "fmt" - "net" - "os" "strconv" "strings" "sync" "time" - "github.com/google/uuid" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -33,10 +30,9 @@ import ( "google.golang.org/grpc/keepalive" "google.golang.org/grpc/status" - "go.etcd.io/etcd/clientv3/balancer" - "go.etcd.io/etcd/clientv3/balancer/picker" - "go.etcd.io/etcd/clientv3/balancer/resolver/endpoint" "go.etcd.io/etcd/clientv3/credentials" + "go.etcd.io/etcd/clientv3/internal/endpoint" + "go.etcd.io/etcd/clientv3/internal/resolver" "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" "go.etcd.io/etcd/pkg/logutil" ) @@ -44,31 +40,8 @@ import ( var ( ErrNoAvailableEndpoints = errors.New("etcdclient: no available endpoints") ErrOldCluster = errors.New("etcdclient: old cluster version") - - roundRobinBalancerName = fmt.Sprintf("etcd-%s", picker.RoundrobinBalanced.String()) ) -func init() { - lg := zap.NewNop() - if os.Getenv("ETCD_CLIENT_DEBUG") != "" { - lcfg := logutil.DefaultZapLoggerConfig - lcfg.Level = zap.NewAtomicLevelAt(zap.DebugLevel) - - var err error - lg, err = lcfg.Build() // info level logging - if err != nil { - panic(err) - } - } - - // TODO: support custom balancer - balancer.RegisterBuilder(balancer.Config{ - Policy: picker.RoundrobinBalanced, - Name: roundRobinBalancerName, - Logger: lg, - }) -} - // Client provides and manages an etcd v3 client session. type Client struct { Cluster @@ -80,10 +53,12 @@ type Client struct { conn *grpc.ClientConn - cfg Config - creds grpccredentials.TransportCredentials - resolverGroup *endpoint.ResolverGroup - mu *sync.RWMutex + cfg Config + creds grpccredentials.TransportCredentials + resolver *resolver.EtcdManualResolver + + epMu *sync.RWMutex + endpoints []string ctx context.Context cancel context.CancelFunc @@ -153,9 +128,6 @@ func (c *Client) Close() error { if c.Lease != nil { c.Lease.Close() } - if c.resolverGroup != nil { - c.resolverGroup.Close() - } if c.conn != nil { return toErr(c.ctx, c.conn.Close()) } @@ -170,19 +142,20 @@ func (c *Client) Ctx() context.Context { return c.ctx } // Endpoints lists the registered endpoints for the client. func (c *Client) Endpoints() []string { // copy the slice; protect original endpoints from being changed - c.mu.RLock() - defer c.mu.RUnlock() - eps := make([]string, len(c.cfg.Endpoints)) - copy(eps, c.cfg.Endpoints) + c.epMu.RLock() + defer c.epMu.RUnlock() + eps := make([]string, len(c.endpoints)) + copy(eps, c.endpoints) return eps } // SetEndpoints updates client's endpoints. func (c *Client) SetEndpoints(eps ...string) { - c.mu.Lock() - defer c.mu.Unlock() - c.cfg.Endpoints = eps - c.resolverGroup.SetEndpoints(eps) + c.epMu.Lock() + defer c.epMu.Unlock() + c.endpoints = eps + + c.resolver.SetEndpoints(eps) } // Sync synchronizes client's endpoints with the known endpoints from the etcd membership. @@ -215,29 +188,12 @@ func (c *Client) autoSync() { err := c.Sync(ctx) cancel() if err != nil && err != c.ctx.Err() { - lg.Lvl(4).Infof("Auto sync endpoints failed: %v", err) + c.lg.Info("Auto sync endpoints failed.", zap.Error(err)) } } } } -func (c *Client) processCreds(scheme string) (creds grpccredentials.TransportCredentials) { - creds = c.creds - switch scheme { - case "unix": - case "http": - creds = nil - case "https", "unixs": - if creds != nil { - break - } - creds = credentials.NewBundle(credentials.Config{}).TransportCredentials() - default: - creds = nil - } - return creds -} - // dialSetupOpts gives the dial opts prior to any authentication. func (c *Client) dialSetupOpts(creds grpccredentials.TransportCredentials, dopts ...grpc.DialOption) (opts []grpc.DialOption, err error) { if c.cfg.DialKeepAliveTime > 0 { @@ -250,22 +206,15 @@ func (c *Client) dialSetupOpts(creds grpccredentials.TransportCredentials, dopts } opts = append(opts, dopts...) - dialer := endpoint.Dialer if creds != nil { opts = append(opts, grpc.WithTransportCredentials(creds)) - // gRPC load balancer workaround. See credentials.transportCredential for details. - if credsDialer, ok := creds.(TransportCredentialsWithDialer); ok { - dialer = credsDialer.Dialer - } } else { opts = append(opts, grpc.WithInsecure()) } - opts = append(opts, grpc.WithContextDialer(dialer)) + grpc.WithDisableRetry() - // Interceptor retry and backoff. - // TODO: Replace all of clientv3/retry.go with interceptor based retry, or with - // https://github.com/grpc/proposal/blob/master/A6-client-retries.md#retry-policy - // once it is available. + // TODO: Replace all of clientv3/retry.go with RetryPolicy: + // https://github.com/grpc/grpc-proto/blob/cdd9ed5c3d3f87aef62f373b93361cf7bddc620d/grpc/service_config/service_config.proto#L130 rrBackoff := withBackoff(c.roundRobinQuorumBackoff(defaultBackoffWaitBetween, defaultBackoffJitterFraction)) opts = append(opts, // Disable stream retry by default since go-grpc-middleware/retry does not support client streams. @@ -279,15 +228,11 @@ func (c *Client) dialSetupOpts(creds grpccredentials.TransportCredentials, dopts // Dial connects to a single endpoint using the client's config. func (c *Client) Dial(ep string) (*grpc.ClientConn, error) { - creds, err := c.directDialCreds(ep) - if err != nil { - return nil, err - } - // Use the grpc passthrough resolver to directly dial a single endpoint. - // This resolver passes through the 'unix' and 'unixs' endpoints schemes used - // by etcd without modification, allowing us to directly dial endpoints and - // using the same dial functions that we use for load balancer dialing. - return c.dial(fmt.Sprintf("passthrough:///%s", ep), creds) + creds := c.credentialsForEndpoint(ep) + + // Using ad-hoc created resolver, to guarantee only explicitly given + // endpoint is used. + return c.dial(creds, grpc.WithResolvers(resolver.New(ep))) } func (c *Client) getToken(ctx context.Context) error { @@ -310,20 +255,18 @@ func (c *Client) getToken(ctx context.Context) error { // dialWithBalancer dials the client's current load balanced resolver group. The scheme of the host // of the provided endpoint determines the scheme used for all endpoints of the client connection. -func (c *Client) dialWithBalancer(ep string, dopts ...grpc.DialOption) (*grpc.ClientConn, error) { - _, host, _ := endpoint.ParseEndpoint(ep) - target := c.resolverGroup.Target(host) - creds := c.dialWithBalancerCreds(ep) - return c.dial(target, creds, dopts...) +func (c *Client) dialWithBalancer(dopts ...grpc.DialOption) (*grpc.ClientConn, error) { + creds := c.credentialsForEndpoint(c.Endpoints()[0]) + opts := append(dopts, grpc.WithResolvers(c.resolver)) + return c.dial(creds, opts...) } // dial configures and dials any grpc balancer target. -func (c *Client) dial(target string, creds grpccredentials.TransportCredentials, dopts ...grpc.DialOption) (*grpc.ClientConn, error) { +func (c *Client) dial(creds grpccredentials.TransportCredentials, dopts ...grpc.DialOption) (*grpc.ClientConn, error) { opts, err := c.dialSetupOpts(creds, dopts...) if err != nil { return nil, fmt.Errorf("failed to configure dialer: %v", err) } - if c.Username != "" && c.Password != "" { c.authTokenBundle = credentials.NewBundle(credentials.Config{}) opts = append(opts, grpc.WithPerRPCCredentials(c.authTokenBundle.PerRPCCredentials())) @@ -338,6 +281,7 @@ func (c *Client) dial(target string, creds grpccredentials.TransportCredentials, defer cancel() // TODO: Is this right for cases where grpc.WithBlock() is not set on the dial options? } + target := fmt.Sprintf("%s://%p/%s", resolver.Schema, c, authority(c.Endpoints()[0])) conn, err := grpc.DialContext(dctx, target, opts...) if err != nil { return nil, err @@ -345,36 +289,35 @@ func (c *Client) dial(target string, creds grpccredentials.TransportCredentials, return conn, nil } -func (c *Client) directDialCreds(ep string) (grpccredentials.TransportCredentials, error) { - _, host, scheme := endpoint.ParseEndpoint(ep) - creds := c.creds - if len(scheme) != 0 { - creds = c.processCreds(scheme) - if creds != nil { - clone := creds.Clone() - // Set the server name must to the endpoint hostname without port since grpc - // otherwise attempts to check if x509 cert is valid for the full endpoint - // including the scheme and port, which fails. - overrideServerName, _, err := net.SplitHostPort(host) - if err != nil { - // Either the host didn't have a port or the host could not be parsed. Either way, continue with the - // original host string. - overrideServerName = host - } - clone.OverrideServerName(overrideServerName) - creds = clone +func authority(endpoint string) string { + spl := strings.SplitN(endpoint, "://", 2) + if len(spl) < 2 { + if strings.HasPrefix(endpoint, "unix:") { + return endpoint[len("unix:"):] } + if strings.HasPrefix(endpoint, "unixs:") { + return endpoint[len("unixs:"):] + } + return endpoint } - return creds, nil + return spl[1] } -func (c *Client) dialWithBalancerCreds(ep string) grpccredentials.TransportCredentials { - _, _, scheme := endpoint.ParseEndpoint(ep) - creds := c.creds - if len(scheme) != 0 { - creds = c.processCreds(scheme) +func (c *Client) credentialsForEndpoint(ep string) grpccredentials.TransportCredentials { + r := endpoint.RequiresCredentials(ep) + switch r { + case endpoint.CREDS_DROP: + return nil + case endpoint.CREDS_OPTIONAL: + return c.creds + case endpoint.CREDS_REQUIRE: + if c.creds != nil { + return c.creds + } + return credentials.NewBundle(credentials.Config{}).TransportCredentials() + default: + panic(fmt.Errorf("Unsupported CredsRequirement: %v", r)) } - return creds } func newClient(cfg *Config) (*Client, error) { @@ -399,7 +342,7 @@ func newClient(cfg *Config) (*Client, error) { creds: creds, ctx: ctx, cancel: cancel, - mu: new(sync.RWMutex), + epMu: new(sync.RWMutex), callOpts: defaultCallOpts, lgMu: new(sync.RWMutex), } @@ -436,30 +379,22 @@ func newClient(cfg *Config) (*Client, error) { client.callOpts = callOpts } - // Prepare a 'endpoint:///' resolver for the client and create a endpoint target to pass - // to dial so the client knows to use this resolver. - client.resolverGroup, err = endpoint.NewResolverGroup(fmt.Sprintf("client-%s", uuid.New().String())) - if err != nil { - client.cancel() - return nil, err - } - client.resolverGroup.SetEndpoints(cfg.Endpoints) + client.resolver = resolver.New(cfg.Endpoints...) if len(cfg.Endpoints) < 1 { return nil, fmt.Errorf("at least one Endpoint must is required in client config") } - dialEndpoint := cfg.Endpoints[0] + client.SetEndpoints(cfg.Endpoints...) // Use a provided endpoint target so that for https:// without any tls config given, then // grpc will assume the certificate server name is the endpoint host. - conn, err := client.dialWithBalancer(dialEndpoint, grpc.WithBalancerName(roundRobinBalancerName)) + conn, err := client.dialWithBalancer() if err != nil { client.cancel() - client.resolverGroup.Close() + client.resolver.Close() + // TODO: Error like `fmt.Errorf(dialing [%s] failed: %v, strings.Join(cfg.Endpoints, ";"), err)` would help with debugging a lot. return nil, err } - // TODO: With the old grpc balancer interface, we waited until the dial timeout - // for the balancer to be ready. Is there an equivalent wait we should do with the new grpc balancer interface? client.conn = conn client.Cluster = NewCluster(client) @@ -478,6 +413,7 @@ func newClient(cfg *Config) (*Client, error) { if err != nil { client.Close() cancel() + //TODO: Consider fmt.Errorf("communicating with [%s] failed: %v", strings.Join(cfg.Endpoints, ";"), err) return nil, err } cancel() @@ -651,9 +587,3 @@ func IsConnCanceled(err error) bool { // <= gRPC v1.7.x returns 'errors.New("grpc: the client connection is closing")' return strings.Contains(err.Error(), "grpc: the client connection is closing") } - -// TransportCredentialsWithDialer is for a gRPC load balancer workaround. See credentials.transportCredential for details. -type TransportCredentialsWithDialer interface { - grpccredentials.TransportCredentials - Dialer(ctx context.Context, dialEp string) (net.Conn, error) -} diff --git a/clientv3/client_test.go b/clientv3/client_test.go index a6d9fc43914..029a10dd1f1 100644 --- a/clientv3/client_test.go +++ b/clientv3/client_test.go @@ -83,6 +83,8 @@ func TestDialCancel(t *testing.T) { func TestDialTimeout(t *testing.T) { defer testutil.AfterTest(t) + wantError := context.DeadlineExceeded + // grpc.WithBlock to block until connection up or timeout testCfgs := []Config{ { @@ -122,8 +124,8 @@ func TestDialTimeout(t *testing.T) { case <-time.After(5 * time.Second): t.Errorf("#%d: failed to timeout dial on time", i) case err := <-donec: - if err != context.DeadlineExceeded { - t.Errorf("#%d: unexpected error %v, want %v", i, err, context.DeadlineExceeded) + if err.Error() != wantError.Error() { + t.Errorf("#%d: unexpected error '%v', want '%v'", i, err, wantError) } } } diff --git a/clientv3/credentials/credentials.go b/clientv3/credentials/credentials.go index c1a2ec5fdba..1dd5817f699 100644 --- a/clientv3/credentials/credentials.go +++ b/clientv3/credentials/credentials.go @@ -22,9 +22,9 @@ import ( "net" "sync" - "go.etcd.io/etcd/clientv3/balancer/resolver/endpoint" - "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" grpccredentials "google.golang.org/grpc/credentials" + + "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" ) // Config defines gRPC credential configuration. @@ -66,46 +66,20 @@ func (b *bundle) NewWithMode(mode string) (grpccredentials.Bundle, error) { } // transportCredential implements "grpccredentials.TransportCredentials" interface. -// transportCredential wraps TransportCredentials to track which -// addresses are dialed for which endpoints, and then sets the authority when checking the endpoint's cert to the -// hostname or IP of the dialed endpoint. -// This is a workaround of a gRPC load balancer issue. gRPC uses the dialed target's service name as the authority when -// checking all endpoint certs, which does not work for etcd servers using their hostname or IP as the Subject Alternative Name -// in their TLS certs. -// To enable, include both WithTransportCredentials(creds) and WithContextDialer(creds.Dialer) -// when dialing. type transportCredential struct { gtc grpccredentials.TransportCredentials - mu sync.Mutex - // addrToEndpoint maps from the connection addresses that are dialed to the hostname or IP of the - // endpoint provided to the dialer when dialing - addrToEndpoint map[string]string } func newTransportCredential(cfg *tls.Config) *transportCredential { return &transportCredential{ - gtc: grpccredentials.NewTLS(cfg), - addrToEndpoint: map[string]string{}, + gtc: grpccredentials.NewTLS(cfg), } } func (tc *transportCredential) ClientHandshake(ctx context.Context, authority string, rawConn net.Conn) (net.Conn, grpccredentials.AuthInfo, error) { - // Set the authority when checking the endpoint's cert to the hostname or IP of the dialed endpoint - tc.mu.Lock() - dialEp, ok := tc.addrToEndpoint[rawConn.RemoteAddr().String()] - tc.mu.Unlock() - if ok { - _, host, _ := endpoint.ParseEndpoint(dialEp) - authority = host - } return tc.gtc.ClientHandshake(ctx, authority, rawConn) } -// return true if given string is an IP. -func isIP(ep string) bool { - return net.ParseIP(ep) != nil -} - func (tc *transportCredential) ServerHandshake(rawConn net.Conn) (net.Conn, grpccredentials.AuthInfo, error) { return tc.gtc.ServerHandshake(rawConn) } @@ -115,15 +89,8 @@ func (tc *transportCredential) Info() grpccredentials.ProtocolInfo { } func (tc *transportCredential) Clone() grpccredentials.TransportCredentials { - copy := map[string]string{} - tc.mu.Lock() - for k, v := range tc.addrToEndpoint { - copy[k] = v - } - tc.mu.Unlock() return &transportCredential{ - gtc: tc.gtc.Clone(), - addrToEndpoint: copy, + gtc: tc.gtc.Clone(), } } @@ -131,17 +98,6 @@ func (tc *transportCredential) OverrideServerName(serverNameOverride string) err return tc.gtc.OverrideServerName(serverNameOverride) } -func (tc *transportCredential) Dialer(ctx context.Context, dialEp string) (net.Conn, error) { - // Keep track of which addresses are dialed for which endpoints - conn, err := endpoint.Dialer(ctx, dialEp) - if conn != nil { - tc.mu.Lock() - tc.addrToEndpoint[conn.RemoteAddr().String()] = dialEp - tc.mu.Unlock() - } - return conn, err -} - // perRPCCredential implements "grpccredentials.PerRPCCredentials" interface. type perRPCCredential struct { authToken string diff --git a/clientv3/internal/endpoint/endpoint.go b/clientv3/internal/endpoint/endpoint.go new file mode 100644 index 00000000000..9cd17ab6b64 --- /dev/null +++ b/clientv3/internal/endpoint/endpoint.go @@ -0,0 +1,138 @@ +// Copyright 2021 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package endpoint + +import ( + "fmt" + "net" + "net/url" + "path" + "strings" +) + +type CredsRequirement int + +const ( + // CREDS_REQUIRE - Credentials/certificate required for thi type of connection. + CREDS_REQUIRE CredsRequirement = iota + // CREDS_DROP - Credentials/certificate not needed and should get ignored. + CREDS_DROP + // CREDS_OPTIONAL - Credentials/certificate might be used if supplied + CREDS_OPTIONAL +) + +func extractHostFromHostPort(ep string) string { + host, _, err := net.SplitHostPort(ep) + if err != nil { + return ep + } + return host +} + +func extractHostFromPath(pathStr string) string { + return extractHostFromHostPort(path.Base(pathStr)) +} + +// mustSplit2 returns the values from strings.SplitN(s, sep, 2). +// If sep is not found, it returns ("", "", false) instead. +func mustSplit2(s, sep string) (string, string) { + spl := strings.SplitN(s, sep, 2) + if len(spl) < 2 { + panic(fmt.Errorf("Token '%v' expected to have separator sep: `%v`", s, sep)) + } + return spl[0], spl[1] +} + +func schemeToCredsRequirement(schema string) CredsRequirement { + switch schema { + case "https", "unixs": + return CREDS_REQUIRE + case "http": + return CREDS_DROP + case "unix": + // Preserving previous behavior from: + // https://github.com/etcd-io/etcd/blob/dae29bb719dd69dc119146fc297a0628fcc1ccf8/client/v3/client.go#L212 + // that likely was a bug due to missing 'fallthrough'. + // At the same time it seems legit to let the users decide whether they + // want credential control or not (and 'unixs' schema is not a standard thing). + return CREDS_OPTIONAL + case "": + return CREDS_OPTIONAL + default: + return CREDS_OPTIONAL + } +} + +// This function translates endpoints names supported by etcd server into +// endpoints as supported by grpc with additional information +// (server_name for cert validation, requireCreds - whether certs are needed). +// The main differences: +// - etcd supports unixs & https names as opposed to unix & http to +// distinguish need to configure certificates. +// - etcd support http(s) names as opposed to tcp supported by grpc/dial method. +// - etcd supports unix(s)://local-file naming schema +// (as opposed to unix:local-file canonical name used by grpc for current dir files). +// - Within the unix(s) schemas, the last segment (filename) without 'port' (content after colon) +// is considered serverName - to allow local testing of cert-protected communication. +// +// See more: +// - https://github.com/grpc/grpc-go/blob/26c143bd5f59344a4b8a1e491e0f5e18aa97abc7/internal/grpcutil/target.go#L47 +// - https://golang.org/pkg/net/#Dial +// - https://github.com/grpc/grpc/blob/master/doc/naming.md +func translateEndpoint(ep string) (addr string, serverName string, requireCreds CredsRequirement) { + if strings.HasPrefix(ep, "unix:") || strings.HasPrefix(ep, "unixs:") { + if strings.HasPrefix(ep, "unix:///") || strings.HasPrefix(ep, "unixs:///") { + // absolute path case + schema, absolutePath := mustSplit2(ep, "://") + return "unix://" + absolutePath, extractHostFromPath(absolutePath), schemeToCredsRequirement(schema) + } + if strings.HasPrefix(ep, "unix://") || strings.HasPrefix(ep, "unixs://") { + // legacy etcd local path + schema, localPath := mustSplit2(ep, "://") + return "unix:" + localPath, extractHostFromPath(localPath), schemeToCredsRequirement(schema) + } + schema, localPath := mustSplit2(ep, ":") + return "unix:" + localPath, extractHostFromPath(localPath), schemeToCredsRequirement(schema) + } + + if strings.Contains(ep, "://") { + url, err := url.Parse(ep) + if err != nil { + return ep, extractHostFromHostPort(ep), CREDS_OPTIONAL + } + if url.Scheme == "http" || url.Scheme == "https" { + return url.Host, url.Hostname(), schemeToCredsRequirement(url.Scheme) + } + return ep, url.Hostname(), schemeToCredsRequirement(url.Scheme) + } + // Handles plain addresses like 10.0.0.44:437. + return ep, extractHostFromHostPort(ep), CREDS_OPTIONAL +} + +// RequiresCredentials returns whether given endpoint requires +// credentials/certificates for connection. +func RequiresCredentials(ep string) CredsRequirement { + _, _, requireCreds := translateEndpoint(ep) + return requireCreds +} + +// Interpret endpoint parses an endpoint of the form +// (http|https)://*|(unix|unixs)://) +// and returns low-level address (supported by 'net') to connect to, +// and a server name used for x509 certificate matching. +func Interpret(ep string) (address string, serverName string) { + addr, serverName, _ := translateEndpoint(ep) + return addr, serverName +} diff --git a/clientv3/internal/endpoint/endpoint_test.go b/clientv3/internal/endpoint/endpoint_test.go new file mode 100644 index 00000000000..bc6cd71399c --- /dev/null +++ b/clientv3/internal/endpoint/endpoint_test.go @@ -0,0 +1,99 @@ +// Copyright 2021 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package endpoint + +import ( + "testing" +) + +func Test_interpret(t *testing.T) { + tests := []struct { + endpoint string + wantAddress string + wantServerName string + wantRequiresCreds CredsRequirement + }{ + {"127.0.0.1", "127.0.0.1", "127.0.0.1", CREDS_OPTIONAL}, + {"localhost", "localhost", "localhost", CREDS_OPTIONAL}, + {"localhost:8080", "localhost:8080", "localhost", CREDS_OPTIONAL}, + + {"unix:127.0.0.1", "unix:127.0.0.1", "127.0.0.1", CREDS_OPTIONAL}, + {"unix:127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1", CREDS_OPTIONAL}, + + {"unix://127.0.0.1", "unix:127.0.0.1", "127.0.0.1", CREDS_OPTIONAL}, + {"unix://127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1", CREDS_OPTIONAL}, + + {"unixs:127.0.0.1", "unix:127.0.0.1", "127.0.0.1", CREDS_REQUIRE}, + {"unixs:127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1", CREDS_REQUIRE}, + {"unixs://127.0.0.1", "unix:127.0.0.1", "127.0.0.1", CREDS_REQUIRE}, + {"unixs://127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1", CREDS_REQUIRE}, + + {"http://127.0.0.1", "127.0.0.1", "127.0.0.1", CREDS_DROP}, + {"http://127.0.0.1:8080", "127.0.0.1:8080", "127.0.0.1", CREDS_DROP}, + {"https://127.0.0.1", "127.0.0.1", "127.0.0.1", CREDS_REQUIRE}, + {"https://127.0.0.1:8080", "127.0.0.1:8080", "127.0.0.1", CREDS_REQUIRE}, + {"https://localhost:20000", "localhost:20000", "localhost", CREDS_REQUIRE}, + + {"unix:///tmp/abc", "unix:///tmp/abc", "abc", CREDS_OPTIONAL}, + {"unixs:///tmp/abc", "unix:///tmp/abc", "abc", CREDS_REQUIRE}, + {"unix:///tmp/abc:1234", "unix:///tmp/abc:1234", "abc", CREDS_OPTIONAL}, + {"unixs:///tmp/abc:1234", "unix:///tmp/abc:1234", "abc", CREDS_REQUIRE}, + {"etcd.io", "etcd.io", "etcd.io", CREDS_OPTIONAL}, + {"http://etcd.io/abc", "etcd.io", "etcd.io", CREDS_DROP}, + {"dns://something-other", "dns://something-other", "something-other", CREDS_OPTIONAL}, + + {"http://[2001:db8:1f70::999:de8:7648:6e8]:100/", "[2001:db8:1f70::999:de8:7648:6e8]:100", "2001:db8:1f70::999:de8:7648:6e8", CREDS_DROP}, + {"[2001:db8:1f70::999:de8:7648:6e8]:100", "[2001:db8:1f70::999:de8:7648:6e8]:100", "2001:db8:1f70::999:de8:7648:6e8", CREDS_OPTIONAL}, + {"unix:unexpected-file_name#123$456", "unix:unexpected-file_name#123$456", "unexpected-file_name#123$456", CREDS_OPTIONAL}, + } + for _, tt := range tests { + t.Run("Interpret_"+tt.endpoint, func(t *testing.T) { + gotAddress, gotServerName := Interpret(tt.endpoint) + if gotAddress != tt.wantAddress { + t.Errorf("Interpret() gotAddress = %v, want %v", gotAddress, tt.wantAddress) + } + if gotServerName != tt.wantServerName { + t.Errorf("Interpret() gotServerName = %v, want %v", gotServerName, tt.wantServerName) + } + }) + t.Run("RequiresCredentials_"+tt.endpoint, func(t *testing.T) { + requiresCreds := RequiresCredentials(tt.endpoint) + if requiresCreds != tt.wantRequiresCreds { + t.Errorf("RequiresCredentials() got = %v, want %v", requiresCreds, tt.wantRequiresCreds) + } + }) + } +} + +func Test_extractHostFromHostPort(t *testing.T) { + tests := []struct { + ep string + want string + }{ + {ep: "localhost", want: "localhost"}, + {ep: "localhost:8080", want: "localhost"}, + {ep: "192.158.7.14:8080", want: "192.158.7.14"}, + {ep: "192.158.7.14:8080", want: "192.158.7.14"}, + {ep: "[2001:db8:1f70::999:de8:7648:6e8]", want: "[2001:db8:1f70::999:de8:7648:6e8]"}, + {ep: "[2001:db8:1f70::999:de8:7648:6e8]:100", want: "2001:db8:1f70::999:de8:7648:6e8"}, + } + for _, tt := range tests { + t.Run(tt.ep, func(t *testing.T) { + if got := extractHostFromHostPort(tt.ep); got != tt.want { + t.Errorf("extractHostFromHostPort() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/clientv3/internal/resolver/resolver.go b/clientv3/internal/resolver/resolver.go new file mode 100644 index 00000000000..60b7d85f192 --- /dev/null +++ b/clientv3/internal/resolver/resolver.go @@ -0,0 +1,75 @@ +// Copyright 2021 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package resolver + +import ( + "go.etcd.io/etcd/clientv3/internal/endpoint" + + "google.golang.org/grpc/resolver" + "google.golang.org/grpc/resolver/manual" + "google.golang.org/grpc/serviceconfig" +) + +const ( + Schema = "etcd-endpoints" +) + +// EtcdManualResolver is a Resolver (and resolver.Builder) that can be updated +// using SetEndpoints. +type EtcdManualResolver struct { + *manual.Resolver + endpoints []string + serviceConfig *serviceconfig.ParseResult +} + +func New(endpoints ...string) *EtcdManualResolver { + r := manual.NewBuilderWithScheme(Schema) + return &EtcdManualResolver{Resolver: r, endpoints: endpoints, serviceConfig: nil} +} + +// Build returns itself for Resolver, because it's both a builder and a resolver. +func (r *EtcdManualResolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { + r.serviceConfig = cc.ParseServiceConfig(`{"loadBalancingPolicy": "round_robin"}`) + if r.serviceConfig.Err != nil { + return nil, r.serviceConfig.Err + } + res, err := r.Resolver.Build(target, cc, opts) + if err != nil { + return nil, err + } + // Populates endpoints stored in r into ClientConn (cc). + r.updateState() + return res, nil +} + +func (r *EtcdManualResolver) SetEndpoints(endpoints []string) { + r.endpoints = endpoints + r.updateState() +} + +func (r EtcdManualResolver) updateState() { + if r.CC != nil { + addresses := make([]resolver.Address, len(r.endpoints)) + for i, ep := range r.endpoints { + addr, serverName := endpoint.Interpret(ep) + addresses[i] = resolver.Address{Addr: addr, ServerName: serverName} + } + state := resolver.State{ + Addresses: addresses, + ServiceConfig: r.serviceConfig, + } + r.UpdateState(state) + } +} diff --git a/embed/etcd.go b/embed/etcd.go index 223a8aaeaa8..0f15f3609d6 100644 --- a/embed/etcd.go +++ b/embed/etcd.go @@ -555,7 +555,7 @@ func (e *Etcd) servePeers() (err error) { for _, p := range e.Peers { u := p.Listener.Addr().String() - gs := v3rpc.Server(e.Server, peerTLScfg) + gs := v3rpc.Server(e.Server, peerTLScfg, nil) m := cmux.New(p.Listener) go gs.Serve(m.Match(cmux.HTTP2())) srv := &http.Server{ diff --git a/embed/serve.go b/embed/serve.go index 27230e35d5b..1655171f4f5 100644 --- a/embed/serve.go +++ b/embed/serve.go @@ -147,7 +147,7 @@ func (sctx *serveCtx) serve( } } if grpcEnabled { - gs = v3rpc.Server(s, nil, gopts...) + gs = v3rpc.Server(s, nil, nil, gopts...) v3electionpb.RegisterElectionServer(gs, servElection) v3lockpb.RegisterLockServer(gs, servLock) if sctx.serviceRegister != nil { @@ -215,7 +215,7 @@ func (sctx *serveCtx) serve( } if grpcEnabled { - gs = v3rpc.Server(s, tlscfg, gopts...) + gs = v3rpc.Server(s, tlscfg, nil, gopts...) v3electionpb.RegisterElectionServer(gs, servElection) v3lockpb.RegisterLockServer(gs, servLock) if sctx.serviceRegister != nil { diff --git a/etcdserver/api/v3rpc/grpc.go b/etcdserver/api/v3rpc/grpc.go index 0cd09d01642..5ae8f9aba12 100644 --- a/etcdserver/api/v3rpc/grpc.go +++ b/etcdserver/api/v3rpc/grpc.go @@ -23,10 +23,11 @@ import ( grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" - "go.etcd.io/etcd/clientv3/credentials" "google.golang.org/grpc" "google.golang.org/grpc/health" healthpb "google.golang.org/grpc/health/grpc_health_v1" + + "go.etcd.io/etcd/clientv3/credentials" ) const ( @@ -34,18 +35,23 @@ const ( maxSendBytes = math.MaxInt32 ) -func Server(s *etcdserver.EtcdServer, tls *tls.Config, gopts ...grpc.ServerOption) *grpc.Server { +func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnaryServerInterceptor, gopts ...grpc.ServerOption) *grpc.Server { var opts []grpc.ServerOption opts = append(opts, grpc.CustomCodec(&codec{})) if tls != nil { bundle := credentials.NewBundle(credentials.Config{TLSConfig: tls}) opts = append(opts, grpc.Creds(bundle.TransportCredentials())) } - opts = append(opts, grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer( + + chainUnaryInterceptors := []grpc.UnaryServerInterceptor{ newLogUnaryInterceptor(s), newUnaryInterceptor(s), grpc_prometheus.UnaryServerInterceptor, - ))) + } + if interceptor != nil { + chainUnaryInterceptors = append(chainUnaryInterceptors, interceptor) + } + opts = append(opts, grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(chainUnaryInterceptors...))) opts = append(opts, grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( newStreamInterceptor(s), grpc_prometheus.StreamServerInterceptor, diff --git a/go.mod b/go.mod index bf8835cfd3a..a1ea8b564d2 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,6 @@ require ( github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903 github.com/golang/protobuf v1.4.3 github.com/google/btree v1.0.0 - github.com/google/uuid v1.0.0 github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/grpc-ecosystem/grpc-gateway v1.11.0 diff --git a/go.sum b/go.sum index 442bbb1ce93..79e29ddb62b 100644 --- a/go.sum +++ b/go.sum @@ -76,8 +76,6 @@ github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= -github.com/google/uuid v1.0.0 h1:b4Gk+7WdP/d3HZH8EJsZpvV7EtDOgaZLtnaNGIu1adA= -github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4 h1:z53tR0945TRRQO/fLEVPI6SMv7ZflF0TEaTAoU7tOzg= diff --git a/integration/cluster.go b/integration/cluster.go index 11d44c96541..0a6363ebbdd 100644 --- a/integration/cluster.go +++ b/integration/cluster.go @@ -836,8 +836,8 @@ func (m *member) Launch() error { return err } } - m.grpcServer = v3rpc.Server(m.s, tlscfg, m.grpcServerOpts...) - m.grpcServerPeer = v3rpc.Server(m.s, peerTLScfg) + m.grpcServer = v3rpc.Server(m.s, tlscfg, nil, m.grpcServerOpts...) + m.grpcServerPeer = v3rpc.Server(m.s, peerTLScfg, nil) m.serverClient = v3client.New(m.s) lockpb.RegisterLockServer(m.grpcServer, v3lock.NewLockServer(m.serverClient)) epb.RegisterElectionServer(m.grpcServer, v3election.NewElectionServer(m.serverClient)) diff --git a/tests/e2e/cluster_proxy_test.go b/tests/e2e/cluster_proxy_test.go index efb05f71e97..4387983484b 100644 --- a/tests/e2e/cluster_proxy_test.go +++ b/tests/e2e/cluster_proxy_test.go @@ -116,6 +116,10 @@ func (p *proxyEtcdProcess) WithStopSignal(sig os.Signal) os.Signal { return p.etcdProc.WithStopSignal(sig) } +func (p *proxyEtcdProcess) Logs() logsExpect { + return p.etcdProc.Logs() +} + type proxyProc struct { execPath string args []string diff --git a/tests/e2e/cluster_test.go b/tests/e2e/cluster_test.go index f13fc88c590..deac4ed1e5e 100644 --- a/tests/e2e/cluster_test.go +++ b/tests/e2e/cluster_test.go @@ -100,6 +100,7 @@ type etcdProcessClusterConfig struct { execPath string dataDirPath string keepDataDir bool + envVars map[string]string clusterSize int @@ -291,6 +292,7 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerPro etcdCfgs[i] = &etcdServerProcessConfig{ execPath: cfg.execPath, args: args, + envVars: cfg.envVars, tlsArgs: cfg.tlsArgs(), dataDirPath: dataDirPath, keepDataDir: cfg.keepDataDir, diff --git a/tests/e2e/ctl_v3_grpc_test.go b/tests/e2e/ctl_v3_grpc_test.go new file mode 100644 index 00000000000..e84c9b6bd8e --- /dev/null +++ b/tests/e2e/ctl_v3_grpc_test.go @@ -0,0 +1,214 @@ +// Copyright 2021 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build !cluster_proxy +// +build !cluster_proxy + +package e2e + +import ( + "fmt" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "go.etcd.io/etcd/pkg/testutil" +) + +func TestAuthority(t *testing.T) { + tcs := []struct { + name string + useTLS bool + useInsecureTLS bool + // Pattern used to generate endpoints for client. Fields filled + // %d - will be filled with member grpc port + clientURLPattern string + + // Pattern used to validate authority received by server. Fields filled: + // %d - will be filled with first member grpc port + expectAuthorityPattern string + }{ + { + name: "http://domain[:port]", + clientURLPattern: "http://localhost:%d", + expectAuthorityPattern: "localhost:%d", + }, + { + name: "http://address[:port]", + clientURLPattern: "http://127.0.0.1:%d", + expectAuthorityPattern: "127.0.0.1:%d", + }, + { + name: "https://domain[:port] insecure", + useTLS: true, + useInsecureTLS: true, + clientURLPattern: "https://localhost:%d", + expectAuthorityPattern: "localhost:%d", + }, + { + name: "https://address[:port] insecure", + useTLS: true, + useInsecureTLS: true, + clientURLPattern: "https://127.0.0.1:%d", + expectAuthorityPattern: "127.0.0.1:%d", + }, + { + name: "https://domain[:port]", + useTLS: true, + clientURLPattern: "https://localhost:%d", + expectAuthorityPattern: "localhost:%d", + }, + { + name: "https://address[:port]", + useTLS: true, + clientURLPattern: "https://127.0.0.1:%d", + expectAuthorityPattern: "127.0.0.1:%d", + }, + } + for _, tc := range tcs { + for _, clusterSize := range []int{1, 3} { + t.Run(fmt.Sprintf("Size: %d, Scenario: %q", clusterSize, tc.name), func(t *testing.T) { + defer testutil.AfterTest(t) + + cfg := configNoTLS + cfg.clusterSize = clusterSize + if tc.useTLS { + cfg.clientTLS = clientTLS + } + cfg.isClientAutoTLS = tc.useInsecureTLS + // Enable debug mode to get logs with http2 headers (including authority) + cfg.envVars = map[string]string{"GODEBUG": "http2debug=2"} + + epc, err := newEtcdProcessCluster(&cfg) + if err != nil { + t.Fatalf("could not start etcd process cluster (%v)", err) + } + defer epc.Close() + endpoints := templateEndpoints(t, tc.clientURLPattern, epc) + + client := clusterEtcdctlV3(&cfg, endpoints) + err = client.Put("foo", "bar") + if err != nil { + t.Fatal(err) + } + + executeWithTimeout(t, 5*time.Second, func() { + assertAuthority(t, fmt.Sprintf(tc.expectAuthorityPattern, 20000), epc) + }) + }) + + } + } +} + +func templateEndpoints(t *testing.T, pattern string, clus *etcdProcessCluster) []string { + t.Helper() + endpoints := []string{} + for i := 0; i < clus.cfg.clusterSize; i++ { + ent := pattern + if strings.Contains(ent, "%d") { + ent = fmt.Sprintf(ent, etcdProcessBasePort+i*5) + } + if strings.Contains(ent, "%") { + t.Fatalf("Failed to template pattern, %% symbol left %q", ent) + } + endpoints = append(endpoints, ent) + } + return endpoints +} + +func assertAuthority(t *testing.T, expectAurhority string, clus *etcdProcessCluster) { + logs := []logsExpect{} + for _, proc := range clus.procs { + logs = append(logs, proc.Logs()) + } + line := firstMatch(t, `http2: decoded hpack field header field ":authority"`, logs...) + line = strings.TrimSuffix(line, "\n") + line = strings.TrimSuffix(line, "\r") + expectLine := fmt.Sprintf(`http2: decoded hpack field header field ":authority" = %q`, expectAurhority) + assert.True(t, strings.HasSuffix(line, expectLine), fmt.Sprintf("Got %q expected suffix %q", line, expectLine)) +} + +func firstMatch(t *testing.T, expectLine string, logs ...logsExpect) string { + t.Helper() + match := make(chan string, len(logs)) + for i := range logs { + go func(l logsExpect) { + line, _ := l.Expect(expectLine) + match <- line + }(logs[i]) + } + return <-match +} + +func executeWithTimeout(t *testing.T, timeout time.Duration, f func()) { + donec := make(chan struct{}) + go func() { + defer close(donec) + f() + }() + + select { + case <-time.After(timeout): + testutil.FatalStack(t, fmt.Sprintf("test timed out after %v", timeout)) + case <-donec: + } +} + +type etcdctlV3 struct { + cfg *etcdProcessClusterConfig + endpoints []string +} + +func clusterEtcdctlV3(cfg *etcdProcessClusterConfig, endpoints []string) *etcdctlV3 { + return &etcdctlV3{ + cfg: cfg, + endpoints: endpoints, + } +} + +func (ctl *etcdctlV3) Put(key, value string) error { + return ctl.runCmd("put", key, value) +} + +func (ctl *etcdctlV3) runCmd(args ...string) error { + cmdArgs := []string{ctlBinPath + "3"} + for k, v := range ctl.flags() { + cmdArgs = append(cmdArgs, fmt.Sprintf("--%s=%s", k, v)) + } + cmdArgs = append(cmdArgs, args...) + return spawnWithExpect(cmdArgs, "OK") +} + +func (ctl *etcdctlV3) flags() map[string]string { + fmap := make(map[string]string) + if ctl.cfg.clientTLS == clientTLS { + if ctl.cfg.isClientAutoTLS { + fmap["insecure-transport"] = "false" + fmap["insecure-skip-tls-verify"] = "true" + } else if ctl.cfg.isClientCRL { + fmap["cacert"] = caPath + fmap["cert"] = revokedCertPath + fmap["key"] = revokedPrivateKeyPath + } else { + fmap["cacert"] = caPath + fmap["cert"] = certPath + fmap["key"] = privateKeyPath + } + } + fmap["endpoints"] = strings.Join(ctl.endpoints, ",") + return fmap +} diff --git a/tests/e2e/etcd_process.go b/tests/e2e/etcd_process.go index bdbcc209246..d1c43fda8f9 100644 --- a/tests/e2e/etcd_process.go +++ b/tests/e2e/etcd_process.go @@ -43,6 +43,12 @@ type etcdProcess interface { Close() error WithStopSignal(sig os.Signal) os.Signal Config() *etcdServerProcessConfig + + Logs() logsExpect +} + +type logsExpect interface { + Expect(string) (string, error) } type etcdServerProcess struct { @@ -54,6 +60,7 @@ type etcdServerProcess struct { type etcdServerProcessConfig struct { execPath string args []string + envVars map[string]string tlsArgs []string dataDirPath string @@ -98,7 +105,7 @@ func (ep *etcdServerProcess) Start() error { if ep.proc != nil { panic("already started") } - proc, err := spawnCmd(append([]string{ep.cfg.execPath}, ep.cfg.args...)) + proc, err := spawnCmdWithEnv(append([]string{ep.cfg.execPath}, ep.cfg.args...), ep.cfg.envVars) if err != nil { return err } @@ -153,3 +160,10 @@ func (ep *etcdServerProcess) waitReady() error { } func (ep *etcdServerProcess) Config() *etcdServerProcessConfig { return ep.cfg } + +func (ep *etcdServerProcess) Logs() logsExpect { + if ep.proc == nil { + panic("please grap logs before process is stopped") + } + return ep.proc +} diff --git a/tests/e2e/etcd_spawn_nocov.go b/tests/e2e/etcd_spawn_nocov.go index a5760078af0..b21636edfc9 100644 --- a/tests/e2e/etcd_spawn_nocov.go +++ b/tests/e2e/etcd_spawn_nocov.go @@ -18,6 +18,7 @@ package e2e import ( + "fmt" "os" "strings" @@ -27,7 +28,11 @@ import ( const noOutputLineCount = 0 // regular binaries emit no extra lines func spawnCmd(args []string) (*expect.ExpectProcess, error) { - env := os.Environ() + return spawnCmdWithEnv(args, nil) +} + +func spawnCmdWithEnv(args []string, envVars map[string]string) (*expect.ExpectProcess, error) { + env := mergeEnvVariables(envVars) switch { case strings.HasSuffix(args[0], ctlBinPath+"2"): env = append(env, "ETCDCTL_API=2") @@ -38,3 +43,23 @@ func spawnCmd(args []string) (*expect.ExpectProcess, error) { } return expect.NewExpectWithEnv(args[0], args[1:], env) } + +func mergeEnvVariables(envVars map[string]string) []string { + var env []string + // Environment variables are passed as parameter have higher priority + // than os environment variables. + for k, v := range envVars { + env = append(env, fmt.Sprintf("%s=%s", k, v)) + } + + // Now, we can set os environment variables not passed as parameter. + currVars := os.Environ() + for _, v := range currVars { + p := strings.Split(v, "=") + if _, ok := envVars[p[0]]; !ok { + env = append(env, fmt.Sprintf("%s=%s", p[0], p[1])) + } + } + + return env +}