Skip to content

Commit

Permalink
fix mq test
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesCheung96 committed May 18, 2022
1 parent 7649bda commit 5c54e1a
Show file tree
Hide file tree
Showing 15 changed files with 304 additions and 152 deletions.
24 changes: 24 additions & 0 deletions cdc/sink/mq/dispatcher/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package dispatcher

import (
"testing"

"github.com/pingcap/tiflow/pkg/leakutil"
)

func TestMain(m *testing.M) {
leakutil.SetUpLeakTest(m)
}
24 changes: 24 additions & 0 deletions cdc/sink/mq/dispatcher/partition/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package partition

import (
"testing"

"github.com/pingcap/tiflow/pkg/leakutil"
)

func TestMain(m *testing.M) {
leakutil.SetUpLeakTest(m)
}
24 changes: 24 additions & 0 deletions cdc/sink/mq/dispatcher/topic/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package topic

import (
"testing"

"github.com/pingcap/tiflow/pkg/leakutil"
)

func TestMain(m *testing.M) {
leakutil.SetUpLeakTest(m)
}
24 changes: 24 additions & 0 deletions cdc/sink/mq/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package mq

import (
"testing"

"github.com/pingcap/tiflow/pkg/leakutil"
)

func TestMain(m *testing.M) {
leakutil.SetUpLeakTest(m)
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package kafka
package manager

import (
"fmt"
Expand All @@ -28,8 +28,8 @@ import (
"go.uber.org/zap"
)

// TopicManager is a manager for kafka topics.
type TopicManager struct {
// kafkaTopicManager is a manager for kafka topics.
type kafkaTopicManager struct {
client kafka.Client
admin kafka.ClusterAdminClient

Expand All @@ -40,13 +40,13 @@ type TopicManager struct {
lastMetadataRefresh atomic.Int64
}

// NewTopicManager creates a new topic manager.
func NewTopicManager(
// NewKafkaTopicManager creates a new topic manager.
func NewKafkaTopicManager(
client kafka.Client,
admin kafka.ClusterAdminClient,
cfg *kafkaconfig.AutoCreateTopicConfig,
) *TopicManager {
return &TopicManager{
) *kafkaTopicManager {
return &kafkaTopicManager{
client: client,
admin: admin,
cfg: cfg,
Expand All @@ -55,7 +55,7 @@ func NewTopicManager(

// GetPartitionNum returns the number of partitions of the topic.
// It may also try to update the topics' information maintained by manager.
func (m *TopicManager) GetPartitionNum(topic string) (int32, error) {
func (m *kafkaTopicManager) GetPartitionNum(topic string) (int32, error) {
err := m.tryRefreshMeta()
if err != nil {
return 0, errors.Trace(err)
Expand All @@ -69,7 +69,7 @@ func (m *TopicManager) GetPartitionNum(topic string) (int32, error) {
}

// tryRefreshMeta try to refresh the topics' information maintained by manager.
func (m *TopicManager) tryRefreshMeta() error {
func (m *kafkaTopicManager) tryRefreshMeta() error {
if time.Since(time.Unix(m.lastMetadataRefresh.Load(), 0)) > time.Minute {
topics, err := m.client.Topics()
if err != nil {
Expand All @@ -90,7 +90,7 @@ func (m *TopicManager) tryRefreshMeta() error {
}

// tryUpdatePartitionsAndLogging try to update the partitions of the topic.
func (m *TopicManager) tryUpdatePartitionsAndLogging(topic string, partitions int32) {
func (m *kafkaTopicManager) tryUpdatePartitionsAndLogging(topic string, partitions int32) {
oldPartitions, ok := m.topics.Load(topic)
if ok {
if oldPartitions.(int32) != partitions {
Expand All @@ -114,7 +114,7 @@ func (m *TopicManager) tryUpdatePartitionsAndLogging(topic string, partitions in

// CreateTopic creates a topic with the given name
// and returns the number of partitions.
func (m *TopicManager) CreateTopic(topicName string) (int32, error) {
func (m *kafkaTopicManager) CreateTopic(topicName string) (int32, error) {
start := time.Now()
topics, err := m.admin.ListTopics()
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package kafka
package manager

import (
"testing"
Expand All @@ -36,7 +36,7 @@ func TestPartitions(t *testing.T) {
ReplicationFactor: 1,
}

manager := NewTopicManager(client, adminClient, cfg)
manager := NewKafkaTopicManager(client, adminClient, cfg)
partitionsNum, err := manager.GetPartitionNum(
kafkamock.DefaultMockTopicName)
require.Nil(t, err)
Expand All @@ -57,7 +57,7 @@ func TestTryRefreshMeta(t *testing.T) {
ReplicationFactor: 1,
}

manager := NewTopicManager(client, adminClient, cfg)
manager := NewKafkaTopicManager(client, adminClient, cfg)
partitionsNum, err := manager.GetPartitionNum(
kafkamock.DefaultMockTopicName)
require.Nil(t, err)
Expand Down Expand Up @@ -92,7 +92,7 @@ func TestCreateTopic(t *testing.T) {
ReplicationFactor: 1,
}

manager := NewTopicManager(client, adminClient, cfg)
manager := NewKafkaTopicManager(client, adminClient, cfg)
partitionNum, err := manager.CreateTopic(kafkamock.DefaultMockTopicName)
require.Nil(t, err)
require.Equal(t, int32(3), partitionNum)
Expand All @@ -106,7 +106,7 @@ func TestCreateTopic(t *testing.T) {

// Try to create a topic without auto create.
cfg.AutoCreate = false
manager = NewTopicManager(client, adminClient, cfg)
manager = NewKafkaTopicManager(client, adminClient, cfg)
_, err = manager.CreateTopic("new-topic2")
require.Regexp(
t,
Expand Down
24 changes: 24 additions & 0 deletions cdc/sink/mq/manager/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package manager

import (
"testing"

"github.com/pingcap/tiflow/pkg/leakutil"
)

func TestMain(m *testing.M) {
leakutil.SetUpLeakTest(m)
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,30 +11,30 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package pulsar
package manager

// TopicManager is the interface
// pulsarTopicManager is the interface
// that wraps the basic Pulsar topic management operations.
// Right now it doesn't have any implementation,
// Pulsar doesn't support multiple topics yet.
// So it now just returns a fixed number of partitions for a fixed topic.
type TopicManager struct {
type pulsarTopicManager struct {
partitionNum int32
}

// NewTopicManager creates a new TopicManager.
func NewTopicManager(partitionNum int32) *TopicManager {
return &TopicManager{
// NewPulsarTopicManager creates a new TopicManager.
func NewPulsarTopicManager(partitionNum int32) *pulsarTopicManager {
return &pulsarTopicManager{
partitionNum: partitionNum,
}
}

// GetPartitionNum returns the number of partitions of the topic.
func (m *TopicManager) GetPartitionNum(_ string) (int32, error) {
func (m *pulsarTopicManager) GetPartitionNum(_ string) (int32, error) {
return m.partitionNum, nil
}

// CreateTopic do nothing.
func (m *TopicManager) CreateTopic(_ string) (int32, error) {
func (m *pulsarTopicManager) CreateTopic(_ string) (int32, error) {
return m.partitionNum, nil
}
6 changes: 2 additions & 4 deletions cdc/sink/mq/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ import (
"github.com/pingcap/tiflow/cdc/sink/metrics"
"github.com/pingcap/tiflow/cdc/sink/mq/dispatcher"
"github.com/pingcap/tiflow/cdc/sink/mq/manager"
kafkamanager "github.com/pingcap/tiflow/cdc/sink/mq/manager/kafka"
pulsarmanager "github.com/pingcap/tiflow/cdc/sink/mq/manager/pulsar"
"github.com/pingcap/tiflow/cdc/sink/mq/producer"
"github.com/pingcap/tiflow/cdc/sink/mq/producer/kafka"
"github.com/pingcap/tiflow/cdc/sink/mq/producer/pulsar"
Expand Down Expand Up @@ -431,7 +429,7 @@ func NewKafkaSaramaSink(ctx context.Context, sinkURI *url.URL,
return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}

topicManager := kafkamanager.NewTopicManager(
topicManager := manager.NewKafkaTopicManager(
client,
adminClient,
baseConfig.DeriveTopicConfig(),
Expand Down Expand Up @@ -500,7 +498,7 @@ func NewPulsarSink(ctx context.Context, sinkURI *url.URL, filter *filter.Filter,
if err != nil {
return nil, errors.Trace(err)
}
fakeTopicManager := pulsarmanager.NewTopicManager(
fakeTopicManager := manager.NewPulsarTopicManager(
producer.GetPartitionNum(),
)
sink, err := newMqSink(
Expand Down
Loading

0 comments on commit 5c54e1a

Please sign in to comment.