Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client: don't reconnect to same db after leader change #280

Merged
merged 2 commits into from
Jan 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
193 changes: 127 additions & 66 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,21 @@ type bufferedUpdate struct {
lastTxnID string
}

type epInfo struct {
address string
serverID string
}

// ovsdbClient is an OVSDB client
type ovsdbClient struct {
options *options
metrics metrics
connected bool
rpcClient *rpc2.Client
rpcMutex sync.RWMutex
activeEndpoint string
options *options
metrics metrics
connected bool
rpcClient *rpc2.Client
rpcMutex sync.RWMutex
// endpoints contains all possible endpoints; the first element is
// the active endpoint if connected=true
endpoints []*epInfo

// The name of the "primary" database - that is to say, the DB
// that the user expects to interact with.
Expand Down Expand Up @@ -146,6 +153,9 @@ func newOVSDBClient(clientDBModel model.ClientDBModel, opts ...Option) (*ovsdbCl
if err != nil {
return nil, err
}
for _, address := range ovs.options.endpoints {
ovs.endpoints = append(ovs.endpoints, &epInfo{address: address})
}

if ovs.options.logger == nil {
// create a new logger to log to stdout
Expand Down Expand Up @@ -199,6 +209,28 @@ func (o *ovsdbClient) Connect(ctx context.Context) error {
return nil
}

// moveEndpointFirst makes the endpoint requested by active the first element
// in the endpoints slice, indicating it is the active endpoint
func (o *ovsdbClient) moveEndpointFirst(i int) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: It might be nice to have a unit test for this and the other function.
They "should" work, but tests are always nice

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dave-tucker I added a reconnect-with-leader-only test to client_test.go that covers these functions (and the whole reconnect process).

firstEp := o.endpoints[i]
othereps := append(o.endpoints[:i], o.endpoints[i+1:]...)
o.endpoints = append([]*epInfo{firstEp}, othereps...)
}

// moveEndpointLast moves the requested endpoint to the end of the list
func (o *ovsdbClient) moveEndpointLast(i int) {
lastEp := o.endpoints[i]
othereps := append(o.endpoints[:i], o.endpoints[i+1:]...)
o.endpoints = append(othereps, lastEp)
}

func (o *ovsdbClient) resetRPCClient() {
if o.rpcClient != nil {
o.rpcClient.Close()
o.rpcClient = nil
}
}

func (o *ovsdbClient) connect(ctx context.Context, reconnect bool) error {
o.rpcMutex.Lock()
defer o.rpcMutex.Unlock()
Expand All @@ -208,18 +240,20 @@ func (o *ovsdbClient) connect(ctx context.Context, reconnect bool) error {

connected := false
connectErrors := []error{}
for _, endpoint := range o.options.endpoints {
u, err := url.Parse(endpoint)
for i, endpoint := range o.endpoints {
u, err := url.Parse(endpoint.address)
if err != nil {
return err
}
if err := o.tryEndpoint(ctx, u); err != nil {
if sid, err := o.tryEndpoint(ctx, u); err != nil {
o.resetRPCClient()
connectErrors = append(connectErrors,
fmt.Errorf("failed to connect to %s: %w", endpoint, err))
fmt.Errorf("failed to connect to %s: %w", endpoint.address, err))
continue
} else {
o.logger.V(3).Info("successfully connected", "endpoint", endpoint)
o.activeEndpoint = endpoint
o.logger.V(3).Info("successfully connected", "endpoint", endpoint.address, "sid", sid)
endpoint.serverID = sid
o.moveEndpointFirst(i)
connected = true
break
}
Expand All @@ -246,8 +280,7 @@ func (o *ovsdbClient) connect(ctx context.Context, reconnect bool) error {
for id, request := range db.monitors {
err := o.monitor(ctx, MonitorCookie{DatabaseName: dbName, ID: id}, true, request)
if err != nil {
o.rpcClient.Close()
o.rpcClient = nil
o.resetRPCClient()
return err
}
}
Expand All @@ -264,7 +297,9 @@ func (o *ovsdbClient) connect(ctx context.Context, reconnect bool) error {
return nil
}

func (o *ovsdbClient) tryEndpoint(ctx context.Context, u *url.URL) error {
// tryEndpoint connects to a single database endpoint. Returns the
// server ID (if clustered) on success, or an error.
func (o *ovsdbClient) tryEndpoint(ctx context.Context, u *url.URL) (string, error) {
o.logger.V(5).Info("trying to connect", "endpoint", fmt.Sprintf("%v", u))
var dialer net.Dialer
var err error
Expand All @@ -283,18 +318,15 @@ func (o *ovsdbClient) tryEndpoint(ctx context.Context, u *url.URL) error {
default:
err = fmt.Errorf("unknown network protocol %s", u.Scheme)
}

if err != nil {
return fmt.Errorf("failed to open connection: %w", err)
return "", fmt.Errorf("failed to open connection: %w", err)
}

o.createRPC2Client(c)

serverDBNames, err := o.listDbs(ctx)
if err != nil {
o.rpcClient.Close()
o.rpcClient = nil
return err
return "", err
}

// for every requested database, ensure the DB exists in the server and
Expand All @@ -309,18 +341,13 @@ func (o *ovsdbClient) tryEndpoint(ctx context.Context, u *url.URL) error {
}
}
if !found {
err = fmt.Errorf("target database %s not found", dbName)
o.rpcClient.Close()
o.rpcClient = nil
return err
return "", fmt.Errorf("target database %s not found", dbName)
}

// load and validate the schema
schema, err := o.getSchema(ctx, dbName)
if err != nil {
o.rpcClient.Close()
o.rpcClient = nil
return err
return "", err
}

db.modelMutex.Lock()
Expand All @@ -332,21 +359,16 @@ func (o *ovsdbClient) tryEndpoint(ctx context.Context, u *url.URL) error {
for _, err := range errors {
combined = append(combined, err.Error())
}
err = fmt.Errorf("database %s validation error (%d): %s", dbName, len(errors),
strings.Join(combined, ". "))
o.rpcClient.Close()
o.rpcClient = nil
return err
return "", fmt.Errorf("database %s validation error (%d): %s",
dbName, len(errors), strings.Join(combined, ". "))
}

db.cacheMutex.Lock()
if db.cache == nil {
db.cache, err = cache.NewTableCache(db.model, nil, o.logger)
if err != nil {
db.cacheMutex.Unlock()
o.rpcClient.Close()
o.rpcClient = nil
return err
return "", err
}
db.api = newAPI(db.cache, o.logger)
} else {
Expand All @@ -356,22 +378,18 @@ func (o *ovsdbClient) tryEndpoint(ctx context.Context, u *url.URL) error {
}

// check that this is the leader
var sid string
if o.options.leaderOnly {
var leader bool
leader, err = o.isEndpointLeader(ctx)
leader, sid, err = o.isEndpointLeader(ctx)
if err != nil {
o.rpcClient.Close()
o.rpcClient = nil
return err
return "", err
}
if !leader {
err = fmt.Errorf("endpoint is not leader")
o.rpcClient.Close()
o.rpcClient = nil
return err
return "", fmt.Errorf("endpoint is not leader")
}
}
return nil
return sid, nil
}

// createRPC2Client creates an rpcClient using the provided connection
Expand All @@ -396,58 +414,67 @@ func (o *ovsdbClient) createRPC2Client(conn net.Conn) {
go o.rpcClient.Run()
}

// isEndpointLeader returns true if the currently connected endpoint is leader.
// assumes rpcMutex is held
func (o *ovsdbClient) isEndpointLeader(ctx context.Context) (bool, error) {
// isEndpointLeader returns true if the currently connected endpoint is leader,
// otherwise false or an error. If the currently connected endpoint is the leader
// and the database is clustered, also returns the database's Server ID.
// Assumes rpcMutex is held.
func (o *ovsdbClient) isEndpointLeader(ctx context.Context) (bool, string, error) {
op := ovsdb.Operation{
Op: ovsdb.OperationSelect,
Table: "Database",
Columns: []string{"name", "model", "leader"},
}
results, err := o.transact(ctx, serverDB, op)
if err != nil {
return false, fmt.Errorf("could not check if server was leader: %w", err)
return false, "", fmt.Errorf("could not check if server was leader: %w", err)
}
// for now, if no rows are returned, just accept this server
if len(results) != 1 {
return true, nil
return true, "", nil
}
result := results[0]
if len(result.Rows) == 0 {
return true, nil
return true, "", nil
}

for _, row := range result.Rows {
dbName, ok := row["name"].(string)
if !ok {
return false, fmt.Errorf("could not parse name")
return false, "", fmt.Errorf("could not parse name")
}
if dbName != o.primaryDBName {
continue
}

model, ok := row["model"].(string)
if !ok {
return false, fmt.Errorf("could not parse model")
return false, "", fmt.Errorf("could not parse model")
}

// the database reports whether or not it is part of a cluster via the
// "model" column. If it's not clustered, it is by definition leader.
if model != serverdb.DatabaseModelClustered {
return true, nil
return true, "", nil
}

// Clustered database must have a Server ID
sid, ok := row["sid"].(ovsdb.UUID)
if !ok {
return false, "", fmt.Errorf("could not parse server id")
}

leader, ok := row["leader"].(bool)
if !ok {
return false, fmt.Errorf("could not parse leader")
return false, "", fmt.Errorf("could not parse leader")
}
return leader, nil

return leader, sid.GoUUID, nil
}

// Extremely unlikely: there is no _Server row for the desired DB (which we made sure existed)
// for now, just continue
o.logger.V(3).Info("Couldn't find a row in _Server for our database. Continuing without leader detection", "database", o.primaryDBName)
return true, nil
return true, "", nil
}

func (o *ovsdbClient) primaryDB() *database {
Expand Down Expand Up @@ -497,7 +524,7 @@ func (o *ovsdbClient) CurrentEndpoint() string {
if o.rpcClient == nil {
return ""
}
return o.activeEndpoint
return o.endpoints[0].address
}

// DisconnectNotify returns a channel which will notify the caller when the
Expand Down Expand Up @@ -983,10 +1010,37 @@ func (o *ovsdbClient) watchForLeaderChange() error {
continue
}

if dbInfo.Model == serverdb.DatabaseModelClustered && !dbInfo.Leader && o.Connected() {
o.logger.V(3).Info("endpoint lost leader, reconnecting", "endpoint", o.activeEndpoint)
o.Disconnect()
// Only handle leadership changes for clustered databases
if dbInfo.Model != serverdb.DatabaseModelClustered {
continue
}

// Clustered database servers must have a valid Server ID
var sid string
if dbInfo.Sid != nil {
sid = *dbInfo.Sid
}
if sid == "" {
o.logger.V(3).Info("clustered database update contained invalid server ID")
continue
}

o.rpcMutex.Lock()
if !dbInfo.Leader && o.connected {
activeEndpoint := o.endpoints[0]
if sid == activeEndpoint.serverID {
o.logger.V(3).Info("endpoint lost leader, reconnecting",
"endpoint", activeEndpoint.address, "sid", sid)
// don't immediately reconnect to the active endpoint since it's no longer leader
o.moveEndpointLast(0)
o._disconnect()
} else {
o.logger.V(3).Info("endpoint lost leader but had unexpected server ID",
"endpoint", activeEndpoint.address,
"expected", activeEndpoint.serverID, "found", sid)
}
}
o.rpcMutex.Unlock()
}
}()
return nil
Expand Down Expand Up @@ -1051,7 +1105,7 @@ func (o *ovsdbClient) handleDisconnectNotification() {
suppressionCounter++
return err
}
o.logger.V(3).Info("connection lost, reconnecting", "endpoint", o.activeEndpoint)
o.logger.V(3).Info("connection lost, reconnecting", "endpoint", o.endpoints[0].address)
err := backoff.Retry(connect, o.options.backoff)
if err != nil {
// TODO: We should look at passing this back to the
Expand Down Expand Up @@ -1096,19 +1150,26 @@ func (o *ovsdbClient) handleDisconnectNotification() {
}
}

// Disconnect will close the connection to the OVSDB server
// _disconnect will close the connection to the OVSDB server
// If the client was created with WithReconnect then the client
// will reconnect afterwards
func (o *ovsdbClient) Disconnect() {
o.rpcMutex.Lock()
defer o.rpcMutex.Unlock()
// will reconnect afterwards. Assumes rpcMutex is held.
func (o *ovsdbClient) _disconnect() {
o.connected = false
if o.rpcClient == nil {
return
}
o.rpcClient.Close()
}

// Disconnect will close the connection to the OVSDB server
// If the client was created with WithReconnect then the client
// will reconnect afterwards
func (o *ovsdbClient) Disconnect() {
o.rpcMutex.Lock()
defer o.rpcMutex.Unlock()
o._disconnect()
}

// Close will close the connection to the OVSDB server
// It will remove all stored state ready for the next connection
// Even If the client was created with WithReconnect it will not reconnect afterwards
Expand Down
Loading