Skip to content

Commit

Permalink
Merge pull request #41 from msolo/pbrpc
Browse files Browse the repository at this point in the history
Add basic support for protobuf rpc. Check in envelope so there is no dep...
  • Loading branch information
alainjobart committed Apr 17, 2014
2 parents c4d1cdf + c661f52 commit 2155bd2
Show file tree
Hide file tree
Showing 6 changed files with 355 additions and 0 deletions.
104 changes: 104 additions & 0 deletions go/rpcplus/pbrpc/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Package pbrpc implements a ClientCodec and ServerCodec
// for the rpc package using proto.
package pbrpc

import (
"io"
"net"
"sync"

"code.google.com/p/goprotobuf/proto"
rpc "github.com/youtube/vitess/go/rpcplus"
)

// NewClientCodec returns a new rpc.ClientCodec using Protobuf on conn.
func NewClientCodec(conn io.ReadWriteCloser) rpc.ClientCodec {
return &pbClientCodec{rwc: conn}
}

// NewClient returns a new rpc.Client to handle requests to the
// set of services at the other end of the connection.
func NewClient(conn io.ReadWriteCloser) *rpc.Client {
return rpc.NewClientWithCodec(NewClientCodec(conn))
}

// Dial connects to a Protobuf-RPC server at the specified network address.
func Dial(network, address string) (*rpc.Client, error) {
conn, err := net.Dial(network, address)
if err != nil {
return nil, err
}
return NewClient(conn), err
}

type pbClientCodec struct {
mu sync.Mutex
rwc io.ReadWriteCloser
}

// WriteRequest - implement rpc.ClientCodec interface.
func (c *pbClientCodec) WriteRequest(r *rpc.Request, body interface{}) (err error) {
// Use a mutex to guarantee the header/body are written in the correct order.
c.mu.Lock()
defer c.mu.Unlock()

// This is protobuf, of course we copy it.
pbr := &Request{ServiceMethod: &r.ServiceMethod, Seq: &r.Seq}
data, err := proto.Marshal(pbr)
if err != nil {
return
}
_, err = WriteNetString(c.rwc, data)
if err != nil {
return
}

// Of course this is a protobuf! Trust me or detonate the program.
data, err = proto.Marshal(body.(proto.Message))
if err != nil {
return
}
_, err = WriteNetString(c.rwc, data)
if err != nil {
return
}

if flusher, ok := c.rwc.(flusher); ok {
err = flusher.Flush()
}
return
}

// ReadResponseHeader - implement rpc.ClientCodec interface.
func (c *pbClientCodec) ReadResponseHeader(r *rpc.Response) error {
data, err := ReadNetString(c.rwc)
if err != nil {
return err
}
rtmp := new(Response)
err = proto.Unmarshal(data, rtmp)
if err != nil {
return err
}
r.ServiceMethod = *rtmp.ServiceMethod
r.Seq = *rtmp.Seq
r.Error = *rtmp.Error
return nil
}

// ReadResponseBody - implement rpc.ClientCodec interface.
func (c *pbClientCodec) ReadResponseBody(body interface{}) error {
data, err := ReadNetString(c.rwc)
if err != nil {
return err
}
if body != nil {
return proto.Unmarshal(data, body.(proto.Message))
}
return nil
}

// Close - implement rpc.ClientCodec interface.
func (c *pbClientCodec) Close() error {
return c.rwc.Close()
}
83 changes: 83 additions & 0 deletions go/rpcplus/pbrpc/envelope.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions go/rpcplus/pbrpc/envelope.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package pbrpc;

message Request {
optional string service_method = 1;
optional fixed64 seq = 2;
}

message Response {
optional string service_method = 1;
optional fixed64 seq = 2;
optional string error = 3;
}
30 changes: 30 additions & 0 deletions go/rpcplus/pbrpc/misc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package pbrpc

import (
"time"

rpc "github.com/youtube/vitess/go/rpcplus"
"github.com/youtube/vitess/go/rpcwrap"
)

const codecName = "protobuf"

// DialHTTP with Protobuf codec.
func DialHTTP(network, address string, connectTimeout time.Duration) (*rpc.Client, error) {
return rpcwrap.DialHTTP(network, address, codecName, NewClientCodec, connectTimeout, nil)
}

// DialAuthHTTP with Protobuf codec.
func DialAuthHTTP(network, address, user, password string, connectTimeout time.Duration) (*rpc.Client, error) {
return rpcwrap.DialAuthHTTP(network, address, user, password, codecName, NewClientCodec, connectTimeout, nil)
}

// ServeRPC with Protobuf codec.
func ServeRPC() {
rpcwrap.ServeRPC(codecName, NewServerCodec)
}

// ServeAuthRPC with Protobuf codec.
func ServeAuthRPC() {
rpcwrap.ServeAuthRPC(codecName, NewServerCodec)
}
36 changes: 36 additions & 0 deletions go/rpcplus/pbrpc/netstring.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package pbrpc

import (
"encoding/binary"
"io"
)

// WriteNetString writes data to a big-endian netstring on a Writer.
// Size is always a 32-bit unsigned int.
func WriteNetString(w io.Writer, data []byte) (written int, err error) {
size := make([]byte, 4)
binary.BigEndian.PutUint32(size, uint32(len(data)))
if written, err = w.Write(size); err != nil {
return
}
return w.Write(data)
}

// ReadNetString reads data from a big-endian netstring.
func ReadNetString(r io.Reader) (data []byte, err error) {
sizeBuf := make([]byte, 4)
_, err = r.Read(sizeBuf)
if err != nil {
return nil, err
}
size := binary.BigEndian.Uint32(sizeBuf)
if size == 0 {
return nil, nil
}
data = make([]byte, size)
_, err = r.Read(data)
if err != nil {
return nil, err
}
return
}
90 changes: 90 additions & 0 deletions go/rpcplus/pbrpc/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package pbrpc

import (
"io"
"sync"

"code.google.com/p/goprotobuf/proto"
rpc "github.com/youtube/vitess/go/rpcplus"
)

type pbServerCodec struct {
mu sync.Mutex
rwc io.ReadWriteCloser
}

// NewServerCodec returns a new ServerCodec.
func NewServerCodec(rwc io.ReadWriteCloser) rpc.ServerCodec {
return &pbServerCodec{rwc: rwc}
}

// ReadRequestHeader reads a Request.
func (c *pbServerCodec) ReadRequestHeader(r *rpc.Request) error {
data, err := ReadNetString(c.rwc)
if err != nil {
return err
}
rtmp := new(Request)
err = proto.Unmarshal(data, rtmp)
if err != nil {
return err
}
r.ServiceMethod = *rtmp.ServiceMethod
r.Seq = *rtmp.Seq
return nil
}

// ReadRequestBody reads a body structure from the codec.
func (c *pbServerCodec) ReadRequestBody(body interface{}) error {
data, err := ReadNetString(c.rwc)
if err != nil {
return err
}
if body != nil {
return proto.Unmarshal(data, body.(proto.Message))
}
return nil
}

type flusher interface {
Flush() error
}

// WriteResponse writes a response on the codec.
func (c *pbServerCodec) WriteResponse(r *rpc.Response, body interface{}, last bool) (err error) {
// Use a mutex to guarantee the header/body are written in the correct order.
c.mu.Lock()
defer c.mu.Unlock()
rtmp := &Response{ServiceMethod: &r.ServiceMethod, Seq: &r.Seq, Error: &r.Error}
data, err := proto.Marshal(rtmp)
if err != nil {
return
}
_, err = WriteNetString(c.rwc, data)
if err != nil {
return
}

if pb, ok := body.(proto.Message); ok {
data, err = proto.Marshal(pb)
if err != nil {
return
}
} else {
data = nil
}
_, err = WriteNetString(c.rwc, data)
if err != nil {
return
}

if flusher, ok := c.rwc.(flusher); ok {
err = flusher.Flush()
}
return
}

// Close the underlying connection.
func (c *pbServerCodec) Close() error {
return c.rwc.Close()
}

0 comments on commit 2155bd2

Please sign in to comment.