Skip to content

Commit

Permalink
Merge pull request #478 from mailgun/master
Browse files Browse the repository at this point in the history
Add decode method to request types
  • Loading branch information
eapache committed Jul 13, 2015
2 parents db6a85d + aa411f1 commit c1d582e
Show file tree
Hide file tree
Showing 18 changed files with 396 additions and 51 deletions.
10 changes: 5 additions & 5 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse,
return response, nil
}

func (b *Broker) send(req requestEncoder, promiseResponse bool) (*responsePromise, error) {
func (b *Broker) send(rb requestBody, promiseResponse bool) (*responsePromise, error) {
b.lock.Lock()
defer b.lock.Unlock()

Expand All @@ -245,8 +245,8 @@ func (b *Broker) send(req requestEncoder, promiseResponse bool) (*responsePromis
return nil, ErrNotConnected
}

fullRequest := request{b.correlationID, b.conf.ClientID, req}
buf, err := encode(&fullRequest)
req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
buf, err := encode(req)
if err != nil {
return nil, err
}
Expand All @@ -266,13 +266,13 @@ func (b *Broker) send(req requestEncoder, promiseResponse bool) (*responsePromis
return nil, nil
}

promise := responsePromise{fullRequest.correlationID, make(chan []byte), make(chan error)}
promise := responsePromise{req.correlationID, make(chan []byte), make(chan error)}
b.responses <- promise

return &promise, nil
}

func (b *Broker) sendAndReceive(req requestEncoder, res decoder) error {
func (b *Broker) sendAndReceive(req requestBody, res decoder) error {
promise, err := b.send(req, res != nil)

if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions consumer_metadata_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ func (r *ConsumerMetadataRequest) encode(pe packetEncoder) error {
return pe.putString(r.ConsumerGroup)
}

func (r *ConsumerMetadataRequest) decode(pd packetDecoder) (err error) {
r.ConsumerGroup, err = pd.getString()
return err
}

func (r *ConsumerMetadataRequest) key() int16 {
return 10
}
Expand Down
4 changes: 2 additions & 2 deletions consumer_metadata_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ var (

func TestConsumerMetadataRequest(t *testing.T) {
request := new(ConsumerMetadataRequest)
testEncodable(t, "empty string", request, consumerMetadataRequestEmpty)
testRequest(t, "empty string", request, consumerMetadataRequestEmpty)

request.ConsumerGroup = "foobar"
testEncodable(t, "with string", request, consumerMetadataRequestString)
testRequest(t, "with string", request, consumerMetadataRequestString)
}
8 changes: 4 additions & 4 deletions encoder_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ type encoder interface {
}

// Encode takes an Encoder and turns it into bytes.
func encode(in encoder) ([]byte, error) {
if in == nil {
func encode(e encoder) ([]byte, error) {
if e == nil {
return nil, nil
}

var prepEnc prepEncoder
var realEnc realEncoder

err := in.encode(&prepEnc)
err := e.encode(&prepEnc)
if err != nil {
return nil, err
}
Expand All @@ -27,7 +27,7 @@ func encode(in encoder) ([]byte, error) {
}

realEnc.raw = make([]byte, prepEnc.length)
err = in.encode(&realEnc)
err = e.encode(&realEnc)
if err != nil {
return nil, err
}
Expand Down
53 changes: 53 additions & 0 deletions fetch_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,16 @@ func (f *fetchRequestBlock) encode(pe packetEncoder) error {
return nil
}

func (f *fetchRequestBlock) decode(pd packetDecoder) (err error) {
if f.fetchOffset, err = pd.getInt64(); err != nil {
return err
}
if f.maxBytes, err = pd.getInt32(); err != nil {
return err
}
return nil
}

type FetchRequest struct {
MaxWaitTime int32
MinBytes int32
Expand Down Expand Up @@ -45,6 +55,49 @@ func (f *FetchRequest) encode(pe packetEncoder) (err error) {
return nil
}

func (f *FetchRequest) decode(pd packetDecoder) (err error) {
if _, err = pd.getInt32(); err != nil {
return err
}
if f.MaxWaitTime, err = pd.getInt32(); err != nil {
return err
}
if f.MinBytes, err = pd.getInt32(); err != nil {
return err
}
topicCount, err := pd.getArrayLength()
if err != nil {
return err
}
if topicCount == 0 {
return nil
}
f.blocks = make(map[string]map[int32]*fetchRequestBlock)
for i := 0; i < topicCount; i++ {
topic, err := pd.getString()
if err != nil {
return err
}
partitionCount, err := pd.getArrayLength()
if err != nil {
return err
}
f.blocks[topic] = make(map[int32]*fetchRequestBlock)
for j := 0; j < partitionCount; j++ {
partition, err := pd.getInt32()
if err != nil {
return err
}
fetchBlock := &fetchRequestBlock{}
if err = fetchBlock.decode(pd); err != nil {
return nil
}
f.blocks[topic][partition] = fetchBlock
}
}
return nil
}

func (f *FetchRequest) key() int16 {
return 1
}
Expand Down
6 changes: 3 additions & 3 deletions fetch_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ var (

func TestFetchRequest(t *testing.T) {
request := new(FetchRequest)
testEncodable(t, "no blocks", request, fetchRequestNoBlocks)
testRequest(t, "no blocks", request, fetchRequestNoBlocks)

request.MaxWaitTime = 0x20
request.MinBytes = 0xEF
testEncodable(t, "with properties", request, fetchRequestWithProperties)
testRequest(t, "with properties", request, fetchRequestWithProperties)

request.MaxWaitTime = 0
request.MinBytes = 0
request.AddBlock("topic", 0x12, 0x34, 0x56)
testEncodable(t, "one block", request, fetchRequestOneBlock)
testRequest(t, "one block", request, fetchRequestOneBlock)
}
20 changes: 20 additions & 0 deletions metadata_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,26 @@ func (mr *MetadataRequest) encode(pe packetEncoder) error {
return nil
}

func (mr *MetadataRequest) decode(pd packetDecoder) error {
topicCount, err := pd.getArrayLength()
if err != nil {
return err
}
if topicCount == 0 {
return nil
}

mr.Topics = make([]string, topicCount)
for i := range mr.Topics {
topic, err := pd.getString()
if err != nil {
return err
}
mr.Topics[i] = topic
}
return nil
}

func (mr *MetadataRequest) key() int16 {
return 3
}
Expand Down
6 changes: 3 additions & 3 deletions metadata_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ var (

func TestMetadataRequest(t *testing.T) {
request := new(MetadataRequest)
testEncodable(t, "no topics", request, metadataRequestNoTopics)
testRequest(t, "no topics", request, metadataRequestNoTopics)

request.Topics = []string{"topic1"}
testEncodable(t, "one topic", request, metadataRequestOneTopic)
testRequest(t, "one topic", request, metadataRequestOneTopic)

request.Topics = []string{"foo", "bar", "baz"}
testEncodable(t, "three topics", request, metadataRequestThreeTopics)
testRequest(t, "three topics", request, metadataRequestThreeTopics)
}
66 changes: 66 additions & 0 deletions offset_commit_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,19 @@ func (r *offsetCommitRequestBlock) encode(pe packetEncoder, version int16) error
return pe.putString(r.metadata)
}

func (r *offsetCommitRequestBlock) decode(pd packetDecoder, version int16) (err error) {
if r.offset, err = pd.getInt64(); err != nil {
return err
}
if version == 1 {
if r.timestamp, err = pd.getInt64(); err != nil {
return err
}
}
r.metadata, err = pd.getString()
return err
}

type OffsetCommitRequest struct {
ConsumerGroup string
ConsumerGroupGeneration int32 // v1 or later
Expand Down Expand Up @@ -85,6 +98,59 @@ func (r *OffsetCommitRequest) encode(pe packetEncoder) error {
return nil
}

func (r *OffsetCommitRequest) decode(pd packetDecoder) (err error) {
if r.ConsumerGroup, err = pd.getString(); err != nil {
return err
}

if r.Version >= 1 {
if r.ConsumerGroupGeneration, err = pd.getInt32(); err != nil {
return err
}
if r.ConsumerID, err = pd.getString(); err != nil {
return err
}
}

if r.Version >= 2 {
if r.RetentionTime, err = pd.getInt64(); err != nil {
return err
}
}

topicCount, err := pd.getArrayLength()
if err != nil {
return err
}
if topicCount == 0 {
return nil
}
r.blocks = make(map[string]map[int32]*offsetCommitRequestBlock)
for i := 0; i < topicCount; i++ {
topic, err := pd.getString()
if err != nil {
return err
}
partitionCount, err := pd.getArrayLength()
if err != nil {
return err
}
r.blocks[topic] = make(map[int32]*offsetCommitRequestBlock)
for j := 0; j < partitionCount; j++ {
partition, err := pd.getInt32()
if err != nil {
return err
}
block := &offsetCommitRequestBlock{}
if err := block.decode(pd, r.Version); err != nil {
return err
}
r.blocks[topic][partition] = block
}
}
return nil
}

func (r *OffsetCommitRequest) key() int16 {
return 8
}
Expand Down
12 changes: 6 additions & 6 deletions offset_commit_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ func TestOffsetCommitRequestV0(t *testing.T) {
request := new(OffsetCommitRequest)
request.Version = 0
request.ConsumerGroup = "foobar"
testEncodable(t, "no blocks v0", request, offsetCommitRequestNoBlocksV0)
testRequest(t, "no blocks v0", request, offsetCommitRequestNoBlocksV0)

request.AddBlock("topic", 0x5221, 0xDEADBEEF, 0, "metadata")
testEncodable(t, "one block v0", request, offsetCommitRequestOneBlockV0)
testRequest(t, "one block v0", request, offsetCommitRequestOneBlockV0)
}

func TestOffsetCommitRequestV1(t *testing.T) {
Expand All @@ -70,10 +70,10 @@ func TestOffsetCommitRequestV1(t *testing.T) {
request.ConsumerID = "cons"
request.ConsumerGroupGeneration = 0x1122
request.Version = 1
testEncodable(t, "no blocks v1", request, offsetCommitRequestNoBlocksV1)
testRequest(t, "no blocks v1", request, offsetCommitRequestNoBlocksV1)

request.AddBlock("topic", 0x5221, 0xDEADBEEF, ReceiveTime, "metadata")
testEncodable(t, "one block v1", request, offsetCommitRequestOneBlockV1)
testRequest(t, "one block v1", request, offsetCommitRequestOneBlockV1)
}

func TestOffsetCommitRequestV2(t *testing.T) {
Expand All @@ -83,8 +83,8 @@ func TestOffsetCommitRequestV2(t *testing.T) {
request.ConsumerGroupGeneration = 0x1122
request.RetentionTime = 0x4433
request.Version = 2
testEncodable(t, "no blocks v2", request, offsetCommitRequestNoBlocksV2)
testRequest(t, "no blocks v2", request, offsetCommitRequestNoBlocksV2)

request.AddBlock("topic", 0x5221, 0xDEADBEEF, 0, "metadata")
testEncodable(t, "one block v2", request, offsetCommitRequestOneBlockV2)
testRequest(t, "one block v2", request, offsetCommitRequestOneBlockV2)
}
26 changes: 26 additions & 0 deletions offset_fetch_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,32 @@ func (r *OffsetFetchRequest) encode(pe packetEncoder) (err error) {
return nil
}

func (r *OffsetFetchRequest) decode(pd packetDecoder) (err error) {
if r.ConsumerGroup, err = pd.getString(); err != nil {
return err
}
partitionCount, err := pd.getArrayLength()
if err != nil {
return err
}
if partitionCount == 0 {
return nil
}
r.partitions = make(map[string][]int32)
for i := 0; i < partitionCount; i++ {
topic, err := pd.getString()
if err != nil {
return err
}
partitions, err := pd.getInt32Array()
if err != nil {
return err
}
r.partitions[topic] = partitions
}
return nil
}

func (r *OffsetFetchRequest) key() int16 {
return 9
}
Expand Down
6 changes: 3 additions & 3 deletions offset_fetch_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ var (

func TestOffsetFetchRequest(t *testing.T) {
request := new(OffsetFetchRequest)
testEncodable(t, "no group, no partitions", request, offsetFetchRequestNoGroupNoPartitions)
testRequest(t, "no group, no partitions", request, offsetFetchRequestNoGroupNoPartitions)

request.ConsumerGroup = "blah"
testEncodable(t, "no partitions", request, offsetFetchRequestNoPartitions)
testRequest(t, "no partitions", request, offsetFetchRequestNoPartitions)

request.AddPartition("topicTheFirst", 0x4F4F4F4F)
testEncodable(t, "one partition", request, offsetFetchRequestOnePartition)
testRequest(t, "one partition", request, offsetFetchRequestOnePartition)
}
Loading

0 comments on commit c1d582e

Please sign in to comment.