Skip to content

Commit

Permalink
chore: use sync/atomic types (#3723)
Browse files Browse the repository at this point in the history
  • Loading branch information
cristaloleg authored Sep 19, 2024
1 parent bb6b508 commit 2ce4bc8
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 12 deletions.
6 changes: 3 additions & 3 deletions das/daser.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type DASer struct {

cancel context.CancelFunc
subscriberDone chan struct{}
running int32
running atomic.Bool
}

type (
Expand Down Expand Up @@ -85,7 +85,7 @@ func NewDASer(

// Start initiates subscription for new ExtendedHeaders and spawns a sampling routine.
func (d *DASer) Start(ctx context.Context) error {
if !atomic.CompareAndSwapInt32(&d.running, 0, 1) {
if !d.running.CompareAndSwap(false, true) {
return errors.New("da: DASer already started")
}

Expand Down Expand Up @@ -124,7 +124,7 @@ func (d *DASer) Start(ctx context.Context) error {

// Stop stops sampling.
func (d *DASer) Stop(ctx context.Context) error {
if !atomic.CompareAndSwapInt32(&d.running, 1, 0) {
if !d.running.CompareAndSwap(true, false) {
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion das/daser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func TestDASer_stopsAfter_BEFP(t *testing.T) {
require.NoError(t, res)
}
// wait for manager to finish catchup
require.True(t, daser.running == 0)
require.False(t, daser.running.Load())
}

func TestDASerSampleTimeout(t *testing.T) {
Expand Down
6 changes: 3 additions & 3 deletions das/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type metrics struct {
getHeaderTime metric.Float64Histogram
newHead metric.Int64Counter

lastSampledTS uint64
lastSampledTS atomic.Uint64

clientReg metric.Registration
}
Expand Down Expand Up @@ -113,7 +113,7 @@ func (d *DASer) InitMetrics() error {
observer.ObserveInt64(networkHead, int64(stats.NetworkHead))
observer.ObserveInt64(sampledChainHead, int64(stats.SampledChainHead))

if ts := atomic.LoadUint64(&d.sampler.metrics.lastSampledTS); ts != 0 {
if ts := d.sampler.metrics.lastSampledTS.Load(); ts != 0 {
observer.ObserveInt64(lastSampledTS, int64(ts))
}

Expand Down Expand Up @@ -171,7 +171,7 @@ func (m *metrics) observeSample(
attribute.String(jobTypeLabel, string(jobType)),
))

atomic.StoreUint64(&m.lastSampledTS, uint64(time.Now().UTC().Unix()))
m.lastSampledTS.Store(uint64(time.Now().UTC().Unix()))
}

// observeGetHeader records the time it took to get a header from the header store.
Expand Down
4 changes: 2 additions & 2 deletions share/eds/retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ type retrievalSession struct {
// https://github.com/celestiaorg/rsmt2d/issues/135
squareQuadrants []*quadrant
squareCellsLks [][]sync.Mutex
squareCellsCount uint32
squareCellsCount atomic.Uint32
squareSig chan struct{}
squareDn chan struct{}
squareLk sync.RWMutex
Expand Down Expand Up @@ -319,7 +319,7 @@ func (rs *retrievalSession) doRequest(ctx context.Context, q *quadrant) {
// but it is totally fine for the happy case and for now.
// The earlier we correctly know that we have the full square - the earlier
// we cancel ongoing requests - the less data is being wastedly transferred.
if atomic.AddUint32(&rs.squareCellsCount, 1) >= uint32(size*size) {
if rs.squareCellsCount.Add(1) >= uint32(size*size) {
select {
case rs.squareSig <- struct{}{}:
default:
Expand Down
6 changes: 3 additions & 3 deletions share/ipld/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,15 +201,15 @@ func GetProof(
// when fully done.
type chanGroup struct {
jobs chan job
counter int64
counter atomic.Int64
}

func (w *chanGroup) add(count int64) {
atomic.AddInt64(&w.counter, count)
w.counter.Add(count)
}

func (w *chanGroup) done() {
numRemaining := atomic.AddInt64(&w.counter, -1)
numRemaining := w.counter.Add(-1)

// Close channel if this job was the last one
if numRemaining == 0 {
Expand Down

0 comments on commit 2ce4bc8

Please sign in to comment.