diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index e214afdd73507..30ab0d9ea83bd 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -31,17 +31,16 @@ import ( "github.com/apache/pulsar-client-go/pulsar/internal/pb" ) -type producerState int - const ( - producerInit producerState = iota + // producer states + producerInit int32 = iota producerReady producerClosing producerClosed ) type partitionProducer struct { - state producerState + state int32 client *client topic string log *log.Entry @@ -107,7 +106,7 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions p.log = p.log.WithField("producer_name", p.producerName) p.log.WithField("cnx", p.cnx.ID()).Info("Created producer") - p.state = producerReady + atomic.StoreInt32(&p.state, producerReady) go p.runEventsLoop() @@ -181,7 +180,7 @@ func (p *partitionProducer) ConnectionClosed() { func (p *partitionProducer) reconnectToBroker() { backoff := internal.Backoff{} for { - if p.state != producerReady { + if atomic.LoadInt32(&p.state) != producerReady { // Producer is already closing return } @@ -439,11 +438,10 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) func (p *partitionProducer) internalClose(req *closeProducer) { defer req.waitGroup.Done() - if p.state != producerReady { + if !atomic.CompareAndSwapInt32(&p.state, producerReady, producerClosing) { return } - p.state = producerClosing p.log.Info("Closing producer") id := p.client.rpcClient.NewRequestID() @@ -458,7 +456,7 @@ func (p *partitionProducer) internalClose(req *closeProducer) { p.log.Info("Closed producer") } - p.state = producerClosed + atomic.StoreInt32(&p.state, producerClosed) p.cnx.UnregisterListener(p.producerID) p.batchFlushTicker.Stop() } @@ -479,7 +477,7 @@ func (p *partitionProducer) Flush() error { } func (p *partitionProducer) Close() { - if p.state != producerReady { + if atomic.LoadInt32(&p.state) != producerReady { // Producer is closing return }