Skip to content

Commit

Permalink
Use StoppableWorkers in the single-line encoder (#4147)
Browse files Browse the repository at this point in the history
Also, make `start()` a private function, so you can't get into trouble by someone building one of these structs and calling `.Start()` on it multiple times.

Tried on the `grievance` rover: I didn't see any changes to behavior (and indeed, no changes to behavior are intended).
  • Loading branch information
penguinland authored Jul 1, 2024
1 parent 7bc623e commit bdc3ea8
Showing 1 changed file with 30 additions and 20 deletions.
50 changes: 30 additions & 20 deletions components/encoder/single/single_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"go.viam.com/rdk/components/encoder"
"go.viam.com/rdk/logging"
"go.viam.com/rdk/resource"
rdkutils "go.viam.com/rdk/utils"
)

var singleModel = resource.DefaultModelFamily.WithModel("single")
Expand Down Expand Up @@ -69,9 +70,7 @@ type Encoder struct {
positionType encoder.PositionType
logger logging.Logger

cancelCtx context.Context
cancelFunc func()
activeBackgroundWorkers sync.WaitGroup
workers rdkutils.StoppableWorkers
}

// Pin describes the configuration of Pins for a Single encoder.
Expand Down Expand Up @@ -115,12 +114,9 @@ func NewSingleEncoder(
conf resource.Config,
logger logging.Logger,
) (encoder.Encoder, error) {
cancelCtx, cancelFunc := context.WithCancel(context.Background())
e := &Encoder{
Named: conf.ResourceName().AsNamed(),
logger: logger,
cancelCtx: cancelCtx,
cancelFunc: cancelFunc,
position: 0,
positionType: encoder.PositionTypeTicks,
}
Expand Down Expand Up @@ -163,9 +159,6 @@ func (e *Encoder) Reconfigure(
return nil
}
utils.UncheckedError(e.Close(ctx))
cancelCtx, cancelFunc := context.WithCancel(context.Background())
e.cancelCtx = cancelCtx
e.cancelFunc = cancelFunc

e.mu.Lock()
e.I = di
Expand All @@ -175,31 +168,39 @@ func (e *Encoder) Reconfigure(
atomic.StoreInt64(&e.position, 0)
e.mu.Unlock()

e.Start(ctx, board)
e.start(ctx, board)

return nil
}

// Start starts the Encoder background thread.
func (e *Encoder) Start(ctx context.Context, b board.Board) {
// start starts the Encoder background thread.
func (e *Encoder) start(ctx context.Context, b board.Board) {
if e.workers != nil {
// We're already listening to an old interrupt. This should never happen! Stop that before
// we start listening to the new one.
utils.Logger.Error(
"starting an already-started encoder! Stopping the old interrupt stream...")
e.workers.Stop()
}
e.workers = rdkutils.NewStoppableWorkers()

encoderChannel := make(chan board.Tick)
err := b.StreamTicks(e.cancelCtx, []board.DigitalInterrupt{e.I}, encoderChannel, nil)
err := b.StreamTicks(e.workers.Context(), []board.DigitalInterrupt{e.I}, encoderChannel, nil)
if err != nil {
utils.Logger.Errorw("error getting interrupt ticks", "error", err)
return
}
e.activeBackgroundWorkers.Add(1)

utils.ManagedGo(func() {
e.workers.AddWorkers(func(cancelCtx context.Context) {
for {
select {
case <-e.cancelCtx.Done():
case <-cancelCtx.Done():
return
default:
}

select {
case <-e.cancelCtx.Done():
case <-cancelCtx.Done():
return
case <-encoderChannel:
}
Expand All @@ -216,7 +217,7 @@ func (e *Encoder) Start(ctx context.Context, b board.Board) {
e.logger.CDebug(ctx, "received tick for encoder that isn't connected to a motor; ignoring")
}
}
}, e.activeBackgroundWorkers.Done)
})
}

// Position returns the current position in terms of ticks or
Expand Down Expand Up @@ -250,7 +251,16 @@ func (e *Encoder) Properties(ctx context.Context, extra map[string]interface{})

// Close shuts down the Encoder.
func (e *Encoder) Close(ctx context.Context) error {
e.cancelFunc()
e.activeBackgroundWorkers.Wait()
// In unit tests, we construct encoders without calling NewSingleEncoder(), which means they
// might not have called e.start(), so might not have initialized e.workers. Don't crash if
// that happens.
if e.workers != nil {
e.workers.Stop() // This also shuts down the interrupt stream.
}

// During reconfiguration, we might call e.Close() and then e.start() to restart with a new
// interrupt pin. Remove the old StoppableWorkers so e.start() doesn't try adding workers to an
// already-stopped one.
e.workers = nil
return nil
}

0 comments on commit bdc3ea8

Please sign in to comment.