-
Notifications
You must be signed in to change notification settings - Fork 63
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix underlying connection not being closed on protocol error #64
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -314,105 +314,88 @@ type noopHandler struct{} | |
|
||
func (noopHandler) Handle(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Request) {} | ||
|
||
type readWriteCloser struct { | ||
read, write func(p []byte) (n int, err error) | ||
} | ||
|
||
func (x readWriteCloser) Read(p []byte) (n int, err error) { | ||
return x.read(p) | ||
} | ||
|
||
func (x readWriteCloser) Write(p []byte) (n int, err error) { | ||
return x.write(p) | ||
} | ||
func TestConn_DisconnectNotify(t *testing.T) { | ||
|
||
func (readWriteCloser) Close() error { return nil } | ||
t.Run("EOF", func(t *testing.T) { | ||
connA, connB := net.Pipe() | ||
c := jsonrpc2.NewConn(context.Background(), jsonrpc2.NewPlainObjectStream(connB), nil) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I switched from |
||
// By closing connA, connB receives io.EOF | ||
if err := connA.Close(); err != nil { | ||
t.Error(err) | ||
} | ||
assertDisconnect(t, c, connB) | ||
}) | ||
|
||
func eof(p []byte) (n int, err error) { | ||
return 0, io.EOF | ||
} | ||
t.Run("Close", func(t *testing.T) { | ||
_, connB := net.Pipe() | ||
c := jsonrpc2.NewConn(context.Background(), jsonrpc2.NewPlainObjectStream(connB), nil) | ||
if err := c.Close(); err != nil { | ||
t.Error(err) | ||
} | ||
assertDisconnect(t, c, connB) | ||
}) | ||
|
||
func TestConn_DisconnectNotify_EOF(t *testing.T) { | ||
c := jsonrpc2.NewConn(context.Background(), jsonrpc2.NewBufferedStream(&readWriteCloser{eof, eof}, jsonrpc2.VarintObjectCodec{}), nil) | ||
select { | ||
case <-c.DisconnectNotify(): | ||
case <-time.After(200 * time.Millisecond): | ||
t.Fatal("no disconnect notification") | ||
} | ||
} | ||
t.Run("Close async", func(t *testing.T) { | ||
done := make(chan struct{}) | ||
_, connB := net.Pipe() | ||
c := jsonrpc2.NewConn(context.Background(), jsonrpc2.NewPlainObjectStream(connB), nil) | ||
go func() { | ||
if err := c.Close(); err != nil && err != jsonrpc2.ErrClosed { | ||
t.Error(err) | ||
} | ||
close(done) | ||
}() | ||
assertDisconnect(t, c, connB) | ||
<-done | ||
}) | ||
|
||
func TestConn_DisconnectNotify_Close(t *testing.T) { | ||
c := jsonrpc2.NewConn(context.Background(), jsonrpc2.NewBufferedStream(&readWriteCloser{eof, eof}, jsonrpc2.VarintObjectCodec{}), nil) | ||
if err := c.Close(); err != nil { | ||
t.Error(err) | ||
} | ||
select { | ||
case <-c.DisconnectNotify(): | ||
case <-time.After(200 * time.Millisecond): | ||
t.Fatal("no disconnect notification") | ||
} | ||
t.Run("protocol error", func(t *testing.T) { | ||
connA, connB := net.Pipe() | ||
c := jsonrpc2.NewConn(context.Background(), jsonrpc2.NewPlainObjectStream(connB), nil) | ||
connA.Write([]byte("invalid json")) | ||
assertDisconnect(t, c, connB) | ||
}) | ||
} | ||
|
||
func TestConn_DisconnectNotify_Close_async(t *testing.T) { | ||
done := make(chan struct{}) | ||
c := jsonrpc2.NewConn(context.Background(), jsonrpc2.NewBufferedStream(&readWriteCloser{eof, eof}, jsonrpc2.VarintObjectCodec{}), nil) | ||
go func() { | ||
func TestConn_Close(t *testing.T) { | ||
t.Run("waiting for response", func(t *testing.T) { | ||
_, connB := net.Pipe() | ||
c := jsonrpc2.NewConn(context.Background(), jsonrpc2.NewPlainObjectStream(connB), nil) | ||
done := make(chan struct{}) | ||
go func() { | ||
if err := c.Call(context.Background(), "m", nil, nil); err != jsonrpc2.ErrClosed { | ||
t.Errorf("got error %v, want %v", err, jsonrpc2.ErrClosed) | ||
} | ||
close(done) | ||
}() | ||
if err := c.Close(); err != nil && err != jsonrpc2.ErrClosed { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I discovered that this test did not run as intended. It is my understanding that the intention of this test is to ensure that pending calls receive There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nice! Now this all LGTM. Thanks for the deep diving :) |
||
t.Error(err) | ||
} | ||
close(done) | ||
}() | ||
select { | ||
case <-c.DisconnectNotify(): | ||
case <-time.After(200 * time.Millisecond): | ||
t.Fatal("no disconnect notification") | ||
} | ||
<-done | ||
assertDisconnect(t, c, connB) | ||
<-done | ||
}) | ||
} | ||
|
||
func TestConn_Close_waitingForResponse(t *testing.T) { | ||
c := jsonrpc2.NewConn(context.Background(), jsonrpc2.NewBufferedStream(&readWriteCloser{eof, eof}, jsonrpc2.VarintObjectCodec{}), noopHandler{}) | ||
done := make(chan struct{}) | ||
go func() { | ||
if err := c.Call(context.Background(), "m", nil, nil); err != jsonrpc2.ErrClosed { | ||
t.Errorf("got error %v, want %v", err, jsonrpc2.ErrClosed) | ||
func serve(ctx context.Context, lis net.Listener, h jsonrpc2.Handler, streamMaker streamMaker, opts ...jsonrpc2.ConnOpt) error { | ||
for { | ||
conn, err := lis.Accept() | ||
if err != nil { | ||
return err | ||
} | ||
close(done) | ||
}() | ||
if err := c.Close(); err != nil && err != jsonrpc2.ErrClosed { | ||
t.Error(err) | ||
} | ||
select { | ||
case <-c.DisconnectNotify(): | ||
case <-time.After(200 * time.Millisecond): | ||
t.Fatal("no disconnect notification") | ||
jsonrpc2.NewConn(ctx, streamMaker(conn), h, opts...) | ||
} | ||
<-done | ||
} | ||
|
||
func TestConn_DisconnectNotify_protocol_error(t *testing.T) { | ||
connA, connB := net.Pipe() | ||
c := jsonrpc2.NewConn(context.Background(), jsonrpc2.NewBufferedStream(connB, jsonrpc2.VarintObjectCodec{}), nil) | ||
connA.Write([]byte("invalid json")) | ||
func assertDisconnect(t *testing.T, c *jsonrpc2.Conn, conn net.Conn) { | ||
select { | ||
case <-c.DisconnectNotify(): | ||
case <-time.After(200 * time.Millisecond): | ||
t.Fatal("no disconnect notification") | ||
} | ||
// Assert that the underlying connection is closed by trying to write to it. | ||
_, got := connB.Write(nil) | ||
// Assert that conn is closed by trying to write to it. | ||
_, got := conn.Write(nil) | ||
want := io.ErrClosedPipe | ||
if got != want { | ||
t.Fatalf("got %q, want %q", got, want) | ||
} | ||
} | ||
|
||
func serve(ctx context.Context, lis net.Listener, h jsonrpc2.Handler, streamMaker streamMaker, opts ...jsonrpc2.ConnOpt) error { | ||
for { | ||
conn, err := lis.Accept() | ||
if err != nil { | ||
return err | ||
} | ||
jsonrpc2.NewConn(ctx, streamMaker(conn), h, opts...) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of modifying
readWriteCloser
so that it would allow us to test if it's been closed, I just usednet.Pipe()
from std lib instead.