Skip to content

Commit

Permalink
fix: dont log errors when the transfer event queue is full (#1291)
Browse files Browse the repository at this point in the history
  • Loading branch information
dirkmc authored and jacobheun committed Mar 17, 2023
1 parent 209343e commit 473b5c0
Showing 1 changed file with 30 additions and 26 deletions.
56 changes: 30 additions & 26 deletions transport/httptransport/http_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (h *httpTransport) Execute(ctx context.Context, transportInfo []byte, dealI

cleanupFns := []func(){
cancel,
func() { close(t.eventCh) },
func() { t.closeEventChannel(tctx) },
}
cleanup := func() {
for _, fn := range cleanupFns {
Expand Down Expand Up @@ -178,12 +178,7 @@ func (h *httpTransport) Execute(ctx context.Context, transportInfo []byte, dealI
if fileSize == dealInfo.DealSize {
defer cleanup()

if err := t.emitEvent(tctx, types.TransportEvent{
NBytesReceived: fileSize,
}, dealInfo.DealUuid); err != nil {
return nil, fmt.Errorf("failed to publish transfer completion event, id: %s, err: %w", t.dealInfo.DealUuid, err)
}

t.emitEvent(types.TransportEvent{NBytesReceived: fileSize})
h.dl.Infow(duuid, "file size is already equal to deal size, returning")
return t, nil
}
Expand All @@ -195,11 +190,7 @@ func (h *httpTransport) Execute(ctx context.Context, transportInfo []byte, dealI
defer cleanup()

if err := t.execute(tctx); err != nil {
if err := t.emitEvent(tctx, types.TransportEvent{
Error: err,
}, dealInfo.DealUuid); err != nil {
t.dl.LogError(duuid, "failed to publish transport error", err)
}
t.emitEvent(types.TransportEvent{Error: err})
}
}()

Expand All @@ -212,6 +203,7 @@ type transfer struct {
cancel context.CancelFunc

eventCh chan types.TransportEvent
lastEvt *types.TransportEvent

tInfo *types.HttpRequest
dealInfo *types.TransportDealInfo
Expand All @@ -226,15 +218,6 @@ type transfer struct {
dl *logs.DealLogger
}

func (t *transfer) emitEvent(ctx context.Context, evt types.TransportEvent, id uuid.UUID) error {
select {
case t.eventCh <- evt:
return nil
default:
return fmt.Errorf("dropping event %+v as channel is full for deal id %s", evt, id)
}
}

func (t *transfer) execute(ctx context.Context) error {
duuid := t.dealInfo.DealUuid
for {
Expand Down Expand Up @@ -387,11 +370,7 @@ func (t *transfer) doHttp(ctx context.Context, req *http.Request, dst io.Writer,
t.nBytesReceived = t.nBytesReceived + int64(nw)

// emit event updating the number of bytes received
if err := t.emitEvent(ctx, types.TransportEvent{
NBytesReceived: t.nBytesReceived,
}, t.dealInfo.DealUuid); err != nil {
t.dl.LogError(duid, "failed to publish transport event", err)
}
t.emitEvent(types.TransportEvent{NBytesReceived: t.nBytesReceived})
}
// the http stream we're reading from has sent us an EOF, nothing to do here.
if readErr == io.EOF {
Expand Down Expand Up @@ -420,3 +399,28 @@ func (t *transfer) Close() {
func (t *transfer) Sub() chan types.TransportEvent {
return t.eventCh
}

func (t *transfer) emitEvent(evt types.TransportEvent) {
t.lastEvt = nil
select {
case t.eventCh <- evt:
default:
// If it wasn't possible to send the event because the channel is full,
// save it so that we can ensure it gets sent before the channel is closed.
// A new event always supersedes an older event, so if there is another
// event after this one, it will simply over-write this one.
t.lastEvt = &evt
}
}

func (t *transfer) closeEventChannel(ctx context.Context) {
// If there was an event that wasn't sent because the channel was full,
// ensure that it gets sent before close
if t.lastEvt != nil {
select {
case <-ctx.Done():
case t.eventCh <- *t.lastEvt:
}
}
close(t.eventCh)
}

0 comments on commit 473b5c0

Please sign in to comment.