Skip to content

Commit

Permalink
Merge pull request #1481 from dcherednik/explicit_ip_support_rc1
Browse files Browse the repository at this point in the history
Support for external ip discovery
  • Loading branch information
asmyasnikov authored Sep 26, 2024
2 parents 14ec2f2 + 4b40c7b commit d6b0468
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 15 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
* Added ip discovery. Server can show own ip address and target hostname in the ListEndpoint message. These fields are used to bypass DNS resolving.

## v3.81.0
* Added error ErrMessagesPutToInternalQueueBeforeError to topic writer
* Added write to topics within transactions
Expand Down
19 changes: 16 additions & 3 deletions internal/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,18 @@ func (c *conn) GetState() (s State) {
return State(c.state.Load())
}

func makeDialOption(overrideHost string) []grpc.DialOption {
dialOption := []grpc.DialOption{
grpc.WithStatsHandler(statsHandler{}),
}

if len(overrideHost) != 0 {
dialOption = append(dialOption, grpc.WithAuthority(overrideHost))
}

return dialOption
}

func (c *conn) realConn(ctx context.Context) (cc *grpc.ClientConn, err error) {
if c.isClosed() {
return nil, xerrors.WithStackTrace(errClosedConnection)
Expand Down Expand Up @@ -208,10 +220,11 @@ func (c *conn) realConn(ctx context.Context) (cc *grpc.ClientConn, err error) {
// three slashes in "ydb:///" is ok. It needs for good parse scheme in grpc resolver.
address := "ydb:///" + c.endpoint.Address()

dialOption := makeDialOption(c.endpoint.OverrideHost())

cc, err = grpc.DialContext(ctx, address, append( //nolint:staticcheck,nolintlint
[]grpc.DialOption{
grpc.WithStatsHandler(statsHandler{}),
}, c.config.GrpcDialOptions()...,
dialOption,
c.config.GrpcDialOptions()...,
)...)
if err != nil {
if xerrors.IsContextError(err) {
Expand Down
3 changes: 3 additions & 0 deletions internal/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ func discover(
endpoint.WithLocalDC(e.GetLocation() == location),
endpoint.WithServices(e.GetService()),
endpoint.WithLastUpdated(config.Clock().Now()),
endpoint.WithIPV4(e.GetIpV4()),
endpoint.WithIPV6(e.GetIpV6()),
endpoint.WithSslTargetNameOverride(e.GetSslTargetNameOverride()),
))
}
}
Expand Down
83 changes: 71 additions & 12 deletions internal/endpoint/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type (
Location() string
LastUpdated() time.Time
LoadFactor() float32
OverrideHost() string

// Deprecated: LocalDC check "local" by compare endpoint location with discovery "selflocation" field.
// It work good only if connection url always point to local dc.
Expand All @@ -33,11 +34,14 @@ type (
)

type endpoint struct { //nolint:maligned
mu sync.RWMutex
id uint32
address string
location string
services []string
mu sync.RWMutex
id uint32
address string
location string
services []string
ipv4 []string
ipv6 []string
sslNameOverride string

loadFactor float32
lastUpdated time.Time
Expand All @@ -50,13 +54,16 @@ func (e *endpoint) Copy() Endpoint {
defer e.mu.RUnlock()

return &endpoint{
id: e.id,
address: e.address,
location: e.location,
services: append(make([]string, 0, len(e.services)), e.services...),
loadFactor: e.loadFactor,
local: e.local,
lastUpdated: e.lastUpdated,
id: e.id,
address: e.address,
location: e.location,
services: append(make([]string, 0, len(e.services)), e.services...),
ipv4: append(make([]string, 0, len(e.ipv4)), e.ipv4...),
ipv6: append(make([]string, 0, len(e.ipv6)), e.ipv6...),
sslNameOverride: e.sslNameOverride,
loadFactor: e.loadFactor,
local: e.local,
lastUpdated: e.lastUpdated,
}
}

Expand All @@ -81,13 +88,47 @@ func (e *endpoint) NodeID() uint32 {
return e.id
}

func getResolvedAddr(e *endpoint, useV6 bool) string {
var ip string
if useV6 {
ip = "[" + e.ipv6[0] + "]"
} else {
ip = e.ipv4[0]
}

end := len(e.address)

for i := end - 1; i >= 0; i-- {
if e.address[i] == ':' {
return ip + e.address[i:]
}
}

return e.address
}

func (e *endpoint) Address() (address string) {
e.mu.RLock()
defer e.mu.RUnlock()

if len(e.ipv4) != 0 {
return getResolvedAddr(e, false)
}

if len(e.ipv6) != 0 {
return getResolvedAddr(e, true)
}

return e.address
}

func (e *endpoint) OverrideHost() string {
e.mu.RLock()
defer e.mu.RUnlock()

return e.sslNameOverride
}

func (e *endpoint) Location() string {
e.mu.RLock()
defer e.mu.RUnlock()
Expand Down Expand Up @@ -168,6 +209,24 @@ func WithLastUpdated(ts time.Time) Option {
}
}

func WithIPV4(ipv4 []string) Option {
return func(e *endpoint) {
e.ipv4 = append(e.ipv4, ipv4...)
}
}

func WithIPV6(ipv6 []string) Option {
return func(e *endpoint) {
e.ipv6 = append(e.ipv6, ipv6...)
}
}

func WithSslTargetNameOverride(nameOverride string) Option {
return func(e *endpoint) {
e.sslNameOverride = nameOverride
}
}

func New(address string, opts ...Option) *endpoint {
e := &endpoint{
address: address,
Expand Down
4 changes: 4 additions & 0 deletions internal/mock/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ func (e *Endpoint) LoadFactor() float32 {
panic("not implemented in mock")
}

func (e *Endpoint) OverrideHost() string {
panic("not implemented in mock")
}

func (e *Endpoint) String() string {
panic("not implemented in mock")
}
Expand Down

0 comments on commit d6b0468

Please sign in to comment.