Skip to content

Commit

Permalink
PositionInfo Util serialization fix and test (apache#272)
Browse files Browse the repository at this point in the history
(cherry picked from commit f1323c6)
  • Loading branch information
dlg99 committed Sep 23, 2024
1 parent 0240250 commit ed8df4d
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.concurrent.FastThreadLocal;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
Expand Down Expand Up @@ -133,13 +132,6 @@
@SuppressWarnings("checkstyle:javadoctype")
public class ManagedCursorImpl implements ManagedCursor {

private static final FastThreadLocal<LightMLDataFormats.PositionInfo> piThreadLocal = new FastThreadLocal<>() {
@Override
protected LightMLDataFormats.PositionInfo initialValue() {
return new LightMLDataFormats.PositionInfo();
}
};

private static final Comparator<Entry> ENTRY_COMPARATOR = (e1, e2) -> {
if (e1.getLedgerId() != e2.getLedgerId()) {
return e1.getLedgerId() < e2.getLedgerId() ? -1 : 1;
Expand Down Expand Up @@ -493,7 +485,7 @@ public boolean removeProperty(String key) {
if (lastMarkDeleteEntry != null) {
LAST_MARK_DELETE_ENTRY_UPDATER.updateAndGet(this, last -> {
Map<String, Long> properties = last.properties;
if (properties != null && properties.containsKey(key)) {
if (properties != null) {
properties.remove(key);
}
return last;
Expand Down Expand Up @@ -2070,7 +2062,7 @@ public void asyncMarkDelete(final Position position, Map<String, Long> propertie
}
callback.markDeleteFailed(
new ManagedLedgerException("Reset cursor in progress - unable to mark delete position "
+ position.toString()),
+ position),
ctx);
return;
}
Expand Down Expand Up @@ -3302,13 +3294,6 @@ private void buildBatchEntryDeletionIndexInfoList(
}
}

private static ByteBuf toByteBuf(LightMLDataFormats.PositionInfo pi) {
int size = pi.getSerializedSize();
ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(size, size);
pi.writeTo(buf);
return buf;
}

void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, final VoidCallback callback) {
long now = System.nanoTime();
Position position = mdEntry.newPosition;
Expand Down Expand Up @@ -3547,7 +3532,7 @@ boolean shouldCloseLedger(LedgerHandle lh) {
long now = clock.millis();
if (ledger.getFactory().isMetadataServiceAvailable()
&& (lh.getLastAddConfirmed() >= getConfig().getMetadataMaxEntriesPerLedger()
|| lastLedgerSwitchTimestamp < (now - getConfig().getLedgerRolloverTimeout() * 1000))
|| lastLedgerSwitchTimestamp < (now - getConfig().getLedgerRolloverTimeout() * 1000L))
&& (STATE_UPDATER.get(this) != State.Closed && STATE_UPDATER.get(this) != State.Closing)) {
// It's safe to modify the timestamp since this method will be only called from a callback, implying that
// calls will be serialized on one single thread
Expand Down Expand Up @@ -3686,7 +3671,6 @@ private void asyncDeleteLedger(final LedgerHandle lh, int retry) {
ledger.getScheduledExecutor().schedule(() -> asyncDeleteLedger(lh, retry - 1),
DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC, TimeUnit.SECONDS);
}
return;
} else {
log.info("[{}][{}] Successfully closed & deleted ledger {} in cursor", ledger.getName(), name,
lh.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,9 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;

final class PositionInfoUtils {

Expand All @@ -47,18 +42,18 @@ static ByteBuf serializePositionInfo(ManagedCursorImpl.MarkDeleteEntry mdEntry,
int size = Math.max(lastSerializedSize, 64 * 1024);
ByteBuf _b = PulsarByteBufAllocator.DEFAULT.buffer(size);

int _writeIdx = _b.writerIndex();
LightProtoCodec.writeVarInt(_b, PositionInfo._LEDGER_ID_TAG);
LightProtoCodec.writeVarInt64(_b, position.getLedgerId());
LightProtoCodec.writeVarInt(_b, PositionInfo._ENTRY_ID_TAG);
LightProtoCodec.writeVarInt64(_b, position.getEntryId());

MessageRange _item = new MessageRange();
NestedPositionInfo lower = _item.setLowerEndpoint();
NestedPositionInfo upper = _item.setUpperEndpoint();
rangeScanner.accept(new IndividuallyDeletedMessagesRangeConsumer() {
@Override
public void acceptRange(long lowerLegerId, long lowerEntryId, long upperLedgerId, long upperEntryId) {
_item.clear();
NestedPositionInfo lower = _item.setLowerEndpoint();
NestedPositionInfo upper = _item.setUpperEndpoint();
lower.setLedgerId(lowerLegerId);
lower.setEntryId(lowerEntryId);
upper.setLedgerId(upperLedgerId);
Expand All @@ -82,15 +77,14 @@ public void acceptRange(long lowerLegerId, long lowerEntryId, long upperLedgerId
}

final BatchedEntryDeletionIndexInfo batchDeletedIndexInfo = new BatchedEntryDeletionIndexInfo();
final NestedPositionInfo nestedPositionInfo = batchDeletedIndexInfo.setPosition();

batchDeletedIndexesScanner.accept(new BatchedEntryDeletionIndexInfoConsumer() {
@Override
public void acceptRange(long ledgerId, long entryId, long[] array) {
batchDeletedIndexInfo.clear();
final NestedPositionInfo nestedPositionInfo = batchDeletedIndexInfo.setPosition();
nestedPositionInfo.setLedgerId(ledgerId);
nestedPositionInfo.setEntryId(entryId);
List<Long> deleteSet = new ArrayList<>(array.length);
batchDeletedIndexInfo.clearDeleteSet();
for (long l : array) {
batchDeletedIndexInfo.addDeleteSet(l);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,23 @@
package org.apache.bookkeeper.mledger.impl;

import static org.testng.Assert.*;

import com.google.protobuf.InvalidProtocolBufferException;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import java.util.Map;
import java.util.List;

import org.apache.bookkeeper.mledger.proto.LightMLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.Test;

public class PositionInfoUtilsTest {
private static final Logger log = LoggerFactory.getLogger(PositionInfoUtilsTest.class);

@Test
public void testSerializeDeserialize() throws Exception {
PositionImpl position = new PositionImpl(1, 2);
Expand Down Expand Up @@ -88,4 +97,118 @@ public void testSerializeDeserializeEmpty() throws Exception {
assertEquals(0, positionInfoParsed.getBatchedEntryDeletionIndexInfoCount());
result.release();
}

@Test
public void testSerializeDeserialize2() throws Exception {
PositionImpl position = new PositionImpl(1, 2);
ManagedCursorImpl.MarkDeleteEntry entry = new ManagedCursorImpl.MarkDeleteEntry(position,
Map.of("foo", 1L), null, null);

final int numRanges = 10000;
ByteBuf result = PositionInfoUtils.serializePositionInfo(entry, position, (scanner) -> {
for (int i = 0; i < numRanges; i++) {
scanner.acceptRange(i*4 + 1, i*4 + 2, i*4 + 3, i*4 + 4);
}
}, (scanner) -> {
long[] array = {7L, 8L};
for (int i = 0; i < numRanges; i++) {
scanner.acceptRange(i*2 + 1, i*2 + 2, array);
}
}, 1024);

// deserialize PIUtils -> lightproto
final int idx = result.readerIndex();
LightMLDataFormats.PositionInfo lighPositionInfoParsed = new LightMLDataFormats.PositionInfo();
lighPositionInfoParsed.parseFrom(result, result.readableBytes());
result.readerIndex(idx);

validateLightproto(lighPositionInfoParsed, numRanges);

// serialize lightproto
int serializedSz = lighPositionInfoParsed.getSerializedSize();
ByteBuf lightResult = PulsarByteBufAllocator.DEFAULT.buffer(serializedSz);
lighPositionInfoParsed.writeTo(lightResult);

byte[] light = ByteBufUtil.getBytes(lightResult);
byte[] util = ByteBufUtil.getBytes(result);

assertEquals(light.length, util.length);

for (int i = 0; i < light.length; i++) {
if (light[i] != util[i]) {
log.error("Mismatch at index {} light={} util={}", i, light[i], util[i]);
}
}

assertEquals(light, util);

// deserialize lightproto -> protobuf
parseProtobufAndValidate(light, numRanges);

// deserialize PIUtils -> protobuf
parseProtobufAndValidate(util, numRanges);

result.release();
lightResult.release();
}

private static void validateLightproto(LightMLDataFormats.PositionInfo lighPositionInfoParsed, int numRanges) {
assertEquals(1, lighPositionInfoParsed.getLedgerId());
assertEquals(2, lighPositionInfoParsed.getEntryId());

assertEquals(1, lighPositionInfoParsed.getPropertiesCount());
assertEquals("foo", lighPositionInfoParsed.getPropertyAt(0).getName());
assertEquals(1, lighPositionInfoParsed.getPropertyAt(0).getValue());

assertEquals(numRanges, lighPositionInfoParsed.getIndividualDeletedMessagesCount());
int curr = 0;
for (int i = 0; i < numRanges; i++) {
assertEquals(i*4 + 1, lighPositionInfoParsed.getIndividualDeletedMessageAt(curr).getLowerEndpoint().getLedgerId());
assertEquals(i*4 + 2, lighPositionInfoParsed.getIndividualDeletedMessageAt(curr).getLowerEndpoint().getEntryId());
assertEquals(i*4 + 3, lighPositionInfoParsed.getIndividualDeletedMessageAt(curr).getUpperEndpoint().getLedgerId());
assertEquals(i*4 + 4, lighPositionInfoParsed.getIndividualDeletedMessageAt(curr).getUpperEndpoint().getEntryId());
curr++;
}

assertEquals(numRanges, lighPositionInfoParsed.getBatchedEntryDeletionIndexInfosCount());
curr = 0;
for (int i = 0; i < numRanges; i++) {
assertEquals(i*2 + 1, lighPositionInfoParsed.getBatchedEntryDeletionIndexInfoAt(curr).getPosition().getLedgerId());
assertEquals(i*2 + 2, lighPositionInfoParsed.getBatchedEntryDeletionIndexInfoAt(curr).getPosition().getEntryId());
assertEquals(7L, lighPositionInfoParsed.getBatchedEntryDeletionIndexInfoAt(curr).getDeleteSetAt(0));
assertEquals(8L, lighPositionInfoParsed.getBatchedEntryDeletionIndexInfoAt(curr).getDeleteSetAt(1));
curr++;
}
}

private static void parseProtobufAndValidate(byte[] data, int numRanges) throws InvalidProtocolBufferException {
MLDataFormats.PositionInfo positionInfoParsed = MLDataFormats.PositionInfo.parseFrom(data);

assertEquals(1, positionInfoParsed.getLedgerId());
assertEquals(2, positionInfoParsed.getEntryId());

assertEquals(1, positionInfoParsed.getPropertiesCount());
assertEquals("foo", positionInfoParsed.getProperties(0).getName());
assertEquals(1, positionInfoParsed.getProperties(0).getValue());

assertEquals(numRanges, positionInfoParsed.getIndividualDeletedMessagesCount());
int curr = 0;
for (int i = 0; i < numRanges; i++) {
assertEquals(i*4 + 1, positionInfoParsed.getIndividualDeletedMessages(curr).getLowerEndpoint().getLedgerId());
assertEquals(i*4 + 2, positionInfoParsed.getIndividualDeletedMessages(curr).getLowerEndpoint().getEntryId());
assertEquals(i*4 + 3, positionInfoParsed.getIndividualDeletedMessages(curr).getUpperEndpoint().getLedgerId());
assertEquals(i*4 + 4, positionInfoParsed.getIndividualDeletedMessages(curr).getUpperEndpoint().getEntryId());
curr++;
}

assertEquals(numRanges, positionInfoParsed.getBatchedEntryDeletionIndexInfoCount());
curr = 0;
for (int i = 0; i < numRanges; i++) {
assertEquals(i*2 + 1, positionInfoParsed.getBatchedEntryDeletionIndexInfo(curr).getPosition().getLedgerId());
assertEquals(i*2 + 2, positionInfoParsed.getBatchedEntryDeletionIndexInfo(curr).getPosition().getEntryId());
assertEquals(List.of(7L, 8L), positionInfoParsed.getBatchedEntryDeletionIndexInfo(curr).getDeleteSetList());
curr++;
}
}

}

0 comments on commit ed8df4d

Please sign in to comment.