Skip to content

Commit

Permalink
[managed-ledger] Compressed cursors: fix problem with little buffers (a…
Browse files Browse the repository at this point in the history
…pache#275)

(cherry picked from commit 6a2a010)
  • Loading branch information
eolivelli authored and dlg99 committed May 28, 2024
1 parent 9b6144b commit 5b70064
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.google.common.collect.Range;
import com.google.common.util.concurrent.RateLimiter;
import com.google.protobuf.InvalidProtocolBufferException;
import io.airlift.compress.MalformedInputException;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.CompositeByteBuf;
Expand Down Expand Up @@ -3435,25 +3436,33 @@ private ByteBuf compressDataIfNeeded(ByteBuf data, LedgerHandle lh) {
}
}

private static byte[] decompressDataIfNeeded(byte[] data, LedgerHandle lh) {
static byte[] decompressDataIfNeeded(byte[] data, LedgerHandle lh) {
byte[] pulsarCursorInfoCompression =
lh.getCustomMetadata().get(METADATA_PROPERTY_CURSOR_COMPRESSION_TYPE);
if (pulsarCursorInfoCompression != null) {
String pulsarCursorInfoCompressionString = new String(pulsarCursorInfoCompression);
CompressionCodec compressionCodec = CompressionCodecProvider.getCompressionCodec(
CompressionType.valueOf(pulsarCursorInfoCompressionString));
if (log.isDebugEnabled()) {
log.debug("Ledger {} compression {} decompressing {} bytes, full {}",
lh.getId(), pulsarCursorInfoCompressionString, data.length,
ByteBufUtil.prettyHexDump(Unpooled.wrappedBuffer(data)));
}
ByteArrayInputStream input = new ByteArrayInputStream(data);
DataInputStream dataInputStream = new DataInputStream(input);
try {
int uncompressedSize = dataInputStream.readInt();
byte[] compressedData = dataInputStream.readNBytes(uncompressedSize);
byte[] compressedData = dataInputStream.readAllBytes();
CompressionCodec compressionCodec = CompressionCodecProvider.getCompressionCodec(
CompressionType.valueOf(pulsarCursorInfoCompressionString));
ByteBuf decode = compressionCodec.decode(Unpooled.wrappedBuffer(compressedData), uncompressedSize);
try {
return ByteBufUtil.getBytes(decode);
} finally {
decode.release();
}
} catch (IOException error) {
} catch (IOException | MalformedInputException error) {
log.error("Cannot decompress cursor position using {}. Payload is {}",
pulsarCursorInfoCompressionString,
ByteBufUtil.prettyHexDump(Unpooled.wrappedBuffer(data)), error);
throw new RuntimeException(error);
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ public static Object[][] useOpenRangeSet() {
return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
}

@Override
protected void setupManagedLedgerFactoryConfig(ManagedLedgerFactoryConfig config) {
super.setupManagedLedgerFactoryConfig(config);
config.setManagedCursorInfoCompressionType("LZ4");
}

@Test
public void testCloseCursor() throws Exception {
Expand Down Expand Up @@ -3268,7 +3273,9 @@ public void operationFailed(MetaStoreException e) {
try {
LedgerEntry entry = seq.nextElement();
PositionInfo positionInfo;
positionInfo = PositionInfo.parseFrom(entry.getEntry());
byte[] data = entry.getEntry();
data = ManagedCursorImpl.decompressDataIfNeeded(data, lh);
positionInfo = PositionInfo.parseFrom(data);
individualDeletedMessagesCount.set(positionInfo.getIndividualDeletedMessagesCount());
} catch (Exception e) {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ public MockedBookKeeperTestCase(int numBookies) {
this.numBookies = numBookies;
}

protected void setupManagedLedgerFactoryConfig(ManagedLedgerFactoryConfig config) {
// No-op
}

@BeforeMethod(alwaysRun = true)
public final void setUp(Method method) throws Exception {
LOG.info(">>>>>> starting {}", method);
Expand All @@ -85,7 +89,8 @@ public final void setUp(Method method) throws Exception {
ManagedLedgerFactoryConfig managedLedgerFactoryConfig = new ManagedLedgerFactoryConfig();
// increase default cache eviction interval so that caching could be tested with less flakyness
managedLedgerFactoryConfig.setCacheEvictionIntervalMs(200);
factory = new ManagedLedgerFactoryImpl(metadataStore, bkc);
setupManagedLedgerFactoryConfig(managedLedgerFactoryConfig);
factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, managedLedgerFactoryConfig);

setUpTestCase();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public void asyncCreateLedger(int ensSize, int writeQuorumSize, int ackQuorumSiz
long id = sequence.getAndIncrement();
log.info("Creating ledger {}", id);
PulsarMockLedgerHandle lh =
new PulsarMockLedgerHandle(PulsarMockBookKeeper.this, id, digestType, passwd);
new PulsarMockLedgerHandle(PulsarMockBookKeeper.this, id, digestType, passwd, properties);
ledgers.put(id, lh);
return FutureUtils.value(lh);
} catch (Throwable t) {
Expand All @@ -147,7 +147,7 @@ public LedgerHandle createLedger(int ensSize, int writeQuorumSize, int ackQuorum
try {
long id = sequence.getAndIncrement();
log.info("Creating ledger {}", id);
PulsarMockLedgerHandle lh = new PulsarMockLedgerHandle(this, id, digestType, passwd);
PulsarMockLedgerHandle lh = new PulsarMockLedgerHandle(this, id, digestType, passwd, null);
ledgers.put(id, lh);
return lh;
} catch (Throwable t) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import lombok.Getter;
Expand Down Expand Up @@ -65,8 +67,10 @@ public class PulsarMockLedgerHandle extends LedgerHandle {
boolean fenced = false;

public PulsarMockLedgerHandle(PulsarMockBookKeeper bk, long id,
DigestType digest, byte[] passwd) throws GeneralSecurityException {
super(bk.getClientCtx(), id, new Versioned<>(createMetadata(id, digest, passwd), new LongVersion(0L)),
DigestType digest, byte[] passwd,
Map<String, byte[]> properties) throws GeneralSecurityException {
super(bk.getClientCtx(), id, new Versioned<>(createMetadata(id, digest, passwd, properties),
new LongVersion(0L)),
digest, passwd, WriteFlag.NONE);
this.bk = bk;
this.id = id;
Expand Down Expand Up @@ -267,13 +271,15 @@ public CompletableFuture<LastConfirmedAndEntry> readLastAddConfirmedAndEntryAsyn
return readHandle.readLastAddConfirmedAndEntryAsync(entryId, timeOutInMillis, parallel);
}

private static LedgerMetadata createMetadata(long id, DigestType digest, byte[] passwd) {
private static LedgerMetadata createMetadata(long id, DigestType digest, byte[] passwd,
Map<String, byte[]> properties) {
List<BookieId> ensemble = new ArrayList<>(PulsarMockBookKeeper.getMockEnsemble());
return LedgerMetadataBuilder.create()
.withDigestType(digest.toApiDigestType())
.withPassword(passwd)
.withId(id)
.newEnsembleEntry(0L, ensemble)
.withCustomMetadata(properties != null ? properties : Collections.emptyMap())
.build();
}

Expand Down

0 comments on commit 5b70064

Please sign in to comment.