Skip to content

Commit

Permalink
Merge pull request #31 from cpuguy83/support_context_deadlines
Browse files Browse the repository at this point in the history
Add support for request timeout propgation.
  • Loading branch information
stevvooe committed Jan 7, 2019
2 parents f51df44 + a364f44 commit f02858b
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 5 deletions.
6 changes: 6 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"strings"
"sync"
"syscall"
"time"

"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
Expand Down Expand Up @@ -86,6 +87,10 @@ func (c *Client) Call(ctx context.Context, service, method string, req, resp int
cresp = &Response{}
)

if dl, ok := ctx.Deadline(); ok {
creq.TimeoutNano = dl.Sub(time.Now()).Nanoseconds()
}

if err := c.dispatch(ctx, creq, cresp); err != nil {
return err
}
Expand All @@ -104,6 +109,7 @@ func (c *Client) Call(ctx context.Context, service, method string, req, resp int
func (c *Client) dispatch(ctx context.Context, req *Request, resp *Response) error {
errs := make(chan error, 1)
call := &callRequest{
ctx: ctx,
req: req,
resp: resp,
errs: errs,
Expand Down
15 changes: 15 additions & 0 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,9 @@ func (c *serverConn) run(sctx context.Context) {
case request := <-requests:
active++
go func(id uint32) {
ctx, cancel := getRequestContext(ctx, request.req)
defer cancel()

p, status := c.server.services.call(ctx, request.req.Service, request.req.Method, request.req.Payload)
resp := &Response{
Status: status.Proto(),
Expand Down Expand Up @@ -454,3 +457,15 @@ func (c *serverConn) run(sctx context.Context) {
}
}
}

var noopFunc = func() {}

func getRequestContext(ctx context.Context, req *Request) (retCtx context.Context, cancel func()) {
cancel = noopFunc
if req.TimeoutNano == 0 {
return ctx, cancel
}

ctx, cancel = context.WithTimeout(ctx, time.Duration(req.TimeoutNano))
return ctx, cancel
}
38 changes: 36 additions & 2 deletions server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"strings"
"sync"
"testing"
"time"

"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
Expand Down Expand Up @@ -57,7 +58,8 @@ func (tc *testingClient) Test(ctx context.Context, req *testPayload) (*testPaylo
}

type testPayload struct {
Foo string `protobuf:"bytes,1,opt,name=foo,proto3"`
Foo string `protobuf:"bytes,1,opt,name=foo,proto3"`
Deadline int64 `protobuf:"varint,2,opt,name=deadline,proto3"`
}

func (r *testPayload) Reset() { *r = testPayload{} }
Expand All @@ -68,7 +70,11 @@ func (r *testPayload) ProtoMessage() {}
type testingServer struct{}

func (s *testingServer) Test(ctx context.Context, req *testPayload) (*testPayload, error) {
return &testPayload{Foo: strings.Repeat(req.Foo, 2)}, nil
tp := &testPayload{Foo: strings.Repeat(req.Foo, 2)}
if dl, ok := ctx.Deadline(); ok {
tp.Deadline = dl.UnixNano()
}
return tp, nil
}

// registerTestingService mocks more of what is generated code. Unlike grpc, we
Expand Down Expand Up @@ -376,6 +382,34 @@ func TestUnixSocketHandshake(t *testing.T) {
}
}

func TestServerRequestTimeout(t *testing.T) {
var (
ctx, cancel = context.WithDeadline(context.Background(), time.Now().Add(10*time.Minute))
server = mustServer(t)(NewServer())
addr, listener = newTestListener(t)
testImpl = &testingServer{}
client, cleanup = newTestClient(t, addr)
result testPayload
)
defer cancel()
defer cleanup()
defer listener.Close()

registerTestingService(server, testImpl)

go server.Serve(ctx, listener)
defer server.Shutdown(ctx)

if err := client.Call(ctx, serviceName, "Test", &testPayload{}, &result); err != nil {
t.Fatalf("unexpected error making call: %v", err)
}

dl, _ := ctx.Deadline()
if result.Deadline != dl.UnixNano() {
t.Fatalf("expected deadline %v, actual: %v", dl, result.Deadline)
}
}

func BenchmarkRoundTrip(b *testing.B) {
var (
ctx = context.Background()
Expand Down
7 changes: 4 additions & 3 deletions types.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ import (
)

type Request struct {
Service string `protobuf:"bytes,1,opt,name=service,proto3"`
Method string `protobuf:"bytes,2,opt,name=method,proto3"`
Payload []byte `protobuf:"bytes,3,opt,name=payload,proto3"`
Service string `protobuf:"bytes,1,opt,name=service,proto3"`
Method string `protobuf:"bytes,2,opt,name=method,proto3"`
Payload []byte `protobuf:"bytes,3,opt,name=payload,proto3"`
TimeoutNano int64 `protobuf:"varint,4,opt,name=timeout_nano,proto3"`
}

func (r *Request) Reset() { *r = Request{} }
Expand Down

0 comments on commit f02858b

Please sign in to comment.