diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index cd89862ab8..5c038aa53f 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -1494,6 +1494,10 @@ func (p *partitionProducer) Flush() error { } func (p *partitionProducer) FlushWithCtx(ctx context.Context) error { + if p.getProducerState() != producerReady { + return ErrProducerClosed + } + flushReq := &flushRequest{ doneCh: make(chan struct{}), err: nil, diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index 0e38d637bf..5b23182d5d 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -523,6 +523,21 @@ func TestFlushInPartitionedProducer(t *testing.T) { assert.Equal(t, msgCount, numOfMessages/2) } +func TestProducerReturnsErrorOnFlushWhenClosed(t *testing.T) { + client, err := NewClient(ClientOptions{URL: serviceURL}) + assert.NoError(t, err) + defer client.Close() + + producer, err := client.CreateProducer(ProducerOptions{Topic: newTopicName()}) + assert.NoError(t, err) + assert.NotNil(t, producer) + + producer.Close() + + err = producer.FlushWithCtx(context.Background()) + assert.Error(t, err) +} + func TestRoundRobinRouterPartitionedProducer(t *testing.T) { topicName := "public/default/partition-testRoundRobinRouterPartitionedProducer" numberOfPartitions := 5