Skip to content

Commit

Permalink
Make mock brokers and protocol packets available for outsider
Browse files Browse the repository at this point in the history
  • Loading branch information
horkhe committed Feb 3, 2016
1 parent 4ba9bba commit 201ab73
Show file tree
Hide file tree
Showing 8 changed files with 215 additions and 212 deletions.
66 changes: 33 additions & 33 deletions async_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ func (f flakyEncoder) Encode() ([]byte, error) {
}

func TestAsyncProducer(t *testing.T) {
seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 2)
seedBroker := NewMockBroker(t, 1)
leader := NewMockBroker(t, 2)

metadataResponse := new(MetadataResponse)
metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
Expand Down Expand Up @@ -140,8 +140,8 @@ func TestAsyncProducer(t *testing.T) {
}

func TestAsyncProducerMultipleFlushes(t *testing.T) {
seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 2)
seedBroker := NewMockBroker(t, 1)
leader := NewMockBroker(t, 2)

metadataResponse := new(MetadataResponse)
metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
Expand Down Expand Up @@ -175,9 +175,9 @@ func TestAsyncProducerMultipleFlushes(t *testing.T) {
}

func TestAsyncProducerMultipleBrokers(t *testing.T) {
seedBroker := newMockBroker(t, 1)
leader0 := newMockBroker(t, 2)
leader1 := newMockBroker(t, 3)
seedBroker := NewMockBroker(t, 1)
leader0 := NewMockBroker(t, 2)
leader1 := NewMockBroker(t, 3)

metadataResponse := new(MetadataResponse)
metadataResponse.AddBroker(leader0.Addr(), leader0.BrokerID())
Expand Down Expand Up @@ -215,8 +215,8 @@ func TestAsyncProducerMultipleBrokers(t *testing.T) {
}

func TestAsyncProducerCustomPartitioner(t *testing.T) {
seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 2)
seedBroker := NewMockBroker(t, 1)
leader := NewMockBroker(t, 2)

metadataResponse := new(MetadataResponse)
metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
Expand Down Expand Up @@ -257,9 +257,9 @@ func TestAsyncProducerCustomPartitioner(t *testing.T) {
}

func TestAsyncProducerFailureRetry(t *testing.T) {
seedBroker := newMockBroker(t, 1)
leader1 := newMockBroker(t, 2)
leader2 := newMockBroker(t, 3)
seedBroker := NewMockBroker(t, 1)
leader1 := NewMockBroker(t, 2)
leader2 := NewMockBroker(t, 3)

metadataLeader1 := new(MetadataResponse)
metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
Expand Down Expand Up @@ -305,8 +305,8 @@ func TestAsyncProducerFailureRetry(t *testing.T) {
}

func TestAsyncProducerEncoderFailures(t *testing.T) {
seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 2)
seedBroker := NewMockBroker(t, 1)
leader := NewMockBroker(t, 2)

metadataResponse := new(MetadataResponse)
metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
Expand Down Expand Up @@ -344,8 +344,8 @@ func TestAsyncProducerEncoderFailures(t *testing.T) {
// producer reconnects to it and continues sending messages.
func TestAsyncProducerBrokerBounce(t *testing.T) {
// Given
seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 2)
seedBroker := NewMockBroker(t, 1)
leader := NewMockBroker(t, 2)
leaderAddr := leader.Addr()

metadataResponse := new(MetadataResponse)
Expand All @@ -370,7 +370,7 @@ func TestAsyncProducerBrokerBounce(t *testing.T) {

// When: a broker connection gets reset by a broker (network glitch, restart, you name it).
leader.Close() // producer should get EOF
leader = newMockBrokerAddr(t, 2, leaderAddr) // start it up again right away for giggles
leader = NewMockBrokerAddr(t, 2, leaderAddr) // start it up again right away for giggles
seedBroker.Returns(metadataResponse) // tell it to go to broker 2 again

// Then: a produced message goes through the new broker connection.
Expand All @@ -384,9 +384,9 @@ func TestAsyncProducerBrokerBounce(t *testing.T) {
}

func TestAsyncProducerBrokerBounceWithStaleMetadata(t *testing.T) {
seedBroker := newMockBroker(t, 1)
leader1 := newMockBroker(t, 2)
leader2 := newMockBroker(t, 3)
seedBroker := NewMockBroker(t, 1)
leader1 := NewMockBroker(t, 2)
leader2 := NewMockBroker(t, 3)

metadataLeader1 := new(MetadataResponse)
metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
Expand Down Expand Up @@ -427,9 +427,9 @@ func TestAsyncProducerBrokerBounceWithStaleMetadata(t *testing.T) {
}

func TestAsyncProducerMultipleRetries(t *testing.T) {
seedBroker := newMockBroker(t, 1)
leader1 := newMockBroker(t, 2)
leader2 := newMockBroker(t, 3)
seedBroker := NewMockBroker(t, 1)
leader1 := NewMockBroker(t, 2)
leader2 := NewMockBroker(t, 3)

metadataLeader1 := new(MetadataResponse)
metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
Expand Down Expand Up @@ -484,8 +484,8 @@ func TestAsyncProducerMultipleRetries(t *testing.T) {
func TestAsyncProducerOutOfRetries(t *testing.T) {
t.Skip("Enable once bug #294 is fixed.")

seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 2)
seedBroker := NewMockBroker(t, 1)
leader := NewMockBroker(t, 2)

metadataResponse := new(MetadataResponse)
metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
Expand Down Expand Up @@ -539,8 +539,8 @@ func TestAsyncProducerOutOfRetries(t *testing.T) {
}

func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) {
seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 2)
seedBroker := NewMockBroker(t, 1)
leader := NewMockBroker(t, 2)
leaderAddr := leader.Addr()

metadataResponse := new(MetadataResponse)
Expand Down Expand Up @@ -575,7 +575,7 @@ func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) {

// reboot the broker (the producer will get EOF on its existing connection)
leader.Close()
leader = newMockBrokerAddr(t, 2, leaderAddr)
leader = NewMockBrokerAddr(t, 2, leaderAddr)

// send another message on partition 0 to trigger the EOF and retry
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
Expand All @@ -596,8 +596,8 @@ func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) {
}

func TestAsyncProducerFlusherRetryCondition(t *testing.T) {
seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 2)
seedBroker := NewMockBroker(t, 1)
leader := NewMockBroker(t, 2)

metadataResponse := new(MetadataResponse)
metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
Expand Down Expand Up @@ -638,7 +638,7 @@ func TestAsyncProducerFlusherRetryCondition(t *testing.T) {
time.Sleep(50 * time.Millisecond)

leader.SetHandlerByMap(map[string]MockResponse{
"ProduceRequest": newMockProduceResponse(t).
"ProduceRequest": NewMockProduceResponse(t).
SetError("my_topic", 0, ErrNoError),
})

Expand All @@ -661,8 +661,8 @@ func TestAsyncProducerFlusherRetryCondition(t *testing.T) {
}

func TestAsyncProducerRetryShutdown(t *testing.T) {
seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 2)
seedBroker := NewMockBroker(t, 1)
leader := NewMockBroker(t, 2)

metadataLeader := new(MetadataResponse)
metadataLeader.AddBroker(leader.Addr(), leader.BrokerID())
Expand Down
2 changes: 1 addition & 1 deletion broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestBrokerAccessors(t *testing.T) {
}

func TestSimpleBrokerCommunication(t *testing.T) {
mb := newMockBroker(t, 0)
mb := NewMockBroker(t, 0)
defer mb.Close()

broker := NewBroker(mb.Addr())
Expand Down
54 changes: 27 additions & 27 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func safeClose(t testing.TB, c io.Closer) {
}

func TestSimpleClient(t *testing.T) {
seedBroker := newMockBroker(t, 1)
seedBroker := NewMockBroker(t, 1)

seedBroker.Returns(new(MetadataResponse))

Expand All @@ -29,7 +29,7 @@ func TestSimpleClient(t *testing.T) {
}

func TestCachedPartitions(t *testing.T) {
seedBroker := newMockBroker(t, 1)
seedBroker := NewMockBroker(t, 1)

replicas := []int32{3, 1, 5}
isr := []int32{5, 1}
Expand Down Expand Up @@ -68,7 +68,7 @@ func TestCachedPartitions(t *testing.T) {
}

func TestClientDoesntCachePartitionsForTopicsWithErrors(t *testing.T) {
seedBroker := newMockBroker(t, 1)
seedBroker := NewMockBroker(t, 1)

replicas := []int32{seedBroker.BrokerID()}

Expand Down Expand Up @@ -122,7 +122,7 @@ func TestClientDoesntCachePartitionsForTopicsWithErrors(t *testing.T) {
}

func TestClientSeedBrokers(t *testing.T) {
seedBroker := newMockBroker(t, 1)
seedBroker := NewMockBroker(t, 1)

metadataResponse := new(MetadataResponse)
metadataResponse.AddBroker("localhost:12345", 2)
Expand All @@ -138,8 +138,8 @@ func TestClientSeedBrokers(t *testing.T) {
}

func TestClientMetadata(t *testing.T) {
seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 5)
seedBroker := NewMockBroker(t, 1)
leader := NewMockBroker(t, 5)

replicas := []int32{3, 1, 5}
isr := []int32{5, 1}
Expand Down Expand Up @@ -202,8 +202,8 @@ func TestClientMetadata(t *testing.T) {
}

func TestClientGetOffset(t *testing.T) {
seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 2)
seedBroker := NewMockBroker(t, 1)
leader := NewMockBroker(t, 2)
leaderAddr := leader.Addr()

metadata := new(MetadataResponse)
Expand Down Expand Up @@ -231,7 +231,7 @@ func TestClientGetOffset(t *testing.T) {
leader.Close()
seedBroker.Returns(metadata)

leader = newMockBrokerAddr(t, 2, leaderAddr)
leader = NewMockBrokerAddr(t, 2, leaderAddr)
offsetResponse = new(OffsetResponse)
offsetResponse.AddTopicPartition("foo", 0, 456)
leader.Returns(offsetResponse)
Expand All @@ -250,7 +250,7 @@ func TestClientGetOffset(t *testing.T) {
}

func TestClientReceivingUnknownTopic(t *testing.T) {
seedBroker := newMockBroker(t, 1)
seedBroker := NewMockBroker(t, 1)

metadataResponse1 := new(MetadataResponse)
seedBroker.Returns(metadataResponse1)
Expand Down Expand Up @@ -286,8 +286,8 @@ func TestClientReceivingUnknownTopic(t *testing.T) {
}

func TestClientReceivingPartialMetadata(t *testing.T) {
seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 5)
seedBroker := NewMockBroker(t, 1)
leader := NewMockBroker(t, 5)

metadataResponse1 := new(MetadataResponse)
metadataResponse1.AddBroker(leader.Addr(), leader.BrokerID())
Expand Down Expand Up @@ -339,8 +339,8 @@ func TestClientReceivingPartialMetadata(t *testing.T) {
}

func TestClientRefreshBehaviour(t *testing.T) {
seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 5)
seedBroker := NewMockBroker(t, 1)
leader := NewMockBroker(t, 5)

metadataResponse1 := new(MetadataResponse)
metadataResponse1.AddBroker(leader.Addr(), leader.BrokerID())
Expand Down Expand Up @@ -375,7 +375,7 @@ func TestClientRefreshBehaviour(t *testing.T) {
}

func TestClientResurrectDeadSeeds(t *testing.T) {
initialSeed := newMockBroker(t, 0)
initialSeed := NewMockBroker(t, 0)
emptyMetadata := new(MetadataResponse)
initialSeed.Returns(emptyMetadata)

Expand All @@ -390,9 +390,9 @@ func TestClientResurrectDeadSeeds(t *testing.T) {

client := c.(*client)

seed1 := newMockBroker(t, 1)
seed2 := newMockBroker(t, 2)
seed3 := newMockBroker(t, 3)
seed1 := NewMockBroker(t, 1)
seed2 := NewMockBroker(t, 2)
seed3 := NewMockBroker(t, 3)
addr1 := seed1.Addr()
addr2 := seed2.Addr()
addr3 := seed3.Addr()
Expand All @@ -413,8 +413,8 @@ func TestClientResurrectDeadSeeds(t *testing.T) {
seed1.Close()
seed2.Close()

seed1 = newMockBrokerAddr(t, 1, addr1)
seed2 = newMockBrokerAddr(t, 2, addr2)
seed1 = NewMockBrokerAddr(t, 1, addr1)
seed2 = NewMockBrokerAddr(t, 2, addr2)

seed3.Close()

Expand All @@ -434,9 +434,9 @@ func TestClientResurrectDeadSeeds(t *testing.T) {
}

func TestClientCoordinatorWithConsumerOffsetsTopic(t *testing.T) {
seedBroker := newMockBroker(t, 1)
staleCoordinator := newMockBroker(t, 2)
freshCoordinator := newMockBroker(t, 3)
seedBroker := NewMockBroker(t, 1)
staleCoordinator := NewMockBroker(t, 2)
freshCoordinator := NewMockBroker(t, 3)

replicas := []int32{staleCoordinator.BrokerID(), freshCoordinator.BrokerID()}
metadataResponse1 := new(MetadataResponse)
Expand Down Expand Up @@ -513,8 +513,8 @@ func TestClientCoordinatorWithConsumerOffsetsTopic(t *testing.T) {
}

func TestClientCoordinatorWithoutConsumerOffsetsTopic(t *testing.T) {
seedBroker := newMockBroker(t, 1)
coordinator := newMockBroker(t, 2)
seedBroker := NewMockBroker(t, 1)
coordinator := NewMockBroker(t, 2)

metadataResponse1 := new(MetadataResponse)
seedBroker.Returns(metadataResponse1)
Expand Down Expand Up @@ -566,7 +566,7 @@ func TestClientCoordinatorWithoutConsumerOffsetsTopic(t *testing.T) {
}

func TestClientAutorefreshShutdownRace(t *testing.T) {
seedBroker := newMockBroker(t, 1)
seedBroker := NewMockBroker(t, 1)

metadataResponse := new(MetadataResponse)
seedBroker.Returns(metadataResponse)
Expand Down Expand Up @@ -594,7 +594,7 @@ func TestClientAutorefreshShutdownRace(t *testing.T) {
time.Sleep(10 * time.Millisecond)

// Then return some metadata to the still-running background thread
leader := newMockBroker(t, 2)
leader := NewMockBroker(t, 2)
metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
metadataResponse.AddTopicPartition("foo", 0, leader.BrokerID(), []int32{2}, []int32{2}, ErrNoError)
seedBroker.Returns(metadataResponse)
Expand Down
Loading

0 comments on commit 201ab73

Please sign in to comment.