Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PIP-75: Perform serialization/deserialization with LightProto #9046

Merged
merged 20 commits into from
Jan 6, 2021
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.TerminateCallback;
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;

/**
* A ManagedLedger it's a superset of a BookKeeper ledger concept.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,7 @@ public ManagedLedgerConfig setAddEntryTimeoutSeconds(long addEntryTimeoutSeconds
/**
* Managed-ledger can setup different custom EnsemblePlacementPolicy (eg: affinity to write ledgers to only setup of
* group of bookies).
*
*
* @return
*/
public Class<? extends EnsemblePlacementPolicy> getBookKeeperEnsemblePlacementPolicyClassName() {
Expand All @@ -595,7 +595,7 @@ public Class<? extends EnsemblePlacementPolicy> getBookKeeperEnsemblePlacementPo

/**
* Returns EnsemblePlacementPolicy configured for the Managed-ledger.
*
*
* @param bookKeeperEnsemblePlacementPolicyClassName
*/
public void setBookKeeperEnsemblePlacementPolicyClassName(
Expand All @@ -605,7 +605,7 @@ public void setBookKeeperEnsemblePlacementPolicyClassName(

/**
* Returns properties required by configured bookKeeperEnsemblePlacementPolicy.
*
*
* @return
*/
public Map<String, Object> getBookKeeperEnsemblePlacementPolicyProperties() {
Expand All @@ -615,7 +615,7 @@ public Map<String, Object> getBookKeeperEnsemblePlacementPolicyProperties() {
/**
* Managed-ledger can setup different custom EnsemblePlacementPolicy which needs
* bookKeeperEnsemblePlacementPolicy-Properties.
*
*
* @param bookKeeperEnsemblePlacementPolicyProperties
*/
public void setBookKeeperEnsemblePlacementPolicyProperties(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@
import org.apache.bookkeeper.mledger.util.CallbackMutex;
import org.apache.bookkeeper.mledger.util.Futures;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.metadata.api.Stat;
import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@
import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NonDurableCursorImpl extends ManagedCursorImpl {

NonDurableCursorImpl(BookKeeper bookkeeper, ManagedLedgerConfig config, ManagedLedgerImpl ledger, String cursorName,
PositionImpl startCursorPosition, PulsarApi.CommandSubscribe.InitialPosition initialPosition) {
PositionImpl startCursorPosition, CommandSubscribe.InitialPosition initialPosition) {
super(bookkeeper, config, ledger, cursorName);

// Compare with "latest" position marker by using only the ledger id. Since the C++ client is using 48bits to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.common.api.proto.PulsarApi.IntRange;
import org.testng.annotations.Test;

public class ManagedCursorContainerTest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.testng.annotations.Test;

public class ManagedCursorPropertiesTest extends MockedBookKeeperTestCase {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@
import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.common.api.proto.PulsarApi.IntRange;
import org.apache.pulsar.common.api.proto.IntRange;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.MockZooKeeper;
import org.mockito.invocation.InvocationOnMock;
Expand Down Expand Up @@ -3070,44 +3070,44 @@ public void testBatchIndexDelete() throws ManagedLedgerException, InterruptedExc
positions[i] = ledger.addEntry(("entry-" + i).getBytes(Encoding));
}
assertEquals(cursor.getNumberOfEntries(), totalEntries);
deleteBatchIndex(cursor, positions[0], 10, Lists.newArrayList(IntRange.newBuilder().setStart(2).setEnd(4).build()));
deleteBatchIndex(cursor, positions[0], 10, Lists.newArrayList(new IntRange().setStart(2).setEnd(4)));
List<IntRange> deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[0]), 10);
Assert.assertEquals(1, deletedIndexes.size());
Assert.assertEquals(2, deletedIndexes.get(0).getStart());
Assert.assertEquals(4, deletedIndexes.get(0).getEnd());

deleteBatchIndex(cursor, positions[0], 10, Lists.newArrayList(IntRange.newBuilder().setStart(3).setEnd(8).build()));
deleteBatchIndex(cursor, positions[0], 10, Lists.newArrayList(new IntRange().setStart(3).setEnd(8)));
deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[0]), 10);
Assert.assertEquals(1, deletedIndexes.size());
Assert.assertEquals(2, deletedIndexes.get(0).getStart());
Assert.assertEquals(8, deletedIndexes.get(0).getEnd());

deleteBatchIndex(cursor, positions[0], 10, Lists.newArrayList(IntRange.newBuilder().setStart(0).setEnd(0).build()));
deleteBatchIndex(cursor, positions[0], 10, Lists.newArrayList(new IntRange().setStart(0).setEnd(0)));
deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[0]), 10);
Assert.assertEquals(2, deletedIndexes.size());
Assert.assertEquals(0, deletedIndexes.get(0).getStart());
Assert.assertEquals(0, deletedIndexes.get(0).getEnd());
Assert.assertEquals(2, deletedIndexes.get(1).getStart());
Assert.assertEquals(8, deletedIndexes.get(1).getEnd());

deleteBatchIndex(cursor, positions[0], 10, Lists.newArrayList(IntRange.newBuilder().setStart(1).setEnd(1).build()));
deleteBatchIndex(cursor, positions[0], 10, Lists.newArrayList(IntRange.newBuilder().setStart(9).setEnd(9).build()));
deleteBatchIndex(cursor, positions[0], 10, Lists.newArrayList(new IntRange().setStart(1).setEnd(1)));
deleteBatchIndex(cursor, positions[0], 10, Lists.newArrayList(new IntRange().setStart(9).setEnd(9)));
deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[0]), 10);
Assert.assertNull(deletedIndexes);
Assert.assertEquals(positions[0], cursor.getMarkDeletedPosition());

deleteBatchIndex(cursor, positions[1], 10, Lists.newArrayList(IntRange.newBuilder().setStart(0).setEnd(5).build()));
deleteBatchIndex(cursor, positions[1], 10, Lists.newArrayList(new IntRange().setStart(0).setEnd(5)));
cursor.delete(positions[1]);
deleteBatchIndex(cursor, positions[1], 10, Lists.newArrayList(IntRange.newBuilder().setStart(6).setEnd(8).build()));
deleteBatchIndex(cursor, positions[1], 10, Lists.newArrayList(new IntRange().setStart(6).setEnd(8)));
deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[1]), 10);
Assert.assertNull(deletedIndexes);

deleteBatchIndex(cursor, positions[2], 10, Lists.newArrayList(IntRange.newBuilder().setStart(0).setEnd(5).build()));
deleteBatchIndex(cursor, positions[2], 10, Lists.newArrayList(new IntRange().setStart(0).setEnd(5)));
cursor.markDelete(positions[3]);
deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[2]), 10);
Assert.assertNull(deletedIndexes);

deleteBatchIndex(cursor, positions[3], 10, Lists.newArrayList(IntRange.newBuilder().setStart(0).setEnd(5).build()));
deleteBatchIndex(cursor, positions[3], 10, Lists.newArrayList(new IntRange().setStart(0).setEnd(5)));
cursor.resetCursor(positions[0]);
deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[3]), 10);
Assert.assertNull(deletedIndexes);
Expand All @@ -3125,12 +3125,12 @@ public void testBatchIndexesDeletionPersistAndRecover() throws ManagedLedgerExce
positions[i] = ledger.addEntry(("entry-" + i).getBytes(Encoding));
}
assertEquals(cursor.getNumberOfEntries(), totalEntries);
deleteBatchIndex(cursor, positions[5], 10, Lists.newArrayList(IntRange.newBuilder().setStart(3).setEnd(6).build()));
deleteBatchIndex(cursor, positions[0], 10, Lists.newArrayList(IntRange.newBuilder().setStart(0).setEnd(9).build()));
deleteBatchIndex(cursor, positions[1], 10, Lists.newArrayList(IntRange.newBuilder().setStart(0).setEnd(9).build()));
deleteBatchIndex(cursor, positions[2], 10, Lists.newArrayList(IntRange.newBuilder().setStart(0).setEnd(9).build()));
deleteBatchIndex(cursor, positions[3], 10, Lists.newArrayList(IntRange.newBuilder().setStart(0).setEnd(9).build()));
deleteBatchIndex(cursor, positions[4], 10, Lists.newArrayList(IntRange.newBuilder().setStart(0).setEnd(9).build()));
deleteBatchIndex(cursor, positions[5], 10, Lists.newArrayList(new IntRange().setStart(3).setEnd(6)));
deleteBatchIndex(cursor, positions[0], 10, Lists.newArrayList(new IntRange().setStart(0).setEnd(9)));
deleteBatchIndex(cursor, positions[1], 10, Lists.newArrayList(new IntRange().setStart(0).setEnd(9)));
deleteBatchIndex(cursor, positions[2], 10, Lists.newArrayList(new IntRange().setStart(0).setEnd(9)));
deleteBatchIndex(cursor, positions[3], 10, Lists.newArrayList(new IntRange().setStart(0).setEnd(9)));
deleteBatchIndex(cursor, positions[4], 10, Lists.newArrayList(new IntRange().setStart(0).setEnd(9)));

ledger = factory.open("test_batch_indexes_deletion_persistent");
cursor = ledger.openCursor("c1");
Expand All @@ -3139,7 +3139,7 @@ public void testBatchIndexesDeletionPersistAndRecover() throws ManagedLedgerExce
Assert.assertEquals(deletedIndexes.get(0).getStart(), 3);
Assert.assertEquals(deletedIndexes.get(0).getEnd(), 6);
Assert.assertEquals(cursor.getMarkDeletedPosition(), positions[4]);
deleteBatchIndex(cursor, positions[5], 10, Lists.newArrayList(IntRange.newBuilder().setStart(0).setEnd(9).build()));
deleteBatchIndex(cursor, positions[5], 10, Lists.newArrayList(new IntRange().setStart(0).setEnd(9)));
deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[5]), 10);
Assert.assertNull(deletedIndexes);
Assert.assertEquals(cursor.getMarkDeletedPosition(), positions[5]);
Expand Down Expand Up @@ -3179,13 +3179,12 @@ private List<IntRange> getAckedIndexRange(long[] bitSetLongArray, int batchSize)
List<IntRange> result = new ArrayList<>();
BitSet bitSet = BitSet.valueOf(bitSetLongArray);
int nextClearBit = bitSet.nextClearBit(0);
IntRange.Builder builder = IntRange.newBuilder();
while (nextClearBit != -1 && nextClearBit <= batchSize) {
int nextSetBit = bitSet.nextSetBit(nextClearBit);
if (nextSetBit == -1) {
break;
}
result.add(builder.setStart(nextClearBit).setEnd(nextSetBit - 1).build());
result.add(new IntRange().setStart(nextClearBit).setEnd(nextSetBit - 1));
nextClearBit = bitSet.nextClearBit(nextSetBit);
}
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.metadata.impl.zookeeper.ZKMetadataStore;
import org.apache.zookeeper.CreateMode;
Expand Down
15 changes: 1 addition & 14 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,6 @@ flexible messaging model and an intuitive client API.</description>
<testRetryCount>1</testRetryCount>
<docker.organization>apachepulsar</docker.organization>

<!-- pin the protobuf-shaded version to make the pulsar build friendly to intellij -->
<pulsar.protobuf.shaded.version>2.1.0-incubating</pulsar.protobuf.shaded.version>

<!-- apache commons -->
<commons-compress.version>1.19</commons-compress.version>

Expand All @@ -121,7 +118,6 @@ flexible messaging model and an intuitive client API.</description>
<puppycrawl.checkstyle.version>8.37</puppycrawl.checkstyle.version>
<dockerfile-maven.version>1.4.13</dockerfile-maven.version>
<typetools.version>0.5.0</typetools.version>
<protobuf2.version>2.4.1</protobuf2.version>
<protobuf3.version>3.11.4</protobuf3.version>
<protoc3.version>${protobuf3.version}</protoc3.version>
<grpc.version>1.18.0</grpc.version>
Expand Down Expand Up @@ -226,7 +222,7 @@ flexible messaging model and an intuitive client API.</description>
<errorprone.version>2.4.0</errorprone.version>
<errorprone.javac.version>9+181-r4173-1</errorprone.javac.version>
<errorprone-slf4j.version>0.1.4</errorprone-slf4j.version>

<lightproto-maven-plugin.version>0.2</lightproto-maven-plugin.version>

<!-- Used to configure rename.netty.native. Libs -->
<rename.netty.native.libs>rename-netty-native-libs.sh</rename.netty.native.libs>
Expand Down Expand Up @@ -1246,10 +1242,6 @@ flexible messaging model and an intuitive client API.</description>
<!-- These files are generated automatically by the Protobuf compiler
and are included in source tree for convenience -->
<exclude>src/main/java/org/apache/bookkeeper/mledger/proto/MLDataFormats.java</exclude>
<exclude>src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java</exclude>
<exclude>src/main/java/org/apache/pulsar/transaction/coordinator/proto/PulsarTransactionMetadata.java</exclude>
<exclude>src/test/java/org/apache/pulsar/common/api/proto/TestApi.java</exclude>
<exclude>src/main/java/org/apache/pulsar/common/api/proto/PulsarMarkers.java</exclude>
<exclude>src/main/java/org/apache/pulsar/broker/service/schema/proto/SchemaRegistryFormat.java</exclude>
<exclude>bin/proto/MLDataFormats_pb2.py</exclude>

Expand All @@ -1267,10 +1259,6 @@ flexible messaging model and an intuitive client API.</description>
<exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/KeyValue.java</exclude>
<exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/Message.java</exclude>

<!-- These files are BSD licensed files -->
<exclude>src/main/java/org/apache/pulsar/common/util/protobuf/ByteBufCodedInputStream.java</exclude>
<exclude>src/main/java/org/apache/pulsar/common/util/protobuf/ByteBufCodedOutputStream.java</exclude>

<!-- Imported from Netty - Apache License v2 -->
<exclude>src/main/java/org/apache/bookkeeper/mledger/util/AbstractCASReferenceCounted.java</exclude>

Expand Down Expand Up @@ -1871,4 +1859,3 @@ flexible messaging model and an intuitive client API.</description>
</repositories>

</project>

76 changes: 0 additions & 76 deletions protobuf-shaded/pom.xml

This file was deleted.

Loading