Skip to content

Commit

Permalink
Add round trip time measurement to candidate pair (#731)
Browse files Browse the repository at this point in the history
* Add round trip time measurement to candidate pair

Use the round trip time measurement to populate RTT fields in
CandidatePairStats.

Atomic and tests

* Use int64 nanosecnods to make atomic easier
  • Loading branch information
boks1971 committed Sep 16, 2024
1 parent 277014e commit 2d9be9b
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 8 deletions.
6 changes: 3 additions & 3 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -973,16 +973,16 @@ func (a *Agent) invalidatePendingBindingRequests(filterTime time.Time) {

// Assert that the passed TransactionID is in our pendingBindingRequests and returns the destination
// If the bindingRequest was valid remove it from our pending cache
func (a *Agent) handleInboundBindingSuccess(id [stun.TransactionIDSize]byte) (bool, *bindingRequest) {
func (a *Agent) handleInboundBindingSuccess(id [stun.TransactionIDSize]byte) (bool, *bindingRequest, time.Duration) {
a.invalidatePendingBindingRequests(time.Now())
for i := range a.pendingBindingRequests {
if a.pendingBindingRequests[i].transactionID == id {
validBindingRequest := a.pendingBindingRequests[i]
a.pendingBindingRequests = append(a.pendingBindingRequests[:i], a.pendingBindingRequests[i+1:]...)
return true, &validBindingRequest
return true, &validBindingRequest, time.Since(validBindingRequest.timestamp)
}
}
return false, nil
return false, nil, 0
}

// handleInbound processes STUN traffic from a remote candidate
Expand Down
6 changes: 3 additions & 3 deletions agent_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ func (a *Agent) GetCandidatePairsStats() []CandidatePairStats {
// FirstRequestTimestamp time.Time
// LastRequestTimestamp time.Time
// LastResponseTimestamp time.Time
// TotalRoundTripTime float64
// CurrentRoundTripTime float64
TotalRoundTripTime: cp.TotalRoundTripTime(),
CurrentRoundTripTime: cp.CurrentRoundTripTime(),
// AvailableOutgoingBitrate float64
// AvailableIncomingBitrate float64
// CircuitBreakerTriggerCount uint32
// RequestsReceived uint64
// RequestsSent uint64
// ResponsesReceived uint64
ResponsesReceived: cp.ResponsesReceived(),
// ResponsesSent uint64
// RetransmissionsReceived uint64
// RetransmissionsSent uint64
Expand Down
21 changes: 21 additions & 0 deletions agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,10 @@ func TestCandidatePairStats(t *testing.T) {
p := a.findPair(hostLocal, prflxRemote)
p.state = CandidatePairStateFailed

for i := 0; i < 10; i++ {
p.UpdateRoundTripTime(time.Duration(i+1) * time.Second)
}

stats := a.GetCandidatePairsStats()
if len(stats) != 4 {
t.Fatal("expected 4 candidate pairs stats")
Expand Down Expand Up @@ -766,6 +770,23 @@ func TestCandidatePairStats(t *testing.T) {
t.Fatalf("expected host-prflx pair to have state failed, it has state %s instead",
prflxPairStat.State.String())
}

expectedCurrentRoundTripTime := time.Duration(10) * time.Second
if prflxPairStat.CurrentRoundTripTime != expectedCurrentRoundTripTime.Seconds() {
t.Fatalf("expected current round trip time to be %f, it is %f instead",
expectedCurrentRoundTripTime.Seconds(), prflxPairStat.CurrentRoundTripTime)
}

expectedTotalRoundTripTime := time.Duration(55) * time.Second
if prflxPairStat.TotalRoundTripTime != expectedTotalRoundTripTime.Seconds() {
t.Fatalf("expected total round trip time to be %f, it is %f instead",
expectedTotalRoundTripTime.Seconds(), prflxPairStat.TotalRoundTripTime)
}

if prflxPairStat.ResponsesReceived != 10 {
t.Fatalf("expected responses received to be 10, it is %d instead",
prflxPairStat.ResponsesReceived)
}
}

func TestLocalCandidateStats(t *testing.T) {
Expand Down
34 changes: 34 additions & 0 deletions candidatepair.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ package ice

import (
"fmt"
"sync/atomic"
"time"

"github.com/pion/stun/v3"
)
Expand All @@ -28,6 +30,11 @@ type CandidatePair struct {
state CandidatePairState
nominated bool
nominateOnBindingSuccess bool

// stats
currentRoundTripTime int64 // in ns
totalRoundTripTime int64 // in ns
responsesReceived uint64
}

func (p *CandidatePair) String() string {
Expand Down Expand Up @@ -100,3 +107,30 @@ func (a *Agent) sendSTUN(msg *stun.Message, local, remote Candidate) {
a.log.Tracef("Failed to send STUN message: %s", err)
}
}

// UpdateRoundTripTime sets the current round time of this pair and
// accumulates total round trip time and responses received
func (p *CandidatePair) UpdateRoundTripTime(rtt time.Duration) {
rttNs := rtt.Nanoseconds()
atomic.StoreInt64(&p.currentRoundTripTime, rttNs)
atomic.AddInt64(&p.totalRoundTripTime, rttNs)
atomic.AddUint64(&p.responsesReceived, 1)
}

// CurrentRoundTripTime returns the current round trip time in seconds
// https://www.w3.org/TR/webrtc-stats/#dom-rtcicecandidatepairstats-currentroundtriptime
func (p *CandidatePair) CurrentRoundTripTime() float64 {
return time.Duration(atomic.LoadInt64(&p.currentRoundTripTime)).Seconds()
}

// TotalRoundTripTime returns the current round trip time in seconds
// https://www.w3.org/TR/webrtc-stats/#dom-rtcicecandidatepairstats-totalroundtriptime
func (p *CandidatePair) TotalRoundTripTime() float64 {
return time.Duration(atomic.LoadInt64(&p.totalRoundTripTime)).Seconds()
}

// ResponsesReceived returns the total number of connectivity responses received
// https://www.w3.org/TR/webrtc-stats/#dom-rtcicecandidatepairstats-responsesreceived
func (p *CandidatePair) ResponsesReceived() uint64 {
return atomic.LoadUint64(&p.responsesReceived)
}
8 changes: 6 additions & 2 deletions selection.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (s *controllingSelector) HandleBindingRequest(m *stun.Message, local, remot
}

func (s *controllingSelector) HandleSuccessResponse(m *stun.Message, local, remote Candidate, remoteAddr net.Addr) {
ok, pendingRequest := s.agent.handleInboundBindingSuccess(m.TransactionID)
ok, pendingRequest, rtt := s.agent.handleInboundBindingSuccess(m.TransactionID)
if !ok {
s.log.Warnf("Discard message from (%s), unknown TransactionID 0x%x", remote, m.TransactionID)
return
Expand Down Expand Up @@ -149,6 +149,8 @@ func (s *controllingSelector) HandleSuccessResponse(m *stun.Message, local, remo
if pendingRequest.isUseCandidate && s.agent.getSelectedPair() == nil {
s.agent.setSelectedPair(p)
}

p.UpdateRoundTripTime(rtt)
}

func (s *controllingSelector) PingCandidate(local, remote Candidate) {
Expand Down Expand Up @@ -211,7 +213,7 @@ func (s *controlledSelector) HandleSuccessResponse(m *stun.Message, local, remot
// request with an appropriate error code response (e.g., 400)
// [RFC5389].

ok, pendingRequest := s.agent.handleInboundBindingSuccess(m.TransactionID)
ok, pendingRequest, rtt := s.agent.handleInboundBindingSuccess(m.TransactionID)
if !ok {
s.log.Warnf("Discard message from (%s), unknown TransactionID 0x%x", remote, m.TransactionID)
return
Expand Down Expand Up @@ -245,6 +247,8 @@ func (s *controlledSelector) HandleSuccessResponse(m *stun.Message, local, remot
s.log.Tracef("Ignore nominate new pair %s, already nominated pair %s", p, selectedPair)
}
}

p.UpdateRoundTripTime(rtt)
}

func (s *controlledSelector) HandleBindingRequest(m *stun.Message, local, remote Candidate) {
Expand Down

0 comments on commit 2d9be9b

Please sign in to comment.