From 8326a0d39984b7ebe940d9110209849e8a8cbf43 Mon Sep 17 00:00:00 2001 From: Mike Solomon Date: Sun, 13 Apr 2014 23:40:26 -0700 Subject: [PATCH 1/2] Add basic support for protobuf rpc. Check in envelope so there is no dependency on the proto compiler. --- go/rpcplus/pbrpc/client.go | 100 ++++++++++++++++++++++++++++++++ go/rpcplus/pbrpc/envelope.pb.go | 83 ++++++++++++++++++++++++++ go/rpcplus/pbrpc/envelope.proto | 12 ++++ go/rpcplus/pbrpc/misc.go | 26 +++++++++ go/rpcplus/pbrpc/netstring.go | 33 +++++++++++ go/rpcplus/pbrpc/server.go | 85 +++++++++++++++++++++++++++ 6 files changed, 339 insertions(+) create mode 100644 go/rpcplus/pbrpc/client.go create mode 100644 go/rpcplus/pbrpc/envelope.pb.go create mode 100644 go/rpcplus/pbrpc/envelope.proto create mode 100644 go/rpcplus/pbrpc/misc.go create mode 100644 go/rpcplus/pbrpc/netstring.go create mode 100644 go/rpcplus/pbrpc/server.go diff --git a/go/rpcplus/pbrpc/client.go b/go/rpcplus/pbrpc/client.go new file mode 100644 index 00000000000..d573a375bdc --- /dev/null +++ b/go/rpcplus/pbrpc/client.go @@ -0,0 +1,100 @@ +// Package pbrpc implements a ClientCodec and ServerCodec +// for the rpc package using proto. +package pbrpc + +import ( + "io" + "net" + "sync" + + "code.google.com/p/goprotobuf/proto" + rpc "github.com/youtube/vitess/go/rpcplus" +) + +// NewClientCodec returns a new rpc.ClientCodec using JSON-RPC on conn. +func NewClientCodec(conn io.ReadWriteCloser) rpc.ClientCodec { + return &pbClientCodec{rwc: conn} +} + +// NewClient returns a new rpc.Client to handle requests to the +// set of services at the other end of the connection. +func NewClient(conn io.ReadWriteCloser) *rpc.Client { + return rpc.NewClientWithCodec(NewClientCodec(conn)) +} + +// Dial connects to a Protobuf-RPC server at the specified network address. +func Dial(network, address string) (*rpc.Client, error) { + conn, err := net.Dial(network, address) + if err != nil { + return nil, err + } + return NewClient(conn), err +} + +type pbClientCodec struct { + mu sync.Mutex + rwc io.ReadWriteCloser +} + +func (c *pbClientCodec) WriteRequest(r *rpc.Request, body interface{}) (err error) { + // Use a mutex to guarantee the header/body are written in the correct order. + c.mu.Lock() + defer c.mu.Unlock() + + // This is protobuf, of course we copy it. + pbr := &Request{ServiceMethod: &r.ServiceMethod, Seq: &r.Seq} + data, err := proto.Marshal(pbr) + if err != nil { + return + } + _, err = WriteNetString(c.rwc, data) + if err != nil { + return + } + + // Of course this is a protobuf! Trust me or detonate the program. + data, err = proto.Marshal(body.(proto.Message)) + if err != nil { + return + } + _, err = WriteNetString(c.rwc, data) + if err != nil { + return + } + + if flusher, ok := c.rwc.(Flusher); ok { + err = flusher.Flush() + } + return +} + +func (c *pbClientCodec) ReadResponseHeader(r *rpc.Response) error { + data, err := ReadNetString(c.rwc) + if err != nil { + return err + } + rtmp := new(Response) + err = proto.Unmarshal(data, rtmp) + if err != nil { + return err + } + r.ServiceMethod = *rtmp.ServiceMethod + r.Seq = *rtmp.Seq + r.Error = *rtmp.Error + return nil +} + +func (c *pbClientCodec) ReadResponseBody(body interface{}) error { + data, err := ReadNetString(c.rwc) + if err != nil { + return err + } + if body != nil { + return proto.Unmarshal(data, body.(proto.Message)) + } + return nil +} + +func (c *pbClientCodec) Close() error { + return c.rwc.Close() +} diff --git a/go/rpcplus/pbrpc/envelope.pb.go b/go/rpcplus/pbrpc/envelope.pb.go new file mode 100644 index 00000000000..5fa97f28ff0 --- /dev/null +++ b/go/rpcplus/pbrpc/envelope.pb.go @@ -0,0 +1,83 @@ +// Code generated by protoc-gen-go. +// source: envelope.proto +// DO NOT EDIT! + +/* +Package pbrpc is a generated protocol buffer package. + +It is generated from these files: + envelope.proto + +It has these top-level messages: + Request + Response +*/ +package pbrpc + +import proto "code.google.com/p/goprotobuf/proto" +import json "encoding/json" +import math "math" + +// Reference proto, json, and math imports to suppress error if they are not otherwise used. +var _ = proto.Marshal +var _ = &json.SyntaxError{} +var _ = math.Inf + +type Request struct { + ServiceMethod *string `protobuf:"bytes,1,opt,name=service_method" json:"service_method,omitempty"` + Seq *uint64 `protobuf:"fixed64,2,opt,name=seq" json:"seq,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *Request) Reset() { *m = Request{} } +func (m *Request) String() string { return proto.CompactTextString(m) } +func (*Request) ProtoMessage() {} + +func (m *Request) GetServiceMethod() string { + if m != nil && m.ServiceMethod != nil { + return *m.ServiceMethod + } + return "" +} + +func (m *Request) GetSeq() uint64 { + if m != nil && m.Seq != nil { + return *m.Seq + } + return 0 +} + +type Response struct { + ServiceMethod *string `protobuf:"bytes,1,opt,name=service_method" json:"service_method,omitempty"` + Seq *uint64 `protobuf:"fixed64,2,opt,name=seq" json:"seq,omitempty"` + Error *string `protobuf:"bytes,3,opt,name=error" json:"error,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *Response) Reset() { *m = Response{} } +func (m *Response) String() string { return proto.CompactTextString(m) } +func (*Response) ProtoMessage() {} + +func (m *Response) GetServiceMethod() string { + if m != nil && m.ServiceMethod != nil { + return *m.ServiceMethod + } + return "" +} + +func (m *Response) GetSeq() uint64 { + if m != nil && m.Seq != nil { + return *m.Seq + } + return 0 +} + +func (m *Response) GetError() string { + if m != nil && m.Error != nil { + return *m.Error + } + return "" +} + +func init() { +} diff --git a/go/rpcplus/pbrpc/envelope.proto b/go/rpcplus/pbrpc/envelope.proto new file mode 100644 index 00000000000..6c5d57218ea --- /dev/null +++ b/go/rpcplus/pbrpc/envelope.proto @@ -0,0 +1,12 @@ +package pbrpc; + +message Request { + optional string service_method = 1; + optional fixed64 seq = 2; +} + +message Response { + optional string service_method = 1; + optional fixed64 seq = 2; + optional string error = 3; +} diff --git a/go/rpcplus/pbrpc/misc.go b/go/rpcplus/pbrpc/misc.go new file mode 100644 index 00000000000..8bc3977a5e4 --- /dev/null +++ b/go/rpcplus/pbrpc/misc.go @@ -0,0 +1,26 @@ +package pbrpc + +import ( + "time" + + rpc "github.com/youtube/vitess/go/rpcplus" + "github.com/youtube/vitess/go/rpcwrap" +) + +const codecName = "protobuf" + +func DialHTTP(network, address string, connectTimeout time.Duration) (*rpc.Client, error) { + return rpcwrap.DialHTTP(network, address, codecName, NewClientCodec, connectTimeout, nil) +} + +func DialAuthHTTP(network, address, user, password string, connectTimeout time.Duration) (*rpc.Client, error) { + return rpcwrap.DialAuthHTTP(network, address, user, password, codecName, NewClientCodec, connectTimeout, nil) +} + +func ServeRPC() { + rpcwrap.ServeRPC(codecName, NewServerCodec) +} + +func ServeAuthRPC() { + rpcwrap.ServeAuthRPC(codecName, NewServerCodec) +} diff --git a/go/rpcplus/pbrpc/netstring.go b/go/rpcplus/pbrpc/netstring.go new file mode 100644 index 00000000000..ea2368ddfcc --- /dev/null +++ b/go/rpcplus/pbrpc/netstring.go @@ -0,0 +1,33 @@ +package pbrpc + +import ( + "encoding/binary" + "io" +) + +func WriteNetString(w io.Writer, data []byte) (written int, err error) { + size := make([]byte, 4) + binary.BigEndian.PutUint32(size, uint32(len(data))) + if written, err = w.Write(size); err != nil { + return + } + return w.Write(data) +} + +func ReadNetString(r io.Reader) (data []byte, err error) { + sizeBuf := make([]byte, 4) + _, err = r.Read(sizeBuf) + if err != nil { + return nil, err + } + size := binary.BigEndian.Uint32(sizeBuf) + if size == 0 { + return nil, nil + } + data = make([]byte, size) + _, err = r.Read(data) + if err != nil { + return nil, err + } + return +} diff --git a/go/rpcplus/pbrpc/server.go b/go/rpcplus/pbrpc/server.go new file mode 100644 index 00000000000..a9f0fa7b4d8 --- /dev/null +++ b/go/rpcplus/pbrpc/server.go @@ -0,0 +1,85 @@ +package pbrpc + +import ( + "io" + "sync" + + "code.google.com/p/goprotobuf/proto" + rpc "github.com/youtube/vitess/go/rpcplus" +) + +type pbServerCodec struct { + mu sync.Mutex + rwc io.ReadWriteCloser +} + +func NewServerCodec(rwc io.ReadWriteCloser) rpc.ServerCodec { + return &pbServerCodec{rwc: rwc} +} + +type Flusher interface { + Flush() error +} + +func (c *pbServerCodec) ReadRequestHeader(r *rpc.Request) error { + data, err := ReadNetString(c.rwc) + if err != nil { + return err + } + rtmp := new(Request) + err = proto.Unmarshal(data, rtmp) + if err != nil { + return err + } + r.ServiceMethod = *rtmp.ServiceMethod + r.Seq = *rtmp.Seq + return nil +} + +func (c *pbServerCodec) ReadRequestBody(body interface{}) error { + data, err := ReadNetString(c.rwc) + if err != nil { + return err + } + if body != nil { + return proto.Unmarshal(data, body.(proto.Message)) + } + return nil +} + +func (c *pbServerCodec) WriteResponse(r *rpc.Response, body interface{}, last bool) (err error) { + // Use a mutex to guarantee the header/body are written in the correct order. + c.mu.Lock() + defer c.mu.Unlock() + rtmp := &Response{ServiceMethod: &r.ServiceMethod, Seq: &r.Seq, Error: &r.Error} + data, err := proto.Marshal(rtmp) + if err != nil { + return + } + _, err = WriteNetString(c.rwc, data) + if err != nil { + return + } + + if pb, ok := body.(proto.Message); ok { + data, err = proto.Marshal(pb) + if err != nil { + return + } + } else { + data = nil + } + _, err = WriteNetString(c.rwc, data) + if err != nil { + return + } + + if flusher, ok := c.rwc.(Flusher); ok { + err = flusher.Flush() + } + return +} + +func (c *pbServerCodec) Close() error { + return c.rwc.Close() +} From c661f52b8a999f1ea8f5301efdfc89228ebc6134 Mon Sep 17 00:00:00 2001 From: Mike Solomon Date: Wed, 16 Apr 2014 22:33:51 -0700 Subject: [PATCH 2/2] Add comments. --- go/rpcplus/pbrpc/client.go | 8 ++++++-- go/rpcplus/pbrpc/misc.go | 4 ++++ go/rpcplus/pbrpc/netstring.go | 3 +++ go/rpcplus/pbrpc/server.go | 15 ++++++++++----- 4 files changed, 23 insertions(+), 7 deletions(-) diff --git a/go/rpcplus/pbrpc/client.go b/go/rpcplus/pbrpc/client.go index d573a375bdc..2097a95f4fd 100644 --- a/go/rpcplus/pbrpc/client.go +++ b/go/rpcplus/pbrpc/client.go @@ -11,7 +11,7 @@ import ( rpc "github.com/youtube/vitess/go/rpcplus" ) -// NewClientCodec returns a new rpc.ClientCodec using JSON-RPC on conn. +// NewClientCodec returns a new rpc.ClientCodec using Protobuf on conn. func NewClientCodec(conn io.ReadWriteCloser) rpc.ClientCodec { return &pbClientCodec{rwc: conn} } @@ -36,6 +36,7 @@ type pbClientCodec struct { rwc io.ReadWriteCloser } +// WriteRequest - implement rpc.ClientCodec interface. func (c *pbClientCodec) WriteRequest(r *rpc.Request, body interface{}) (err error) { // Use a mutex to guarantee the header/body are written in the correct order. c.mu.Lock() @@ -62,12 +63,13 @@ func (c *pbClientCodec) WriteRequest(r *rpc.Request, body interface{}) (err erro return } - if flusher, ok := c.rwc.(Flusher); ok { + if flusher, ok := c.rwc.(flusher); ok { err = flusher.Flush() } return } +// ReadResponseHeader - implement rpc.ClientCodec interface. func (c *pbClientCodec) ReadResponseHeader(r *rpc.Response) error { data, err := ReadNetString(c.rwc) if err != nil { @@ -84,6 +86,7 @@ func (c *pbClientCodec) ReadResponseHeader(r *rpc.Response) error { return nil } +// ReadResponseBody - implement rpc.ClientCodec interface. func (c *pbClientCodec) ReadResponseBody(body interface{}) error { data, err := ReadNetString(c.rwc) if err != nil { @@ -95,6 +98,7 @@ func (c *pbClientCodec) ReadResponseBody(body interface{}) error { return nil } +// Close - implement rpc.ClientCodec interface. func (c *pbClientCodec) Close() error { return c.rwc.Close() } diff --git a/go/rpcplus/pbrpc/misc.go b/go/rpcplus/pbrpc/misc.go index 8bc3977a5e4..9f384f16ae5 100644 --- a/go/rpcplus/pbrpc/misc.go +++ b/go/rpcplus/pbrpc/misc.go @@ -9,18 +9,22 @@ import ( const codecName = "protobuf" +// DialHTTP with Protobuf codec. func DialHTTP(network, address string, connectTimeout time.Duration) (*rpc.Client, error) { return rpcwrap.DialHTTP(network, address, codecName, NewClientCodec, connectTimeout, nil) } +// DialAuthHTTP with Protobuf codec. func DialAuthHTTP(network, address, user, password string, connectTimeout time.Duration) (*rpc.Client, error) { return rpcwrap.DialAuthHTTP(network, address, user, password, codecName, NewClientCodec, connectTimeout, nil) } +// ServeRPC with Protobuf codec. func ServeRPC() { rpcwrap.ServeRPC(codecName, NewServerCodec) } +// ServeAuthRPC with Protobuf codec. func ServeAuthRPC() { rpcwrap.ServeAuthRPC(codecName, NewServerCodec) } diff --git a/go/rpcplus/pbrpc/netstring.go b/go/rpcplus/pbrpc/netstring.go index ea2368ddfcc..2d3616bcbd9 100644 --- a/go/rpcplus/pbrpc/netstring.go +++ b/go/rpcplus/pbrpc/netstring.go @@ -5,6 +5,8 @@ import ( "io" ) +// WriteNetString writes data to a big-endian netstring on a Writer. +// Size is always a 32-bit unsigned int. func WriteNetString(w io.Writer, data []byte) (written int, err error) { size := make([]byte, 4) binary.BigEndian.PutUint32(size, uint32(len(data))) @@ -14,6 +16,7 @@ func WriteNetString(w io.Writer, data []byte) (written int, err error) { return w.Write(data) } +// ReadNetString reads data from a big-endian netstring. func ReadNetString(r io.Reader) (data []byte, err error) { sizeBuf := make([]byte, 4) _, err = r.Read(sizeBuf) diff --git a/go/rpcplus/pbrpc/server.go b/go/rpcplus/pbrpc/server.go index a9f0fa7b4d8..96d6b7aed65 100644 --- a/go/rpcplus/pbrpc/server.go +++ b/go/rpcplus/pbrpc/server.go @@ -13,14 +13,12 @@ type pbServerCodec struct { rwc io.ReadWriteCloser } +// NewServerCodec returns a new ServerCodec. func NewServerCodec(rwc io.ReadWriteCloser) rpc.ServerCodec { return &pbServerCodec{rwc: rwc} } -type Flusher interface { - Flush() error -} - +// ReadRequestHeader reads a Request. func (c *pbServerCodec) ReadRequestHeader(r *rpc.Request) error { data, err := ReadNetString(c.rwc) if err != nil { @@ -36,6 +34,7 @@ func (c *pbServerCodec) ReadRequestHeader(r *rpc.Request) error { return nil } +// ReadRequestBody reads a body structure from the codec. func (c *pbServerCodec) ReadRequestBody(body interface{}) error { data, err := ReadNetString(c.rwc) if err != nil { @@ -47,6 +46,11 @@ func (c *pbServerCodec) ReadRequestBody(body interface{}) error { return nil } +type flusher interface { + Flush() error +} + +// WriteResponse writes a response on the codec. func (c *pbServerCodec) WriteResponse(r *rpc.Response, body interface{}, last bool) (err error) { // Use a mutex to guarantee the header/body are written in the correct order. c.mu.Lock() @@ -74,12 +78,13 @@ func (c *pbServerCodec) WriteResponse(r *rpc.Response, body interface{}, last bo return } - if flusher, ok := c.rwc.(Flusher); ok { + if flusher, ok := c.rwc.(flusher); ok { err = flusher.Flush() } return } +// Close the underlying connection. func (c *pbServerCodec) Close() error { return c.rwc.Close() }