Skip to content

Commit

Permalink
fix(dataconn): add back ability for controller to respond to ping fai…
Browse files Browse the repository at this point in the history
…lures

Longhorn 8711

Signed-off-by: Eric Weber <eric.weber@suse.com>
  • Loading branch information
ejweber committed Jul 30, 2024
1 parent fd52aba commit cf59368
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 7 deletions.
4 changes: 4 additions & 0 deletions pkg/backend/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ func (f *Wrapper) GetState() (string, error) {
return "open", nil
}

func (f *Wrapper) GetMonitorChannel() types.MonitorChannel {
return nil
}

func (f *Wrapper) PingResponse() error {
return nil
}
Expand Down
23 changes: 16 additions & 7 deletions pkg/backend/remote/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type Remote struct {
name string
replicaServiceURL string
closeChan chan struct{}
monitorChan types.MonitorChannel
volumeName string
}

Expand Down Expand Up @@ -392,8 +393,9 @@ func (rf *Factory) Create(volumeName, address string, dataServerProtocol types.D
replicaServiceURL: controlAddress,
// We don't want sender to wait for receiver, because receiver may
// has been already notified
closeChan: make(chan struct{}, 5),
volumeName: volumeName,
closeChan: make(chan struct{}, 5),
monitorChan: make(types.MonitorChannel, 5),
volumeName: volumeName,
}

replica, err := r.info()
Expand Down Expand Up @@ -421,7 +423,7 @@ func (rf *Factory) Create(volumeName, address string, dataServerProtocol types.D
return nil, err
}

go r.monitor(dataConnClient, engineToReplicaTimeout)
go r.monitorPing(dataConnClient, engineToReplicaTimeout)

return r, nil
}
Expand All @@ -441,27 +443,34 @@ func connect(dataServerProtocol types.DataServerProtocol, address string) (net.C
}
}

// monitor sends a TypePing message to the remote once every engineToReplicatimeout. The remote does not have to respond
// to each ping in any particular time, but if it does not respond to any message (including a TypePing message) within
// engineToReplicaTimeout, the replica will be marked as failed.
func (r *Remote) monitor(client *dataconn.Client, engineToReplicaTimeout time.Duration) {
// monitorPing sends a TypePing message to the remote once every engineToReplicatimeout. The remote does not have to
// respond to each ping in any particular time, but if it does not respond to any message (including a TypePing message)
// within engineToReplicaTimeout, the replica will be marked as failed.
func (r *Remote) monitorPing(client *dataconn.Client, engineToReplicaTimeout time.Duration) {
ticker := time.NewTicker(engineToReplicaTimeout)
defer ticker.Stop()

for {
select {
case <-r.closeChan:
r.monitorChan <- nil
return
case <-ticker.C:
go func() {
if err := client.Ping(); err != nil {
client.SetError(err)
r.monitorChan <- err
return
}
}()
}
}
}

func (r *Remote) GetMonitorChannel() types.MonitorChannel {
return r.monitorChan
}

func (r *Remote) StopMonitoring() {
r.closeChan <- struct{}{}
}
22 changes: 22 additions & 0 deletions pkg/controller/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,10 @@ func (c *Controller) addReplicaNoLock(newBackend types.Backend, address string,

c.backend.AddBackend(address, newBackend, mode)

if mode != types.ERR {
go c.monitoring(address, newBackend)
}

return nil
}

Expand Down Expand Up @@ -1259,6 +1263,24 @@ func (c *Controller) Size() int64 {
return c.size
}

func (c *Controller) monitoring(address string, backend types.Backend) {
monitorChan := backend.GetMonitorChannel()

if monitorChan == nil {
return
}

logrus.Infof("Start monitoring %v", address)
err := <-monitorChan
if err != nil {
logrus.WithError(err).Errorf("Backend %v monitoring failed, mark as ERR", address)
if err = c.SetReplicaMode(address, types.ERR); err != nil {
logrus.WithError(err).Warnf("Failed to set replica %v to ERR", address)
}
}
logrus.Infof("Monitoring stopped %v", address)
}

func (c *Controller) Endpoint() string {
if c.frontend != nil {
return c.frontend.Endpoint()
Expand Down
1 change: 1 addition & 0 deletions pkg/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ type Backend interface {
GetRevisionCounter() (int64, error)
SetRevisionCounter(counter int64) error
GetState() (string, error)
GetMonitorChannel() MonitorChannel
StopMonitoring()
IsRevisionCounterDisabled() (bool, error)
GetLastModifyTime() (int64, error)
Expand Down

0 comments on commit cf59368

Please sign in to comment.