Skip to content

Commit

Permalink
Switch ManagedLedger to use MetadataStore interface
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat committed Jan 4, 2020
1 parent f95c71f commit a0bc3d6
Show file tree
Hide file tree
Showing 15 changed files with 342 additions and 517 deletions.
6 changes: 6 additions & 0 deletions managed-ledger/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@
<artifactId>pulsar-common</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-metadata</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,33 @@ public static ManagedLedgerException getManagedLedgerException(Throwable e) {
}

public static class MetaStoreException extends ManagedLedgerException {
public MetaStoreException(Exception e) {
super(e);
public MetaStoreException(Throwable t) {
super(t);
}

public MetaStoreException(String msg) {
super(msg);
}
}

public static class BadVersionException extends MetaStoreException {
public BadVersionException(Exception e) {
super(e);
}

public BadVersionException(String msg) {
super(msg);
}
}

public static class MetadataNotFoundException extends MetaStoreException {
public MetadataNotFoundException(Exception e) {
super(e);
}

public MetadataNotFoundException(String msg) {
super(msg);
}
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
@SuppressWarnings("checkstyle:javadoctype")
public class ManagedLedgerInfo {
/** Z-Node version. */
public int version;
public long version;
public String creationDate;
public String modificationDate;

Expand All @@ -42,7 +42,7 @@ public static class LedgerInfo {

public static class CursorInfo {
/** Z-Node version. */
public int version;
public long version;
public String creationDate;
public String modificationDate;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound;
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
import org.apache.bookkeeper.mledger.impl.MetaStore.Stat;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.LongProperty;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
Expand All @@ -93,6 +92,7 @@
import org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet;
import org.apache.pulsar.common.util.collections.LongPairRangeSet;
import org.apache.pulsar.common.util.collections.LongPairRangeSet.LongPairConsumer;
import org.apache.pulsar.metadata.api.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,15 @@
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.ManagedLedgerInitializeLedgerCallback;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.State;
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
import org.apache.bookkeeper.mledger.impl.MetaStore.Stat;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.LongProperty;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.MessageRange;
import org.apache.bookkeeper.mledger.util.Futures;
import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.zookeeper.KeeperException;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.metadata.impl.zookeeper.ZKMetadataStore;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -160,7 +160,7 @@ private ManagedLedgerFactoryImpl(BookkeeperFactoryForCustomEnsemblePlacementPoli
this.bookkeeperFactory = bookKeeperGroupFactory;
this.isBookkeeperManaged = isBookkeeperManaged;
this.zookeeper = isBookkeeperManaged ? zooKeeper : null;
this.store = new MetaStoreImplZookeeper(zooKeeper, orderedExecutor);
this.store = new MetaStoreImpl(new ZKMetadataStore(zooKeeper), orderedExecutor);
this.config = config;
this.mbean = new ManagedLedgerFactoryMBeanImpl(this);
this.entryCacheManager = new EntryCacheManager(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback;
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
import org.apache.bookkeeper.mledger.impl.MetaStore.Stat;
import org.apache.bookkeeper.mledger.offload.OffloadUtils;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
Expand All @@ -117,6 +116,7 @@
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.metadata.api.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.metadata.api.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -146,7 +147,7 @@ private void readLedgerMeta(final ManagedLedgerFactoryImpl factory, final TopicN
store.getManagedLedgerInfo(managedLedgerName, false /* createIfMissing */,
new MetaStore.MetaStoreCallback<MLDataFormats.ManagedLedgerInfo>() {
@Override
public void operationComplete(MLDataFormats.ManagedLedgerInfo mlInfo, MetaStore.Stat version) {
public void operationComplete(MLDataFormats.ManagedLedgerInfo mlInfo, Stat stat) {
for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ls : mlInfo.getLedgerInfoList()) {
ledgers.put(ls.getLedgerId(), ls);
}
Expand Down Expand Up @@ -230,7 +231,7 @@ private void calculateCursorBacklogs(final ManagedLedgerFactoryImpl factory, fin

store.getCursors(managedLedgerName, new MetaStore.MetaStoreCallback<List<String>>() {
@Override
public void operationComplete(List<String> cursors, MetaStore.Stat v) {
public void operationComplete(List<String> cursors, Stat v) {
// Load existing cursors
if (log.isDebugEnabled()) {
log.debug("[{}] Found {} cursors", managedLedgerName, cursors.size());
Expand Down Expand Up @@ -336,7 +337,7 @@ public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq,
new MetaStore.MetaStoreCallback<MLDataFormats.ManagedCursorInfo>() {
@Override
public void operationComplete(MLDataFormats.ManagedCursorInfo info,
MetaStore.Stat version) {
Stat stat) {
long cursorLedgerId = info.getCursorsLedgerId();
if (log.isDebugEnabled()) {
log.debug("[{}] Cursor {} meta-data read ledger id {}", managedLedgerName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,13 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
import org.apache.pulsar.metadata.api.Stat;

/**
* Interface that describes the operations that the ManagedLedger need to do on the metadata store.
*/
public interface MetaStore {

@SuppressWarnings("checkstyle:javadoctype")
interface Stat {
int getVersion();
long getCreationTimestamp();
long getModificationTimestamp();
}

@SuppressWarnings("checkstyle:javadoctype")
interface UpdateLedgersIdsCallback {
void updateLedgersIdsComplete(MetaStoreException status, Stat stat);
Expand Down Expand Up @@ -64,12 +58,12 @@ interface MetaStoreCallback<T> {
* the name of the ManagedLedger
* @param mlInfo
* managed ledger info
* @param version
* @param stat
* version object associated with current state
* @param callback
* callback object
*/
void asyncUpdateLedgerIds(String ledgerName, ManagedLedgerInfo mlInfo, Stat version,
void asyncUpdateLedgerIds(String ledgerName, ManagedLedgerInfo mlInfo, Stat stat,
MetaStoreCallback<Void> callback);

/**
Expand Down Expand Up @@ -98,12 +92,12 @@ void asyncUpdateLedgerIds(String ledgerName, ManagedLedgerInfo mlInfo, Stat vers
* the name of the ManagedLedger
* @param cursorName
* @param info
* @param version
* @param stat
* @param callback
* the callback
* @throws MetaStoreException
*/
void asyncUpdateCursorInfo(String ledgerName, String cursorName, ManagedCursorInfo info, Stat version,
void asyncUpdateCursorInfo(String ledgerName, String cursorName, ManagedCursorInfo info, Stat stat,
MetaStoreCallback<Void> callback);

/**
Expand Down
Loading

0 comments on commit a0bc3d6

Please sign in to comment.