Skip to content

Commit

Permalink
feature: add internal connectionReader readAtLeast error information (a…
Browse files Browse the repository at this point in the history
…pache#237)

add internal connectionReader readAtLeast error information
these error information may help to solve apache#200
  • Loading branch information
mileschao authored May 15, 2020
1 parent 2dad2b3 commit b04a842
Showing 1 changed file with 7 additions and 7 deletions.
14 changes: 7 additions & 7 deletions pulsar/internal/connection_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ func (r *connectionReader) readSingleCommand() (cmd *pb.BaseCommand, headersAndP
// If the buffer is empty, just go back to write at the beginning
r.buffer.Clear()
}
if !r.readAtLeast(4) {
return nil, nil, errors.New("Short read when reading frame size")
if err := r.readAtLeast(4); err != nil {
return nil, nil, errors.Errorf("Short read when reading frame size: %s", err)
}
}

Expand All @@ -82,8 +82,8 @@ func (r *connectionReader) readSingleCommand() (cmd *pb.BaseCommand, headersAndP
// Next, we read the rest of the frame
if r.buffer.ReadableBytes() < frameSize {
remainingBytes := frameSize - r.buffer.ReadableBytes()
if !r.readAtLeast(remainingBytes) {
return nil, nil, errors.New("Short read when reading frame")
if err := r.readAtLeast(remainingBytes); err != nil {
return nil, nil, errors.Errorf("Short read when reading frame: %s", err)
}
}

Expand All @@ -103,7 +103,7 @@ func (r *connectionReader) readSingleCommand() (cmd *pb.BaseCommand, headersAndP
return cmd, headersAndPayload, nil
}

func (r *connectionReader) readAtLeast(size uint32) (ok bool) {
func (r *connectionReader) readAtLeast(size uint32) error {
if r.buffer.WritableBytes() < size {
// There's not enough room in the current buffer to read the requested amount of data
totalFrameSize := r.buffer.ReadableBytes() + size
Expand All @@ -120,11 +120,11 @@ func (r *connectionReader) readAtLeast(size uint32) (ok bool) {
n, err := io.ReadAtLeast(r.cnx.cnx, r.buffer.WritableSlice(), int(size))
if err != nil {
r.cnx.TriggerClose()
return false
return err
}

r.buffer.WrittenBytes(uint32(n))
return true
return nil
}

func (r *connectionReader) deserializeCmd(data []byte) (*pb.BaseCommand, error) {
Expand Down

0 comments on commit b04a842

Please sign in to comment.