
[improve] [broker] Improve cache handling for partitioned topic metadata when doing lookup #21063

Merged

Conversation

poorbarcode
Contributor

@poorbarcode poorbarcode commented Aug 24, 2023

Motivation

If we set allowAutoTopicCreationType to PARTITIONED, the topic-creation flow is as follows (a minimal client-side sketch appears after the list):

  1. Client-side: Look up the topic to get its partitioned topic metadata in order to create a producer.
  2. Broker-side: Create the partitioned topic metadata.
  3. Broker-side: Respond with {"partitions":3}.
  4. Client-side: Create a separate connection for each partition of the topic.
  5. Broker-side: Receive 3 connect requests and create the 3 partition topics.
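For reference, a minimal client-side snippet that triggers this flow; the service URL and topic name are placeholders, and with allowAutoTopicCreationType=PARTITIONED the single create() call below is what starts steps 1–5:

```java
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;

public class AutoCreateExample {
    public static void main(String[] args) throws Exception {
        // Placeholder service URL and topic name.
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build()) {
            // The lookup (step 1) happens here; the broker creates the partitioned
            // metadata (step 2), responds with the partition count (step 3), and the
            // client then opens one producer connection per partition (steps 4-5).
            Producer<byte[]> producer = client.newProducer()
                    .topic("persistent://public/default/TP_52")
                    .create();
            producer.close();
        }
    }
}
```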

In step 2 above, the broker proceeds as follows (a rough sketch of this check-create-read pattern appears after the list):

  1. Check the topic auto-creation policy (here {allowAutoTopicCreationType=PARTITIONED, defaultNumPartitions=3}).
  2. Check whether the partitioned topic metadata already exists.
  3. Try to create the partitioned topic metadata if it does not exist.
  4. If the creation fails because the partitioned topic metadata already exists (perhaps another broker is creating it at the same time), read the partitioned topic metadata from the metadata store and respond to the client.
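A rough sketch of that pattern is below. The helper methods createMetadata and readMetadataThroughCache are hypothetical stand-ins, not the actual broker code, and the exception type is quoted from memory of the metadata API; the conflict branch of step 4 is where the race described next can bite:

```java
// Hypothetical sketch of broker-side steps 2-4 (helper methods are illustrative only).
CompletableFuture<PartitionedTopicMetadata> getOrCreate(TopicName topic, int defaultPartitions) {
    PartitionedTopicMetadata created = new PartitionedTopicMetadata(defaultPartitions);
    return createMetadata(topic, created)                          // step 3: try to create
            .thenApply(ignore -> created)
            .exceptionallyCompose(ex -> {
                if (ex.getCause() instanceof MetadataStoreException.AlreadyExistsException) {
                    // Step 4: another broker won the race; re-read what it wrote.
                    // If the metadata cache still holds the stale empty Optional stored
                    // during step 2, this read returns a non-partitioned result.
                    return readMetadataThroughCache(topic);
                }
                return CompletableFuture.failedFuture(ex);
            });
}
```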

There is a race condition that can make the client receive non-partitioned metadata for the topic:

| time | broker-1 | broker-2 |
| --- | --- | --- |
| 1 | Get policy: PARTITIONED, 3 | Get policy: PARTITIONED, 3 |
| 2 | Check whether the partitioned topic metadata already exists | Check whether the partitioned topic metadata already exists |
| 3 | The metadata does not exist; the metadata cache caches an empty Optional for the path | The metadata does not exist; the metadata cache caches an empty Optional for the path |
| 4 |  | Successfully creates the partitioned topic metadata |
| 5 | Receives a ZK node-changed event that should invalidate the cached partitioned topic metadata |  |
| 6 | Creation of the metadata fails because it already exists |  |
| 7 | Reads the partitioned topic metadata again |  |

If step-5 executes later than step-7, broker-1 reads the stale empty Optional from the partitioned topic metadata cache and responds to the client with non-partitioned metadata (the toy example below illustrates the stale negative-cache behavior).
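This is a toy illustration of negative caching, loosely modeled on the kind of async loading cache the broker's metadata cache is built on; it is not Pulsar's actual cache code, and the in-memory map stands in for the metadata store:

```java
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class NegativeCacheDemo {
    // Hypothetical backing store standing in for ZooKeeper.
    static final Map<String, byte[]> store = new ConcurrentHashMap<>();

    public static void main(String[] args) {
        AsyncLoadingCache<String, Optional<byte[]>> cache = Caffeine.newBuilder()
                .buildAsync((path, executor) ->
                        CompletableFuture.completedFuture(Optional.ofNullable(store.get(path))));

        String path = "/admin/partitioned-topics/public/default/persistent/TP_52";

        // First lookup misses: the cache stores Optional.empty() for the path.
        System.out.println(cache.get(path).join());   // Optional.empty

        // Another "broker" now creates the node, but the cached entry is not invalidated.
        store.put(path, "{\"partitions\":3}".getBytes());

        // Until a node-changed notification (or an explicit refresh) evicts the entry,
        // readers keep seeing the stale empty Optional -- the core of the race above.
        System.out.println(cache.get(path).join());   // still Optional.empty
    }
}
```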

What could make step-5 execute later than step-7?
One scenario: the issue fixed by PR #20303, where ZK operations and ZK node-changed notifications run on different threads (the ZK client's main thread and the metadata store thread), so the notification that invalidates the cache can arrive after the re-read.

Therefore, the mechanism for looking up partitioned topic metadata is fragile and we need to make it more robust.


We hit the issue above on release 2.10.3; the relevant log is below:

2023-08-24T09:29:57,212+0000 [pulsar-2-1] DEBUG org.apache.pulsar.broker.admin.impl.PersistentTopicsBase - Total number of partitions for topic persistent://public/default/TP_DEFAULT is 3
2023-08-24T09:29:57,318+0000 [pulsar-2-1] DEBUG org.apache.pulsar.broker.admin.impl.PersistentTopicsBase - Total number of partitions for topic persistent://public/default/TP_2_DEFAULT is 3
2023-08-24T09:30:00,443+0000 [pulsar-2-2] DEBUG org.apache.pulsar.broker.admin.impl.PersistentTopicsBase - Total number of partitions for topic persistent://public/default/TP_51 is 3
2023-08-24T09:30:00,539+0000 [pulsar-2-2] DEBUG org.apache.pulsar.broker.admin.impl.PersistentTopicsBase - Total number of partitions for topic persistent://public/default/TP_51 is 3
2023-08-24T09:30:00,588+0000 [pulsar-2-1] DEBUG org.apache.pulsar.broker.admin.impl.PersistentTopicsBase - Total number of partitions for topic persistent://public/default/TP_51 is 3
2023-08-24T09:30:00,627+0000 [main-EventThread] DEBUG org.apache.pulsar.broker.admin.impl.PersistentTopicsBase - Total number of partitions for topic persistent://public/default/TP_52 is 0
2023-08-24T09:30:00,629+0000 [pulsar-io-6-1] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.6:55085][persistent://public/default/TP_52] Creating producer. producerId=401
2023-08-24T09:30:00,629+0000 [pulsar-io-6-1] INFO  org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - Opening managed ledger public/default/persistent/TP_52
2023-08-24T09:30:00,630+0000 [metadata-store-12-1] DEBUG org.apache.pulsar.broker.admin.impl.PersistentTopicsBase - Total number of partitions for topic persistent://public/default/TP_52 is 3
2023-08-24T09:30:00,630+0000 [pulsar-io-6-1] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.6:37957][persistent://public/default/TP_52] Creating producer. producerId=411
2023-08-24T09:30:00,635+0000 [bookkeeper-ml-scheduler-OrderedScheduler-0-0] INFO  org.apache.bookkeeper.mledger.impl.MetaStoreImpl - Creating '/managed-ledgers/public/default/persistent/TP_52'
2023-08-24T09:30:00,642+0000 [main-EventThread] INFO  org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/TP_52] Creating ledger, metadata: {component=[...0], application=[...]} - metadata ops timeout : 60 seconds
2023-08-24T09:30:00,652+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/TP_52] Created ledger 772115

When the topic public/default/persistent/TP_52 is created, the broker responds to the client with non-partitioned topic metadata: Total number of partitions for topic persistent://public/default/TP_52 is 0.


Modifications

Before reading the partitioned topic metadata again after a create conflict, refresh the cache first (see the sketch below).
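Conceptually, the conflict branch changes from a plain cached read to a refresh-and-read. A minimal sketch under that assumption; refreshAndGet is a hypothetical stand-in for the broker's BaseResources.refreshAndGetAsync, which syncs the metadata store and reloads the path before returning:

```java
// Hypothetical sketch of the fixed conflict branch: refresh the cached entry for the
// path before reading it, so the stale empty Optional can no longer be served.
CompletableFuture<PartitionedTopicMetadata> onCreateConflict(String path) {
    return metadataCache.refreshAndGet(path)
            .thenApply(opt -> opt.orElseThrow(
                    () -> new IllegalStateException("Partitioned metadata disappeared: " + path)));
}
```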

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: x

@poorbarcode poorbarcode self-assigned this Aug 24, 2023
@poorbarcode poorbarcode added this to the 3.2.0 milestone Aug 24, 2023
@github-actions github-actions bot added the doc-not-needed label Aug 24, 2023
@poorbarcode poorbarcode added the category/reliability label Aug 24, 2023
@mattisonchao
Member

mattisonchao commented Aug 25, 2023

It means we disabled the cache, right? Could you please write a unit test to demonstrate it?

btw, it looks like clearing the cache still cannot 100% ensure correctness, because ZK allows the client to read from a follower. Maybe we can consider the sync method of the metadata store?

@poorbarcode
Contributor Author

poorbarcode commented Aug 25, 2023

@mattisonchao

It means we disabled the cache, right?

No; the cache is only refreshed when the write conflicts, and that happens with very low probability.

[screenshot omitted: 2023-08-25 14:44:30]

Could you please write a unit test to demonstrate it?

Since a test that reproduces the exact race is too hard to write, I wrote a simple test, testFetchPartitionedTopicMetadataWithCacheRefresh, to show that the cache can return a stale value.

btw, it looks like clearing the cache still cannot 100% ensure correctness, because ZK allows the client to read from a follower. Maybe we can consider the sync method of the metadata store?

The internal method BaseResources.refreshAndGetAsync performs a metadata store sync first; see: https://github.com/apache/pulsar/blob/master/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java#L132-L137
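For background on why the sync matters: in ZooKeeper, issuing a sync on a path before a read forces the server the client is connected to to catch up with the leader, so the subsequent read reflects all previously committed writes. A minimal sketch with the raw ZooKeeper client (illustrative only, not the Pulsar code path; zk is an existing org.apache.zookeeper.ZooKeeper handle):

```java
// Illustrative raw-ZooKeeper usage: sync the path, then read it, so a follower cannot
// serve a view that predates the partitioned-metadata creation.
zk.sync(path, (rc, syncedPath, ctx) -> {
    if (rc == KeeperException.Code.OK.intValue()) {
        zk.getData(syncedPath, false, (rc2, p, ctx2, data, stat) -> {
            // 'data' now reflects every write committed before the sync completed.
        }, null);
    }
}, null);
```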

Contributor

@liangyepianzhou liangyepianzhou left a comment


Good catch!

@poorbarcode poorbarcode force-pushed the fix/partitioned_metadata_cache branch from dbd0e23 to f831ed0 Compare August 25, 2023 11:24
@poorbarcode poorbarcode merged commit d099ac4 into apache:master Aug 28, 2023
poorbarcode added a commit that referenced this pull request Aug 29, 2023
poorbarcode added a commit that referenced this pull request Aug 31, 2023
poorbarcode added a commit that referenced this pull request Aug 31, 2023
Demogorgon314 pushed a commit to streamnative/pulsar-archived that referenced this pull request Dec 14, 2023
mukesh-ctds pushed a commit to datastax/pulsar that referenced this pull request Apr 15, 2024
mukesh-ctds pushed a commit to datastax/pulsar that referenced this pull request Apr 19, 2024
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Apr 23, 2024
Labels
category/reliability, cherry-picked/branch-2.10, cherry-picked/branch-2.11, cherry-picked/branch-3.0, doc-not-needed, release/2.10.6, release/2.11.3, release/3.0.2