diff --git a/transport/httptransport/http_transport.go b/transport/httptransport/http_transport.go index 9cb2c24e5..c66d4c598 100644 --- a/transport/httptransport/http_transport.go +++ b/transport/httptransport/http_transport.go @@ -140,7 +140,7 @@ func (h *httpTransport) Execute(ctx context.Context, transportInfo []byte, dealI cleanupFns := []func(){ cancel, - func() { close(t.eventCh) }, + func() { t.closeEventChannel(tctx) }, } cleanup := func() { for _, fn := range cleanupFns { @@ -178,12 +178,7 @@ func (h *httpTransport) Execute(ctx context.Context, transportInfo []byte, dealI if fileSize == dealInfo.DealSize { defer cleanup() - if err := t.emitEvent(tctx, types.TransportEvent{ - NBytesReceived: fileSize, - }, dealInfo.DealUuid); err != nil { - return nil, fmt.Errorf("failed to publish transfer completion event, id: %s, err: %w", t.dealInfo.DealUuid, err) - } - + t.emitEvent(types.TransportEvent{NBytesReceived: fileSize}) h.dl.Infow(duuid, "file size is already equal to deal size, returning") return t, nil } @@ -195,11 +190,7 @@ func (h *httpTransport) Execute(ctx context.Context, transportInfo []byte, dealI defer cleanup() if err := t.execute(tctx); err != nil { - if err := t.emitEvent(tctx, types.TransportEvent{ - Error: err, - }, dealInfo.DealUuid); err != nil { - t.dl.LogError(duuid, "failed to publish transport error", err) - } + t.emitEvent(types.TransportEvent{Error: err}) } }() @@ -212,6 +203,7 @@ type transfer struct { cancel context.CancelFunc eventCh chan types.TransportEvent + lastEvt *types.TransportEvent tInfo *types.HttpRequest dealInfo *types.TransportDealInfo @@ -226,15 +218,6 @@ type transfer struct { dl *logs.DealLogger } -func (t *transfer) emitEvent(ctx context.Context, evt types.TransportEvent, id uuid.UUID) error { - select { - case t.eventCh <- evt: - return nil - default: - return fmt.Errorf("dropping event %+v as channel is full for deal id %s", evt, id) - } -} - func (t *transfer) execute(ctx context.Context) error { duuid := t.dealInfo.DealUuid for { @@ -387,11 +370,7 @@ func (t *transfer) doHttp(ctx context.Context, req *http.Request, dst io.Writer, t.nBytesReceived = t.nBytesReceived + int64(nw) // emit event updating the number of bytes received - if err := t.emitEvent(ctx, types.TransportEvent{ - NBytesReceived: t.nBytesReceived, - }, t.dealInfo.DealUuid); err != nil { - t.dl.LogError(duid, "failed to publish transport event", err) - } + t.emitEvent(types.TransportEvent{NBytesReceived: t.nBytesReceived}) } // the http stream we're reading from has sent us an EOF, nothing to do here. if readErr == io.EOF { @@ -420,3 +399,28 @@ func (t *transfer) Close() { func (t *transfer) Sub() chan types.TransportEvent { return t.eventCh } + +func (t *transfer) emitEvent(evt types.TransportEvent) { + t.lastEvt = nil + select { + case t.eventCh <- evt: + default: + // If it wasn't possible to send the event because the channel is full, + // save it so that we can ensure it gets sent before the channel is closed. + // A new event always supersedes an older event, so if there is another + // event after this one, it will simply over-write this one. + t.lastEvt = &evt + } +} + +func (t *transfer) closeEventChannel(ctx context.Context) { + // If there was an event that wasn't sent because the channel was full, + // ensure that it gets sent before close + if t.lastEvt != nil { + select { + case <-ctx.Done(): + case t.eventCh <- *t.lastEvt: + } + } + close(t.eventCh) +}