Skip to content

Commit

Permalink
Fix Chrome datachannel stuck at closing
Browse files Browse the repository at this point in the history
Fix #178.
  • Loading branch information
jerry-tao committed Oct 14, 2022
1 parent d0b7cf3 commit e549ed0
Show file tree
Hide file tree
Showing 3 changed files with 187 additions and 134 deletions.
8 changes: 7 additions & 1 deletion association.go
Original file line number Diff line number Diff line change
Expand Up @@ -1985,6 +1985,7 @@ func (a *Association) sendResetRequest(streamIdentifier uint16) error {
func (a *Association) handleReconfigParam(raw param) (*packet, error) {
switch p := raw.(type) {
case *paramOutgoingResetRequest:
a.log.Tracef("[%s] handleReconfigParam (OutgoingResetRequest)", a.name)
a.reconfigRequests[p.reconfigRequestSequenceNumber] = p
resp := a.resetStreamsIfAny(p)
if resp != nil {
Expand All @@ -1993,6 +1994,7 @@ func (a *Association) handleReconfigParam(raw param) (*packet, error) {
return nil, nil //nolint:nilnil

case *paramReconfigResponse:
a.log.Tracef("[%s] handleReconfigParam (ReconfigResponse)", a.name)
delete(a.reconfigs, p.reconfigResponseSequenceNumber)
if len(a.reconfigs) == 0 {
a.tReconfig.stop()
Expand All @@ -2014,7 +2016,11 @@ func (a *Association) resetStreamsIfAny(p *paramOutgoingResetRequest) *packet {
if !ok {
continue
}
a.unregisterStream(s, io.EOF)
a.lock.Unlock()
s.onInboundStreamReset()
a.lock.Lock()
a.log.Debugf("[%s] deleting stream %d", a.name, id)
delete(a.streams, s.streamIdentifier)
}
delete(a.reconfigRequests, p.reconfigRequestSequenceNumber)
} else {
Expand Down
111 changes: 77 additions & 34 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,32 @@ const (
ReliabilityTypeTimed byte = 2
)

// StreamState is an enum for SCTP Stream state field
// This field identifies the state of stream.
type StreamState int

// StreamState enums
const (
StreamStateOpen StreamState = iota // Stream object starts with StreamStateOpen
StreamStateClosing // Outgoing stream is being reset
StreamStateClosed // Stream has been closed
)

func (ss StreamState) String() string {
switch ss {
case StreamStateOpen:
return "open"
case StreamStateClosing:
return "closing"
case StreamStateClosed:
return "closed"
}
return "unknown"
}

var (
errOutboundPacketTooLarge = errors.New("outbound packet larger than maximum message size")
errStreamClosed = errors.New("Stream closed")
errStreamClosed = errors.New("stream closed")
)

// Stream represents an SCTP stream
Expand All @@ -35,13 +58,13 @@ type Stream struct {
sequenceNumber uint16
readNotifier *sync.Cond
readErr error
writeErr error
unordered bool
reliabilityType byte
reliabilityValue uint32
bufferedAmount uint64
bufferedAmountLow uint64
onBufferedAmountLow func()
state StreamState
log logging.LeveledLogger
name string
}
Expand Down Expand Up @@ -178,32 +201,23 @@ func (s *Stream) Write(p []byte) (n int, err error) {
}

// WriteSCTP writes len(p) bytes from p to the DTLS connection
func (s *Stream) WriteSCTP(p []byte, ppi PayloadProtocolIdentifier) (n int, err error) {
func (s *Stream) WriteSCTP(p []byte, ppi PayloadProtocolIdentifier) (int, error) {
maxMessageSize := s.association.MaxMessageSize()
if len(p) > int(maxMessageSize) {
return 0, fmt.Errorf("%w: %v", errOutboundPacketTooLarge, math.MaxUint16)
}

switch s.association.getState() {
case shutdownSent, shutdownAckSent, shutdownPending, shutdownReceived:
s.lock.Lock()
if s.writeErr == nil {
s.writeErr = errStreamClosed
}
s.lock.Unlock()
default:
if s.State() != StreamStateOpen {
return 0, errStreamClosed
}

s.lock.RLock()
err = s.writeErr
s.lock.RUnlock()
chunks := s.packetize(p, ppi)
n := len(p)
err := s.association.sendPayloadData(chunks)
if err != nil {
return 0, err
return n, errStreamClosed
}

chunks := s.packetize(p, ppi)

return len(p), s.association.sendPayloadData(chunks)
return n, nil
}

func (s *Stream) packetize(raw []byte, ppi PayloadProtocolIdentifier) []*chunkPayloadData {
Expand Down Expand Up @@ -267,26 +281,23 @@ func (s *Stream) packetize(raw []byte, ppi PayloadProtocolIdentifier) []*chunkPa
// Close closes the write-direction of the stream.
// Future calls to Write are not permitted after calling Close.
func (s *Stream) Close() error {
if sid, isOpen := func() (uint16, bool) {
if sid, resetOutbound := func() (uint16, bool) {
s.lock.Lock()
defer s.lock.Unlock()

isOpen := true
if s.writeErr == nil {
s.writeErr = errStreamClosed
} else {
isOpen = false
}
s.log.Debugf("[%s] Close: state=%s", s.name, s.state.String())

if s.readErr == nil {
s.readErr = io.EOF
} else {
isOpen = false
if s.state == StreamStateOpen {
if s.readErr == nil {
s.state = StreamStateClosing
} else {
s.state = StreamStateClosed
}
s.log.Debugf("[%s] state change: open => %s", s.name, s.state.String())
return s.streamIdentifier, true
}
s.readNotifier.Broadcast() // broadcast regardless

return s.streamIdentifier, isOpen
}(); isOpen {
return s.streamIdentifier, false
}(); resetOutbound {
// Reset the outgoing stream
// https://tools.ietf.org/html/rfc6525
return s.association.sendResetRequest(sid)
Expand Down Expand Up @@ -365,3 +376,35 @@ func (s *Stream) getNumBytesInReassemblyQueue() int {
// No lock is required as it reads the size with atomic load function.
return s.reassemblyQueue.getNumBytes()
}

func (s *Stream) onInboundStreamReset() {
s.lock.Lock()
defer s.lock.Unlock()

s.log.Debugf("[%s] onInboundStreamReset: state=%s", s.name, s.state.String())

// No more inbound data to read. Unblock the read with io.EOF.
// This should cause DCEP layer (datachannel package) to call Close() which
// will reset outgoing stream also.

// See RFC 8831 section 6.7:
// if one side decides to close the data channel, it resets the corresponding
// outgoing stream. When the peer sees that an incoming stream was
// reset, it also resets its corresponding outgoing stream. Once this
// is completed, the data channel is closed.

s.readErr = io.EOF
s.readNotifier.Broadcast()

if s.state == StreamStateClosing {
s.log.Debugf("[%s] state change: closing => closed", s.name)
s.state = StreamStateClosed
}
}

// State return the stream state.
func (s *Stream) State() StreamState {
s.lock.RLock()
defer s.lock.RUnlock()
return s.state
}
Loading

0 comments on commit e549ed0

Please sign in to comment.