Skip to content

Commit

Permalink
Version 1.5.3 (#42)
Browse files Browse the repository at this point in the history
* fix pipe read
* fix unit test
* fix changelog typo
  • Loading branch information
felixhao authored Jan 2, 2019
1 parent be21c0f commit 02580c9
Show file tree
Hide file tree
Showing 9 changed files with 61 additions and 19 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
# Overlord

## Version 1.5.3
1. fix pipe read when one err.

## Version 1.5.2
1. max redirects 5.

## Version 1.5.1
1. reset sub message only in nedd.
1. reset sub message only in need.

## Version 1.5.0
1. refactor message pipeline.
Expand Down
2 changes: 1 addition & 1 deletion proto/memcache/binary/node_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func (n *nodeConn) Read(m *proto.Message) (err error) {
err = errors.WithStack(ErrAssertReq)
return
}
mcr.data = mcr.data[:0]
REREAD:
var bs []byte
if bs, err = n.br.ReadExact(requestHeaderLen); err == bufio.ErrBufferFull {
Expand Down Expand Up @@ -120,7 +121,6 @@ REREADData:
err = errors.WithStack(err)
return
}
mcr.data = mcr.data[:0]
mcr.data = append(mcr.data, data...)
return
}
Expand Down
3 changes: 1 addition & 2 deletions proto/memcache/node_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func (n *nodeConn) Read(m *proto.Message) (err error) {
err = errors.WithStack(ErrAssertReq)
return
}
mcr.data = mcr.data[:0]
REREAD:
var bs []byte
if bs, err = n.br.ReadLine(); err == bufio.ErrBufferFull {
Expand All @@ -97,7 +98,6 @@ REREAD:
return
}
if _, ok := withValueTypes[mcr.rTp]; !ok || bytes.Equal(bs, endBytes) || bytes.Equal(bs, errorBytes) {
mcr.data = mcr.data[:0]
mcr.data = append(mcr.data, bs...)
return
}
Expand All @@ -119,7 +119,6 @@ REREADData:
err = errors.WithStack(err)
return
}
mcr.data = mcr.data[:0]
mcr.data = append(mcr.data, bs...)
mcr.data = append(mcr.data, data...)
return
Expand Down
14 changes: 7 additions & 7 deletions proto/memcache/proxy_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,14 +340,14 @@ func revSpacIdx(bs []byte) int {

// Encode encode response and write into writer.
func (p *proxyConn) Encode(m *proto.Message) (err error) {
if me := m.Err(); me != nil {
se := errors.Cause(me).Error()
_ = p.bw.Write(serverErrorBytes)
_ = p.bw.Write([]byte(se))
err = p.bw.Write(crlfBytes)
return
}
if !m.IsBatch() {
if me := m.Err(); me != nil {
se := errors.Cause(me).Error()
_ = p.bw.Write(serverErrorBytes)
_ = p.bw.Write([]byte(se))
err = p.bw.Write(crlfBytes)
return
}
mcr, ok := m.Request().(*MCRequest)
if !ok {
_ = p.bw.Write(serverErrorBytes)
Expand Down
1 change: 1 addition & 0 deletions proto/memcache/proxy_conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ func TestEncodeErr(t *testing.T) {
msg.Type = proto.CacheTypeMemcache
msg.WithRequest(&mockReq{})
msg.WithRequest(&mockReq{}) // NOTE: batch
msg.Batch()
err = p.Encode(msg)
assert.NoError(t, err)

Expand Down
4 changes: 2 additions & 2 deletions proto/memcache/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,9 +226,9 @@ func NewReq() *MCRequest {

// Put put req back to pool.
func (r *MCRequest) Put() {
r.data = nil
r.rTp = RequestTypeUnknown
r.key = nil
r.key = r.key[:0]
r.data = r.data[:0]
msgPool.Put(r)
}

Expand Down
4 changes: 2 additions & 2 deletions proto/memcache/request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,6 @@ func TestMCRequestFuncsOk(t *testing.T) {
req.Put()

assert.Equal(t, RequestTypeUnknown, req.rTp)
assert.Nil(t, req.key)
assert.Nil(t, req.data)
assert.Equal(t, []byte{}, req.key)
assert.Equal(t, []byte{}, req.data)
}
5 changes: 4 additions & 1 deletion proto/pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,10 @@ func (mp *msgPipe) pipe() {
}
var rerr error
for i := 0; i < mp.count; i++ {
if rerr = nc.Read(mp.batch[i]); rerr != nil {
if rerr == nil {
rerr = nc.Read(mp.batch[i])
} // NOTE: no else!!!
if rerr != nil {
mp.batch[i].WithError(rerr)
}
mp.batch[i].Done()
Expand Down
42 changes: 39 additions & 3 deletions proto/pipe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package proto

import (
"crypto/rand"
"errors"
"sync"
"testing"
"time"
Expand All @@ -10,12 +11,20 @@ import (
)

type mockNodeConn struct {
closed bool
closed bool
count, num int
err error
}

func (n *mockNodeConn) Write(*Message) error { return nil }
func (n *mockNodeConn) Read(*Message) error { return nil }
func (n *mockNodeConn) Flush() error { return nil }
func (n *mockNodeConn) Read(*Message) error {
if n.count == n.num {
return n.err
}
n.count++
return nil
}
func (n *mockNodeConn) Flush() error { return nil }
func (n *mockNodeConn) Close() error {
n.closed = true
return nil
Expand Down Expand Up @@ -55,4 +64,31 @@ func TestPipe(t *testing.T) {
time.Sleep(10 * time.Millisecond)
assert.True(t, nc1.closed)
assert.True(t, nc2.closed)

const whenErrNum = 3
nc3 := &mockNodeConn{}
nc3.num = whenErrNum
nc3.err = errors.New("some error")
ncp3 := NewNodeConnPipe(1, func() NodeConn {
return nc3
})
wg = &sync.WaitGroup{}
var msgs []*Message
for i := 0; i < 10; i++ {
m := getMsg()
m.WithRequest(&mockRequest{})
m.WithWaitGroup(wg)
ncp3.Push(m)
msgs = append(msgs, m)
}
wg.Wait()
ncp3.Close()
time.Sleep(10 * time.Millisecond)
for i, msg := range msgs {
if i < whenErrNum {
assert.NoError(t, msg.Err())
} else {
assert.EqualError(t, msg.Err(), "some error")
}
}
}

0 comments on commit 02580c9

Please sign in to comment.