Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] [broker] Make specified producer could override the previous one #21155

Conversation

poorbarcode
Copy link
Contributor

@poorbarcode poorbarcode commented Sep 11, 2023

Motivation

2023-09-12T07:17:05,388+0000 [pulsar-web-37-4] INFO  org.apache.pulsar.broker.service.Producer - Disconnecting producer: Producer{topic=PersistentTopic{topic=persistent://public/default/tp1}, client=/127.0.0.6:36433, producerName=st-0-5, producerId=87}
2023-09-12T07:38:55,707+0000 [pulsar-io-4-1] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.6:37543][persistent://public/default/tp1] Creating producer. producerId=87
2023-09-12T07:38:55,707+0000 [pulsar-io-4-1] WARN  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.6:37543] Failed to add producer to topic persistent://public/default/tp1: producerId=87, Producer with name 'st-0-5' is already connected to topic
2023-09-12T07:38:55,916+0000 [pulsar-io-4-1] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.6:37543][persistent://public/default/tp1] Creating producer. producerId=87
2023-09-12T07:38:55,916+0000 [pulsar-io-4-1] WARN  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.6:37543] Failed to add producer to topic persistent://public/default/tp1: producerId=87, Producer with name 'st-0-5' is already connected to topic
...
...
...
2023-09-12T07:39:47,395+0000 [pulsar-io-4-1] INFO  org.apache.pulsar.broker.service.ServerCnx - Closed connection from /127.0.0.6:36433
2023-09-12T07:39:47,395+0000 [pulsar-io-4-1] WARN  org.apache.pulsar.common.protocol.PulsarHandler - [[id: 0x0f0484da, L:/10.48.3.67:6652 - R:/127.0.0.6:36433]] Forcing connection to close after keep-alive timeout
2023-09-12T07:40:12,161+0000 [pulsar-io-4-1] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.6:37543][persistent://public/default/tp1] Creating producer. producerId=87
2023-09-12T07:40:12,161+0000 [pulsar-io-4-1] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.6:37543] Created new producer: Producer{topic=PersistentTopic{topic=persistent://public/default/tp1}, client=/127.0.0.6:37543, producerName=st-0-5, producerId=87}

Issue 1

The client assumed the connection was inactive, but the Broker assumed the connection was fine. The Client tried to use a new connection to reconnect a producer, then got an error Producer with name 'st-0-5' is already connected to topic.

Issue 2

  • In a connection, the second connection waits for the first connection to complete[1]. But there is a bug that causes this mechanism to fail[2].
  • If a producer uses a default name, the second registration will override the first one. But it can not override the first one if it uses a specified producer name[3]. I think this mechanism is to prevent a client from creating two producers with the same name. However, method Producer.isSuccessorTo has checked the producer-id, and the producer-id of multiple producers created by the same client are different. So this mechanism can be deleted.

[1]
https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L1398-L1423

[2]
https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L1397-L1425

CompletableFuture<Producer> existingProducerFuture = producers.putIfAbsent(producerId, producerFuture);

if (existingProducerFuture != null) {
    if (existingProducerFuture.isDone() && !existingProducerFuture.isCompletedExceptionally()) {
        // step-1: If the existing producer's future is done and succeded responds success
       sendProducerSuccessResponse(...);
       return null;
    } else {
        if (!existingProducerFuture.isDone()) {
             // step-2: If the existing producer's future is not, responds fail
            error = ServerError.ServiceNotReady;
            sendErrorResponse(...);
            return null;
        } else {          
             // step-3: else: responds fail
            producers.remove(producerId, existingProducerFuture);
            sendErrorResponse(...);
            return null;
        }
    }
}

If the future is successful after the check step-1, the pulsar will remove a completed future from serverCnc.producers.

[3]
https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java#L983-L999

private void tryOverwriteOldProducer(Producer oldProducer, Producer newProducer)
            throws BrokerServiceException {
    if (newProducer.isSuccessorTo(oldProducer) && !isUserProvidedProducerName(oldProducer)
            && !isUserProvidedProducerName(newProducer)) {
        oldProducer.close(false);
        if (!producers.replace(newProducer.getProducerName(), oldProducer, newProducer)) {
            // Met concurrent update, throw exception here so that client can try reconnect later.
            throw new BrokerServiceException.NamingException("Producer with name '" + newProducer.getProducerName()
                    + "' replace concurrency error");
        } else {
            handleProducerRemoved(oldProducer);
        }
    } else {
        throw new BrokerServiceException.NamingException(
                "Producer with name '" + newProducer.getProducerName() + "' is already connected to topic");
    }
}

Modifications

  • For issue 1: If a producer with the same name tries to use a new connection, async checks the old connection is available. The producers related to the connection that is not available are automatically cleaned up.

  • For issue 2:

    • Fix the bug that causes a complete producer future will be removed from ServerCnx.
    • Remove the mechanism that prevents a producer with a specified name from overriding the previous producer.

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: x

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Sep 11, 2023
@poorbarcode poorbarcode self-assigned this Sep 11, 2023
@poorbarcode poorbarcode added release/3.0.2 release/2.11.3 release/2.10.6 category/reliability The function does not work properly in certain specific environments or failures. e.g. data lost labels Sep 11, 2023
@poorbarcode poorbarcode added this to the 3.2.0 milestone Sep 11, 2023
@codelipenghui
Copy link
Contributor

If the question from this comment is not a problem, I think this fix will work because cnx and producer ID is enough to determine the producer instances from one logic producer or not.

@poorbarcode
Copy link
Contributor Author

@codelipenghui

If the question from this #21144 (comment) is not a problem, I think this fix will work because cnx and producer ID is enough to determine the producer instances from one logic producer or not.

I added a new mechanism: If a producer with the same name tries to use a new connection, Pulsar will async check whether the old connection is available. The producers will be automatically cleaned up if the old connection is unavailable.

@poorbarcode poorbarcode merged commit bda16b6 into apache:master Sep 13, 2023
44 of 45 checks passed
poorbarcode added a commit that referenced this pull request Sep 14, 2023
#21155)

The client assumed the connection was inactive, but the Broker assumed the connection was fine. The Client tried to  use a new connection to reconnect a producer, then got an error `Producer with name 'st-0-5' is already connected to topic`.

- In a connection, the second connection waits for the first connection to complete\. But there is a bug that causes this mechanism to fail\.
- If a producer uses a default name, the second registration will override the first one. But it can not override the first one if it uses a specified producer name\. I think this mechanism is to prevent a client from creating two producers with the same name. However, method `Producer.isSuccessorTo` has checked the `producer-id`, and the `producer-id` of multiple producers created by the same client are different. So this mechanism can be deleted.

- For `issue 1`: If a producer with the same name tries to use a new connection, async checks the old connection is available. The producers related to the connection that is not available are automatically cleaned up.

- For `issue 2`:
  -  Fix the bug that causes a complete producer future will be removed from `ServerCnx`.
  - Remove the mechanism that prevents a producer with a specified name from overriding the previous producer.

(cherry picked from commit bda16b6)
@poorbarcode
Copy link
Contributor Author

I want to cherry-pick the tool method Servercnx.checkConnectionLiveness( introduced in the PR
#20026) into branch-2.10 and branch-2.11 along with the current PR.

See the discussion for details.

poorbarcode added a commit that referenced this pull request Sep 18, 2023
#21155)

The client assumed the connection was inactive, but the Broker assumed the connection was fine. The Client tried to  use a new connection to reconnect a producer, then got an error `Producer with name 'st-0-5' is already connected to topic`.

- In a connection, the second connection waits for the first connection to complete\. But there is a bug that causes this mechanism to fail\.
- If a producer uses a default name, the second registration will override the first one. But it can not override the first one if it uses a specified producer name\. I think this mechanism is to prevent a client from creating two producers with the same name. However, method `Producer.isSuccessorTo` has checked the `producer-id`, and the `producer-id` of multiple producers created by the same client are different. So this mechanism can be deleted.

- For `issue 1`: If a producer with the same name tries to use a new connection, async checks the old connection is available. The producers related to the connection that is not available are automatically cleaned up.

- For `issue 2`:
  -  Fix the bug that causes a complete producer future will be removed from `ServerCnx`.
  - Remove the mechanism that prevents a producer with a specified name from overriding the previous producer.

(cherry picked from commit bda16b6)
poorbarcode added a commit that referenced this pull request Sep 18, 2023
#21155)

The client assumed the connection was inactive, but the Broker assumed the connection was fine. The Client tried to  use a new connection to reconnect a producer, then got an error `Producer with name 'st-0-5' is already connected to topic`.

- In a connection, the second connection waits for the first connection to complete\. But there is a bug that causes this mechanism to fail\.
- If a producer uses a default name, the second registration will override the first one. But it can not override the first one if it uses a specified producer name\. I think this mechanism is to prevent a client from creating two producers with the same name. However, method `Producer.isSuccessorTo` has checked the `producer-id`, and the `producer-id` of multiple producers created by the same client are different. So this mechanism can be deleted.

- For `issue 1`: If a producer with the same name tries to use a new connection, async checks the old connection is available. The producers related to the connection that is not available are automatically cleaned up.

- For `issue 2`:
  -  Fix the bug that causes a complete producer future will be removed from `ServerCnx`.
  - Remove the mechanism that prevents a producer with a specified name from overriding the previous producer.

(cherry picked from commit bda16b6)
@poorbarcode
Copy link
Contributor Author

I want to cherry-pick the tool method Servercnx.checkConnectionLiveness( introduced in the PR
#20026) into branch-2.10 and branch-2.11 along with the current PR.
See the discussion for details.

Cherry-picked

// If a producer with the same name tries to use a new connection, async check the old connection is
// available. The producers related the connection that not available are automatically cleaned up.
if (!Objects.equals(oldProducer.getCnx(), newProducer.getCnx())) {
oldProducer.getCnx().checkConnectionLiveness();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This returns a future that can be used to prevent unnecessary exceptions. Did you consider wiring that up?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@BewareMyPower
Copy link
Contributor

I think this PR should be cherry-picked into branch-3.1, right?

poorbarcode added a commit that referenced this pull request Oct 16, 2023
#21155)

The client assumed the connection was inactive, but the Broker assumed the connection was fine. The Client tried to  use a new connection to reconnect a producer, then got an error `Producer with name 'st-0-5' is already connected to topic`.

- In a connection, the second connection waits for the first connection to complete\. But there is a bug that causes this mechanism to fail\.
- If a producer uses a default name, the second registration will override the first one. But it can not override the first one if it uses a specified producer name\. I think this mechanism is to prevent a client from creating two producers with the same name. However, method `Producer.isSuccessorTo` has checked the `producer-id`, and the `producer-id` of multiple producers created by the same client are different. So this mechanism can be deleted.

- For `issue 1`: If a producer with the same name tries to use a new connection, async checks the old connection is available. The producers related to the connection that is not available are automatically cleaned up.

- For `issue 2`:
  -  Fix the bug that causes a complete producer future will be removed from `ServerCnx`.
  - Remove the mechanism that prevents a producer with a specified name from overriding the previous producer.

(cherry picked from commit bda16b6)
@poorbarcode
Copy link
Contributor Author

I think this PR should be cherry-picked into branch-3.1, right?

Done

poorbarcode added a commit that referenced this pull request Oct 30, 2023
…ne faster (#21183)

### Motivation

There is an issue similar to the #21155 fixed one.

The client assumed the connection was inactive, but the Broker assumed the connection was fine. The Client tried to  use a new connection to reconnect an exclusive consumer, then got an error `Exclusive consumer is already connected`

### Modifications

- Check the connection of the old consumer is available when the new one tries to subscribe
poorbarcode added a commit that referenced this pull request Oct 31, 2023
…ne faster (#21183)

### Motivation

There is an issue similar to the #21155 fixed one.

The client assumed the connection was inactive, but the Broker assumed the connection was fine. The Client tried to  use a new connection to reconnect an exclusive consumer, then got an error `Exclusive consumer is already connected`

### Modifications

- Check the connection of the old consumer is available when the new one tries to subscribe

(cherry picked from commit 29db8f8)
poorbarcode added a commit that referenced this pull request Oct 31, 2023
…ne faster (#21183)

There is an issue similar to the #21155 fixed one.

The client assumed the connection was inactive, but the Broker assumed the connection was fine. The Client tried to  use a new connection to reconnect an exclusive consumer, then got an error `Exclusive consumer is already connected`

- Check the connection of the old consumer is available when the new one tries to subscribe

(cherry picked from commit 29db8f8)
poorbarcode added a commit that referenced this pull request Oct 31, 2023
…ne faster (#21183)

There is an issue similar to the #21155 fixed one.

The client assumed the connection was inactive, but the Broker assumed the connection was fine. The Client tried to  use a new connection to reconnect an exclusive consumer, then got an error `Exclusive consumer is already connected`

- Check the connection of the old consumer is available when the new one tries to subscribe

(cherry picked from commit 29db8f8)
nikhil-ctds pushed a commit to datastax/pulsar that referenced this pull request Dec 20, 2023
apache#21155)

The client assumed the connection was inactive, but the Broker assumed the connection was fine. The Client tried to  use a new connection to reconnect a producer, then got an error `Producer with name 'st-0-5' is already connected to topic`.

- In a connection, the second connection waits for the first connection to complete\. But there is a bug that causes this mechanism to fail\.
- If a producer uses a default name, the second registration will override the first one. But it can not override the first one if it uses a specified producer name\. I think this mechanism is to prevent a client from creating two producers with the same name. However, method `Producer.isSuccessorTo` has checked the `producer-id`, and the `producer-id` of multiple producers created by the same client are different. So this mechanism can be deleted.

- For `issue 1`: If a producer with the same name tries to use a new connection, async checks the old connection is available. The producers related to the connection that is not available are automatically cleaned up.

- For `issue 2`:
  -  Fix the bug that causes a complete producer future will be removed from `ServerCnx`.
  - Remove the mechanism that prevents a producer with a specified name from overriding the previous producer.

(cherry picked from commit bda16b6)
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Dec 20, 2023
apache#21155)

The client assumed the connection was inactive, but the Broker assumed the connection was fine. The Client tried to  use a new connection to reconnect a producer, then got an error `Producer with name 'st-0-5' is already connected to topic`.

- In a connection, the second connection waits for the first connection to complete\. But there is a bug that causes this mechanism to fail\.
- If a producer uses a default name, the second registration will override the first one. But it can not override the first one if it uses a specified producer name\. I think this mechanism is to prevent a client from creating two producers with the same name. However, method `Producer.isSuccessorTo` has checked the `producer-id`, and the `producer-id` of multiple producers created by the same client are different. So this mechanism can be deleted.

- For `issue 1`: If a producer with the same name tries to use a new connection, async checks the old connection is available. The producers related to the connection that is not available are automatically cleaned up.

- For `issue 2`:
  -  Fix the bug that causes a complete producer future will be removed from `ServerCnx`.
  - Remove the mechanism that prevents a producer with a specified name from overriding the previous producer.

(cherry picked from commit bda16b6)
mukesh-ctds pushed a commit to datastax/pulsar that referenced this pull request Apr 15, 2024
…ne faster (apache#21183)

There is an issue similar to the apache#21155 fixed one.

The client assumed the connection was inactive, but the Broker assumed the connection was fine. The Client tried to  use a new connection to reconnect an exclusive consumer, then got an error `Exclusive consumer is already connected`

- Check the connection of the old consumer is available when the new one tries to subscribe

(cherry picked from commit 29db8f8)
(cherry picked from commit b796f56)
mukesh-ctds pushed a commit to datastax/pulsar that referenced this pull request Apr 15, 2024
…ne faster (apache#21183)

There is an issue similar to the apache#21155 fixed one.

The client assumed the connection was inactive, but the Broker assumed the connection was fine. The Client tried to  use a new connection to reconnect an exclusive consumer, then got an error `Exclusive consumer is already connected`

- Check the connection of the old consumer is available when the new one tries to subscribe

(cherry picked from commit 29db8f8)
(cherry picked from commit b796f56)
mukesh-ctds pushed a commit to datastax/pulsar that referenced this pull request Apr 17, 2024
…ne faster (apache#21183)

There is an issue similar to the apache#21155 fixed one.

The client assumed the connection was inactive, but the Broker assumed the connection was fine. The Client tried to  use a new connection to reconnect an exclusive consumer, then got an error `Exclusive consumer is already connected`

- Check the connection of the old consumer is available when the new one tries to subscribe

(cherry picked from commit 29db8f8)
(cherry picked from commit b796f56)
mukesh-ctds pushed a commit to datastax/pulsar that referenced this pull request Apr 17, 2024
…ne faster (apache#21183)

There is an issue similar to the apache#21155 fixed one.

The client assumed the connection was inactive, but the Broker assumed the connection was fine. The Client tried to  use a new connection to reconnect an exclusive consumer, then got an error `Exclusive consumer is already connected`

- Check the connection of the old consumer is available when the new one tries to subscribe

(cherry picked from commit 29db8f8)
(cherry picked from commit b796f56)
mukesh-ctds pushed a commit to datastax/pulsar that referenced this pull request Apr 19, 2024
…ne faster (apache#21183)

There is an issue similar to the apache#21155 fixed one.

The client assumed the connection was inactive, but the Broker assumed the connection was fine. The Client tried to  use a new connection to reconnect an exclusive consumer, then got an error `Exclusive consumer is already connected`

- Check the connection of the old consumer is available when the new one tries to subscribe

(cherry picked from commit 29db8f8)
(cherry picked from commit b796f56)
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Apr 23, 2024
…ne faster (apache#21183)

There is an issue similar to the apache#21155 fixed one.

The client assumed the connection was inactive, but the Broker assumed the connection was fine. The Client tried to  use a new connection to reconnect an exclusive consumer, then got an error `Exclusive consumer is already connected`

- Check the connection of the old consumer is available when the new one tries to subscribe

(cherry picked from commit 29db8f8)
(cherry picked from commit b796f56)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants