From 9fd6bdb32af9a3bb4a4aae98d3967967fed874f1 Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Wed, 4 Dec 2024 16:15:21 +0200 Subject: [PATCH] Export createTopic Signed-off-by: Dimitar Dimitrov --- pkg/storage/ingest/reader.go | 2 +- pkg/storage/ingest/util.go | 4 ++-- pkg/storage/ingest/util_test.go | 10 +++++----- pkg/storage/ingest/writer.go | 2 +- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/pkg/storage/ingest/reader.go b/pkg/storage/ingest/reader.go index 998256c774..4f3a29dfea 100644 --- a/pkg/storage/ingest/reader.go +++ b/pkg/storage/ingest/reader.go @@ -175,7 +175,7 @@ func (r *PartitionReader) EstimatedBytesPerRecord() int64 { func (r *PartitionReader) start(ctx context.Context) (returnErr error) { if r.kafkaCfg.AutoCreateTopicEnabled { - if err := createTopic(r.kafkaCfg, r.logger); err != nil { + if err := CreateTopic(r.kafkaCfg, r.logger); err != nil { return err } } diff --git a/pkg/storage/ingest/util.go b/pkg/storage/ingest/util.go index 05b2846404..78ed565d0c 100644 --- a/pkg/storage/ingest/util.go +++ b/pkg/storage/ingest/util.go @@ -152,9 +152,9 @@ func (w *resultPromise[T]) wait(ctx context.Context) (T, error) { } } -// createTopic creates the topic in the Kafka cluster. If creating the topic fails, then an error is returned. +// CreateTopic creates the topic in the Kafka cluster. If creating the topic fails, then an error is returned. // If the topic already exists, then the function logs a message and returns nil. -func createTopic(cfg KafkaConfig, logger log.Logger) error { +func CreateTopic(cfg KafkaConfig, logger log.Logger) error { logger = log.With(logger, "task", "autocreate_topic") cl, err := kgo.NewClient(commonKafkaClientOptions(cfg, nil, logger)...) diff --git a/pkg/storage/ingest/util_test.go b/pkg/storage/ingest/util_test.go index 6db8d86e1a..e750f2ab13 100644 --- a/pkg/storage/ingest/util_test.go +++ b/pkg/storage/ingest/util_test.go @@ -175,7 +175,7 @@ func TestCreateTopic(t *testing.T) { logger := log.NewNopLogger() - require.NoError(t, createTopic(cfg, logger)) + require.NoError(t, CreateTopic(cfg, logger)) }) t.Run("should return an error if the request fails", func(t *testing.T) { @@ -193,7 +193,7 @@ func TestCreateTopic(t *testing.T) { logger := log.NewNopLogger() - require.NoError(t, createTopic(cfg, logger)) + require.NoError(t, CreateTopic(cfg, logger)) }) t.Run("should return an error if the request succeed but the response contains an error", func(t *testing.T) { @@ -230,7 +230,7 @@ func TestCreateTopic(t *testing.T) { logger := log.NewNopLogger() - require.NoError(t, createTopic(cfg, logger)) + require.NoError(t, CreateTopic(cfg, logger)) }) t.Run("should not return error when topic already exists", func(t *testing.T) { @@ -245,9 +245,9 @@ func TestCreateTopic(t *testing.T) { ) // First call should create the topic - assert.NoError(t, createTopic(cfg, logger)) + assert.NoError(t, CreateTopic(cfg, logger)) // Second call should succeed because topic already exists - assert.NoError(t, createTopic(cfg, logger)) + assert.NoError(t, CreateTopic(cfg, logger)) }) } diff --git a/pkg/storage/ingest/writer.go b/pkg/storage/ingest/writer.go index 556aa9b7ac..46d0bf131a 100644 --- a/pkg/storage/ingest/writer.go +++ b/pkg/storage/ingest/writer.go @@ -107,7 +107,7 @@ func NewWriter(kafkaCfg KafkaConfig, logger log.Logger, reg prometheus.Registere func (w *Writer) starting(_ context.Context) error { if w.kafkaCfg.AutoCreateTopicEnabled { - return createTopic(w.kafkaCfg, w.logger) + return CreateTopic(w.kafkaCfg, w.logger) } return nil }