Skip to content

Commit

Permalink
Export createTopic
Browse files Browse the repository at this point in the history
Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>
  • Loading branch information
dimitarvdimitrov committed Dec 4, 2024
1 parent 7341c2a commit 9fd6bdb
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 9 deletions.
2 changes: 1 addition & 1 deletion pkg/storage/ingest/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func (r *PartitionReader) EstimatedBytesPerRecord() int64 {

func (r *PartitionReader) start(ctx context.Context) (returnErr error) {
if r.kafkaCfg.AutoCreateTopicEnabled {
if err := createTopic(r.kafkaCfg, r.logger); err != nil {
if err := CreateTopic(r.kafkaCfg, r.logger); err != nil {
return err
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/ingest/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,9 @@ func (w *resultPromise[T]) wait(ctx context.Context) (T, error) {
}
}

// createTopic creates the topic in the Kafka cluster. If creating the topic fails, then an error is returned.
// CreateTopic creates the topic in the Kafka cluster. If creating the topic fails, then an error is returned.
// If the topic already exists, then the function logs a message and returns nil.
func createTopic(cfg KafkaConfig, logger log.Logger) error {
func CreateTopic(cfg KafkaConfig, logger log.Logger) error {
logger = log.With(logger, "task", "autocreate_topic")

cl, err := kgo.NewClient(commonKafkaClientOptions(cfg, nil, logger)...)
Expand Down
10 changes: 5 additions & 5 deletions pkg/storage/ingest/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func TestCreateTopic(t *testing.T) {

logger := log.NewNopLogger()

require.NoError(t, createTopic(cfg, logger))
require.NoError(t, CreateTopic(cfg, logger))
})

t.Run("should return an error if the request fails", func(t *testing.T) {
Expand All @@ -193,7 +193,7 @@ func TestCreateTopic(t *testing.T) {

logger := log.NewNopLogger()

require.NoError(t, createTopic(cfg, logger))
require.NoError(t, CreateTopic(cfg, logger))
})

t.Run("should return an error if the request succeed but the response contains an error", func(t *testing.T) {
Expand Down Expand Up @@ -230,7 +230,7 @@ func TestCreateTopic(t *testing.T) {

logger := log.NewNopLogger()

require.NoError(t, createTopic(cfg, logger))
require.NoError(t, CreateTopic(cfg, logger))
})

t.Run("should not return error when topic already exists", func(t *testing.T) {
Expand All @@ -245,9 +245,9 @@ func TestCreateTopic(t *testing.T) {
)

// First call should create the topic
assert.NoError(t, createTopic(cfg, logger))
assert.NoError(t, CreateTopic(cfg, logger))

// Second call should succeed because topic already exists
assert.NoError(t, createTopic(cfg, logger))
assert.NoError(t, CreateTopic(cfg, logger))
})
}
2 changes: 1 addition & 1 deletion pkg/storage/ingest/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func NewWriter(kafkaCfg KafkaConfig, logger log.Logger, reg prometheus.Registere

func (w *Writer) starting(_ context.Context) error {
if w.kafkaCfg.AutoCreateTopicEnabled {
return createTopic(w.kafkaCfg, w.logger)
return CreateTopic(w.kafkaCfg, w.logger)
}
return nil
}
Expand Down

0 comments on commit 9fd6bdb

Please sign in to comment.