Skip to content

Commit

Permalink
Revert "Remove pending buffer when stream closed"
Browse files Browse the repository at this point in the history
This reverts commit c0159aa.
  • Loading branch information
edaniels committed Dec 12, 2022
1 parent 6e962c6 commit f31b934
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 151 deletions.
25 changes: 6 additions & 19 deletions association.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,9 +221,8 @@ type Association struct {
delayedAckTriggered bool
immediateAckTriggered bool

name string
log logging.LeveledLogger
streamVersion uint32
name string
log logging.LeveledLogger
}

// Config collects the arguments to createAssociation construction into
Expand Down Expand Up @@ -1369,7 +1368,6 @@ func (a *Association) createStream(streamIdentifier uint16, accept bool) *Stream
streamIdentifier: streamIdentifier,
reassemblyQueue: newReassemblyQueue(streamIdentifier),
log: a.log,
version: atomic.AddUint32(&a.streamVersion, 1),
name: fmt.Sprintf("%d:%s", streamIdentifier, a.name),
}

Expand Down Expand Up @@ -2090,14 +2088,10 @@ func (a *Association) popPendingDataChunksToSend() ([]*chunkPayloadData, []uint1
dataLen := uint32(len(c.userData))
if dataLen == 0 {
sisToReset = append(sisToReset, c.streamIdentifier)
a.popPendingDataChunksToDrop(c)
continue
}

s, ok := a.streams[c.streamIdentifier]

if !ok || s.State() > StreamStateOpen || s.version != c.streamVersion {
a.popPendingDataChunksToDrop(c)
err := a.pendingQueue.pop(c)
if err != nil {
a.log.Errorf("failed to pop from pending queue: %s", err.Error())
}
continue
}

Expand Down Expand Up @@ -2129,13 +2123,6 @@ func (a *Association) popPendingDataChunksToSend() ([]*chunkPayloadData, []uint1
return chunks, sisToReset
}

func (a *Association) popPendingDataChunksToDrop(c *chunkPayloadData) {
err := a.pendingQueue.pop(c)
if err != nil {
a.log.Errorf("failed to pop from pending queue: %s", err.Error())
}
}

// bundleDataChunksIntoPackets packs DATA chunks into packets. It tries to bundle
// DATA chunks into a packet so long as the resulting packet size does not exceed
// the path MTU.
Expand Down
3 changes: 1 addition & 2 deletions chunk_payload_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ type chunkPayloadData struct {
// chunk is still in the inflight queue
retransmit bool

head *chunkPayloadData // link to the head of the fragment
streamVersion uint32
head *chunkPayloadData // link to the head of the fragment
}

const (
Expand Down
51 changes: 20 additions & 31 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ const (

// StreamState is an enum for SCTP Stream state field
// This field identifies the state of stream.
type StreamState int32
type StreamState int

// StreamState enums
const (
StreamStateOpen StreamState = iota // Stream object starts with StreamStateOpen
StreamStateClosing // Stream is closed by remote
StreamStateClosing // Outgoing stream is being reset
StreamStateClosed // Stream has been closed
)

Expand Down Expand Up @@ -71,7 +71,6 @@ type Stream struct {
state StreamState
log logging.LeveledLogger
name string
version uint32
}

// StreamIdentifier returns the Stream identifier associated to the stream.
Expand Down Expand Up @@ -297,7 +296,6 @@ func (s *Stream) packetize(raw []byte, ppi PayloadProtocolIdentifier) []*chunkPa
copy(userData, raw[i:i+fragmentSize])

chunk := &chunkPayloadData{
streamVersion: s.version,
streamIdentifier: s.streamIdentifier,
userData: userData,
unordered: unordered,
Expand Down Expand Up @@ -340,22 +338,16 @@ func (s *Stream) Close() error {
s.lock.Lock()
defer s.lock.Unlock()

state := s.State()
s.log.Debugf("[%s] Close: state=%s", s.name, state.String())
s.log.Debugf("[%s] Close: state=%s", s.name, s.state.String())

switch state {
case StreamStateOpen:
s.SetState(StreamStateClosed)
s.log.Debugf("[%s] state change: open => closed", s.name)
s.readErr = io.EOF
s.readNotifier.Broadcast()
return s.streamIdentifier, true
case StreamStateClosing:
s.SetState(StreamStateClosed)
s.log.Debugf("[%s] state change: closing => closed", s.name)
if s.state == StreamStateOpen {
if s.readErr == nil {
s.state = StreamStateClosing
} else {
s.state = StreamStateClosed
}
s.log.Debugf("[%s] state change: open => %s", s.name, s.state.String())
return s.streamIdentifier, true
case StreamStateClosed:
return s.streamIdentifier, false
}
return s.streamIdentifier, false
}(); resetOutbound {
Expand Down Expand Up @@ -442,8 +434,7 @@ func (s *Stream) onInboundStreamReset() {
s.lock.Lock()
defer s.lock.Unlock()

state := s.State()
s.log.Debugf("[%s] onInboundStreamReset: state=%s", s.name, state.String())
s.log.Debugf("[%s] onInboundStreamReset: state=%s", s.name, s.state.String())

// No more inbound data to read. Unblock the read with io.EOF.
// This should cause DCEP layer (datachannel package) to call Close() which
Expand All @@ -454,21 +445,19 @@ func (s *Stream) onInboundStreamReset() {
// outgoing stream. When the peer sees that an incoming stream was
// reset, it also resets its corresponding outgoing stream. Once this
// is completed, the data channel is closed.
if state == StreamStateOpen {
s.log.Debugf("[%s] state change: open => closing", s.name)
s.SetState(StreamStateClosing)
}

s.readErr = io.EOF
s.readNotifier.Broadcast()
}

// State atomically returns the stream state.
func (s *Stream) State() StreamState {
return StreamState(atomic.LoadInt32((*int32)(&s.state)))
if s.state == StreamStateClosing {
s.log.Debugf("[%s] state change: closing => closed", s.name)
s.state = StreamStateClosed
}
}

// SetState atomically sets the stream state.
func (s *Stream) SetState(newState StreamState) {
atomic.StoreInt32((*int32)(&s.state), int32(newState))
// State return the stream state.
func (s *Stream) State() StreamState {
s.lock.RLock()
defer s.lock.RUnlock()
return s.state
}
Loading

0 comments on commit f31b934

Please sign in to comment.