Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][pip] Change cursor`s properties to store chunk ID map. #21027

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 128 additions & 0 deletions pip/pip-295.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@

# Background knowledge

In [PIP 37](https://github.com/apache/pulsar/wiki/PIP-37:-Large-message-size-handling-in-Pulsar), Pulsar introduced chunk messages to handle the large message. It will separate a large message into some chunks when the producer sends the significant message to the broker. On the consumer side, a consumer will wait to receive all the chunks of a message and then assemble them into a single chunk message before returning it.
In [PIP 6](https://github.com/apache/pulsar/wiki/PIP-6:-Guaranteed-Message-Deduplication), Pulsar introduced deduplication to make sure the messages sent by the producer are non-repeating.
In PIP 6, each producer will have a sequence ID that starts at 0 and increase for each message. The message with a lower sequence ID will be dropped in the broker.

# Motivation

In the earliest design, all the chunks in a single chunk message have the same sequence ID which causes the chunk message can not work when enabling deduplication. For example, we have a chunk message consisting of chunk-1 and chunk-2. When Broker receives chunk-1, it will update the last sequence ID to the sequence ID of chunk-1. And then, when the broker gets chunk-2, the chunk-2 will be dropped by depublication.
I opened a [PR](https://github.com/apache/pulsar/pull/20948) to resolve this case. It allowed the chunks of a single chunk message to use the same sequence ID and filter duplicated chunks in a single-chunk message on the consumer side.
It can resolve message duplication end to end, but the message duplication still exists in the topic.

# Goals

## In Scope
Chunk messages can be effectively filtered on the broker side. Ensure that chunk messages work normally after enabling deduplication and the topic has no duplicate chunks.
Copy link
Contributor

@poorbarcode poorbarcode Aug 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Background:*: There are two properties in the metadata of the cursor

[1]: a structure of properties:

properties: 
  - "producer_name_1" : {{last_persist_sequence_1}}
  - "producer_name_2" : {{last_persist_sequence_2}}

In this PIP, you want to change properties<String, Long> to properties<String, String>, right? Could you also explain this change here?


## Out of Scope



# High Level Design
Introduce a mechanism similar to [PIP 37](https://github.com/apache/pulsar/wiki/PIP-37:-Large-message-size-handling-in-Pulsar) to check the chunk ID.
For normal messages, we still only check sequence ID, but we will check both sequence ID and chunk ID for chunk messages.

# Detailed Design

## Design & Implementation Details

Add `chunkIDPushed` and `chunkIDPersisted` to store the chunk of each producer`s ongoing chunk messages. It will be used to check whether the chunks in a single message are duplicated.

```
@VisibleForTesting
final ConcurrentOpenHashMap<String, Integer> chunkIDPushed =
ConcurrentOpenHashMap.<String, Integer>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
.build();

@VisibleForTesting
final ConcurrentOpenHashMap<String, Integer> chunkIDPersisted =
ConcurrentOpenHashMap.<String, Integer>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
.build();
```

Optimize the `properties` of the `MarkDeleteEntry` from `Map<String, Long>` to `Map<String, String>`. In the depublication design, the ' MarkDeleteEntry' properties are used as a snapshot to store the sequence ID map. After introducing the chunk ID map, it cannot hold two long for each producer. So we hope to change the `MarkDeleteEntry' properties from `Map<String, Long>` to `Map<String, String>` to make it more flexible.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a demo to describe the structure of the attribute properties of cursor metadata that you wanted after this PIP?




## Public-facing Changes
None

### Public API
None

### Binary protocol
Add `repeated StringProperty markDeleteProperties = 9;` to replace the original `repeated LongProperty properties = 5;`.

### Configuration

### CLI

### Metrics


# Monitoring


# Security Considerations


# Backward & Forward Compatibility

## Revert
When reverting to the old version of Pulsar, the `ManagedCursorInfo` will not contain the properties(`repeated LongProperty properties = 5;`). Because the new version of pulsar use markDeleteProperties (`repeated StringProperty markDeleteProperties = 9;`) to record mark delete properties.
So It can only be reverted if losing many last persistent sequence ID data.

## Upgrade

Add an upgrade logic in `recover(final VoidCallback callback)`.
The original logic:
```java
Map<String, Long> recoveredProperties = Collections.emptyMap();
if (info.getPropertiesCount() > 0) {
// Recover properties map
recoveredProperties = new HashMap<>();
for (int i = 0; i < info.getPropertiesCount(); i++) {
LongProperty property = info.getProperties(i);
recoveredProperties.put(property.getName(), property.getValue());
}
}

recoveredCursor(recoveredPosition, recoveredProperties, recoveredCursorProperties, null);
```
Change to:
```java
// Recover properties map
Map<String, String> recoveredProperties;
if (info.getPropertiesCount() == 0 && info.getmarkDeletePropertiesCount() == 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The info means ManagedCursorInfo, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's true.

recoveredProperties = Collections.emptyMap();
} else if (info.getPropertiesCount() > 0) {
recoveredProperties = new HashMap<>();
for (int i = 0; i < info.getPropertiesCount(); i++) {
LongProperty property = info.getProperties(i);
//At this time, the correction of deduplication for chunk messages is not promised.
recoveredProperties.put(property.getName(), String.valueOf(property.getValue()));
}
} else if (info.getmarkDeletePropertiesCount() > 0) {
recoveredProperties = new HashMap<>();
for (int i = 0; i < info.getmarkDeletePropertiesCount(); i++) {
StringProperty property = info.getmarkDeleteProperties(i);
recoveredProperties.put(property.getName(), property.getValue());
}
}
```


# Alternatives

# General Notes

# Links

* Mailing List discussion thread:
* Mailing List voting thread:
Loading