Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Chrome datachannel stuck at closing #238

Merged
merged 1 commit into from
Oct 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion association.go
Original file line number Diff line number Diff line change
Expand Up @@ -1985,6 +1985,7 @@ func (a *Association) sendResetRequest(streamIdentifier uint16) error {
func (a *Association) handleReconfigParam(raw param) (*packet, error) {
switch p := raw.(type) {
case *paramOutgoingResetRequest:
a.log.Tracef("[%s] handleReconfigParam (OutgoingResetRequest)", a.name)
a.reconfigRequests[p.reconfigRequestSequenceNumber] = p
resp := a.resetStreamsIfAny(p)
if resp != nil {
Expand All @@ -1993,6 +1994,7 @@ func (a *Association) handleReconfigParam(raw param) (*packet, error) {
return nil, nil //nolint:nilnil

case *paramReconfigResponse:
a.log.Tracef("[%s] handleReconfigParam (ReconfigResponse)", a.name)
delete(a.reconfigs, p.reconfigResponseSequenceNumber)
if len(a.reconfigs) == 0 {
a.tReconfig.stop()
Expand All @@ -2014,7 +2016,11 @@ func (a *Association) resetStreamsIfAny(p *paramOutgoingResetRequest) *packet {
if !ok {
continue
}
a.unregisterStream(s, io.EOF)
a.lock.Unlock()
s.onInboundStreamReset()
a.lock.Lock()
a.log.Debugf("[%s] deleting stream %d", a.name, id)
delete(a.streams, s.streamIdentifier)
}
delete(a.reconfigRequests, p.reconfigRequestSequenceNumber)
} else {
Expand Down
111 changes: 77 additions & 34 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,32 @@ const (
ReliabilityTypeTimed byte = 2
)

// StreamState is an enum for SCTP Stream state field
// This field identifies the state of stream.
type StreamState int

// StreamState enums
const (
StreamStateOpen StreamState = iota // Stream object starts with StreamStateOpen
StreamStateClosing // Outgoing stream is being reset
StreamStateClosed // Stream has been closed
)

func (ss StreamState) String() string {
switch ss {
case StreamStateOpen:
return "open"
case StreamStateClosing:
return "closing"
case StreamStateClosed:
return "closed"
}
return "unknown"
}

var (
errOutboundPacketTooLarge = errors.New("outbound packet larger than maximum message size")
errStreamClosed = errors.New("Stream closed")
errStreamClosed = errors.New("stream closed")
)

// Stream represents an SCTP stream
Expand All @@ -35,13 +58,13 @@ type Stream struct {
sequenceNumber uint16
readNotifier *sync.Cond
readErr error
writeErr error
unordered bool
reliabilityType byte
reliabilityValue uint32
bufferedAmount uint64
bufferedAmountLow uint64
onBufferedAmountLow func()
state StreamState
log logging.LeveledLogger
name string
}
Expand Down Expand Up @@ -178,32 +201,23 @@ func (s *Stream) Write(p []byte) (n int, err error) {
}

// WriteSCTP writes len(p) bytes from p to the DTLS connection
func (s *Stream) WriteSCTP(p []byte, ppi PayloadProtocolIdentifier) (n int, err error) {
func (s *Stream) WriteSCTP(p []byte, ppi PayloadProtocolIdentifier) (int, error) {
maxMessageSize := s.association.MaxMessageSize()
if len(p) > int(maxMessageSize) {
return 0, fmt.Errorf("%w: %v", errOutboundPacketTooLarge, math.MaxUint16)
}

switch s.association.getState() {
case shutdownSent, shutdownAckSent, shutdownPending, shutdownReceived:
s.lock.Lock()
if s.writeErr == nil {
s.writeErr = errStreamClosed
}
s.lock.Unlock()
default:
if s.State() != StreamStateOpen {
return 0, errStreamClosed
}

s.lock.RLock()
err = s.writeErr
s.lock.RUnlock()
chunks := s.packetize(p, ppi)
n := len(p)
err := s.association.sendPayloadData(chunks)
if err != nil {
return 0, err
return n, errStreamClosed
}

chunks := s.packetize(p, ppi)

return len(p), s.association.sendPayloadData(chunks)
return n, nil
}

func (s *Stream) packetize(raw []byte, ppi PayloadProtocolIdentifier) []*chunkPayloadData {
Expand Down Expand Up @@ -267,26 +281,23 @@ func (s *Stream) packetize(raw []byte, ppi PayloadProtocolIdentifier) []*chunkPa
// Close closes the write-direction of the stream.
// Future calls to Write are not permitted after calling Close.
func (s *Stream) Close() error {
if sid, isOpen := func() (uint16, bool) {
if sid, resetOutbound := func() (uint16, bool) {
s.lock.Lock()
defer s.lock.Unlock()

isOpen := true
if s.writeErr == nil {
s.writeErr = errStreamClosed
} else {
isOpen = false
}
s.log.Debugf("[%s] Close: state=%s", s.name, s.state.String())

if s.readErr == nil {
s.readErr = io.EOF
} else {
isOpen = false
if s.state == StreamStateOpen {
if s.readErr == nil {
s.state = StreamStateClosing
} else {
s.state = StreamStateClosed
}
s.log.Debugf("[%s] state change: open => %s", s.name, s.state.String())
return s.streamIdentifier, true
}
s.readNotifier.Broadcast() // broadcast regardless

return s.streamIdentifier, isOpen
}(); isOpen {
return s.streamIdentifier, false
}(); resetOutbound {
// Reset the outgoing stream
// https://tools.ietf.org/html/rfc6525
return s.association.sendResetRequest(sid)
Expand Down Expand Up @@ -365,3 +376,35 @@ func (s *Stream) getNumBytesInReassemblyQueue() int {
// No lock is required as it reads the size with atomic load function.
return s.reassemblyQueue.getNumBytes()
}

func (s *Stream) onInboundStreamReset() {
s.lock.Lock()
defer s.lock.Unlock()

s.log.Debugf("[%s] onInboundStreamReset: state=%s", s.name, s.state.String())

// No more inbound data to read. Unblock the read with io.EOF.
// This should cause DCEP layer (datachannel package) to call Close() which
// will reset outgoing stream also.

// See RFC 8831 section 6.7:
// if one side decides to close the data channel, it resets the corresponding
// outgoing stream. When the peer sees that an incoming stream was
// reset, it also resets its corresponding outgoing stream. Once this
// is completed, the data channel is closed.

s.readErr = io.EOF
s.readNotifier.Broadcast()

if s.state == StreamStateClosing {
s.log.Debugf("[%s] state change: closing => closed", s.name)
s.state = StreamStateClosed
}
}

// State return the stream state.
func (s *Stream) State() StreamState {
s.lock.RLock()
defer s.lock.RUnlock()
return s.state
}
Loading