diff --git a/managed-ledger/pom.xml b/managed-ledger/pom.xml
index 06d057263317b..b883c8661d9c4 100644
--- a/managed-ledger/pom.xml
+++ b/managed-ledger/pom.xml
@@ -60,6 +60,12 @@
pulsar-common
${project.version}
+
+
+ org.apache.pulsar
+ pulsar-metadata
+ ${project.version}
+
com.google.guava
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
index 698acfd33e74d..6e6e6a2b848c7 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
@@ -39,8 +39,12 @@ public static ManagedLedgerException getManagedLedgerException(Throwable e) {
}
public static class MetaStoreException extends ManagedLedgerException {
- public MetaStoreException(Exception e) {
- super(e);
+ public MetaStoreException(Throwable t) {
+ super(t);
+ }
+
+ public MetaStoreException(String msg) {
+ super(msg);
}
}
@@ -48,12 +52,20 @@ public static class BadVersionException extends MetaStoreException {
public BadVersionException(Exception e) {
super(e);
}
+
+ public BadVersionException(String msg) {
+ super(msg);
+ }
}
public static class MetadataNotFoundException extends MetaStoreException {
public MetadataNotFoundException(Exception e) {
super(e);
}
+
+ public MetadataNotFoundException(String msg) {
+ super(msg);
+ }
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerInfo.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerInfo.java
index ff3f6b505a57a..c03da89921e7c 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerInfo.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerInfo.java
@@ -24,7 +24,7 @@
@SuppressWarnings("checkstyle:javadoctype")
public class ManagedLedgerInfo {
/** Z-Node version. */
- public int version;
+ public long version;
public String creationDate;
public String modificationDate;
@@ -42,7 +42,7 @@ public static class LedgerInfo {
public static class CursorInfo {
/** Z-Node version. */
- public int version;
+ public long version;
public String creationDate;
public String modificationDate;
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index d76553877fbef..681d35534186e 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -83,7 +83,6 @@
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound;
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
-import org.apache.bookkeeper.mledger.impl.MetaStore.Stat;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.LongProperty;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
@@ -93,6 +92,7 @@
import org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet;
import org.apache.pulsar.common.util.collections.LongPairRangeSet;
import org.apache.pulsar.common.util.collections.LongPairRangeSet.LongPairConsumer;
+import org.apache.pulsar.metadata.api.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index f8abcebafda4f..85164642f1b1e 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -68,7 +68,6 @@
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.ManagedLedgerInitializeLedgerCallback;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.State;
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
-import org.apache.bookkeeper.mledger.impl.MetaStore.Stat;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.LongProperty;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
@@ -76,7 +75,8 @@
import org.apache.bookkeeper.mledger.util.Futures;
import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
import org.apache.pulsar.common.util.DateFormatter;
-import org.apache.zookeeper.KeeperException;
+import org.apache.pulsar.metadata.api.Stat;
+import org.apache.pulsar.metadata.impl.zookeeper.ZKMetadataStore;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -160,7 +160,7 @@ private ManagedLedgerFactoryImpl(BookkeeperFactoryForCustomEnsemblePlacementPoli
this.bookkeeperFactory = bookKeeperGroupFactory;
this.isBookkeeperManaged = isBookkeeperManaged;
this.zookeeper = isBookkeeperManaged ? zooKeeper : null;
- this.store = new MetaStoreImplZookeeper(zooKeeper, orderedExecutor);
+ this.store = new MetaStoreImpl(new ZKMetadataStore(zooKeeper), orderedExecutor);
this.config = config;
this.mbean = new ManagedLedgerFactoryMBeanImpl(this);
this.entryCacheManager = new EntryCacheManager(this);
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 3a803864f5ecd..3b0495e6deb05 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -106,7 +106,6 @@
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback;
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
-import org.apache.bookkeeper.mledger.impl.MetaStore.Stat;
import org.apache.bookkeeper.mledger.offload.OffloadUtils;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
@@ -117,6 +116,7 @@
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
+import org.apache.pulsar.metadata.api.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerOfflineBacklog.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerOfflineBacklog.java
index 382f457688b4a..d8b49ad959b62 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerOfflineBacklog.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerOfflineBacklog.java
@@ -41,6 +41,7 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.apache.pulsar.metadata.api.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -146,7 +147,7 @@ private void readLedgerMeta(final ManagedLedgerFactoryImpl factory, final TopicN
store.getManagedLedgerInfo(managedLedgerName, false /* createIfMissing */,
new MetaStore.MetaStoreCallback() {
@Override
- public void operationComplete(MLDataFormats.ManagedLedgerInfo mlInfo, MetaStore.Stat version) {
+ public void operationComplete(MLDataFormats.ManagedLedgerInfo mlInfo, Stat stat) {
for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ls : mlInfo.getLedgerInfoList()) {
ledgers.put(ls.getLedgerId(), ls);
}
@@ -230,7 +231,7 @@ private void calculateCursorBacklogs(final ManagedLedgerFactoryImpl factory, fin
store.getCursors(managedLedgerName, new MetaStore.MetaStoreCallback>() {
@Override
- public void operationComplete(List cursors, MetaStore.Stat v) {
+ public void operationComplete(List cursors, Stat v) {
// Load existing cursors
if (log.isDebugEnabled()) {
log.debug("[{}] Found {} cursors", managedLedgerName, cursors.size());
@@ -336,7 +337,7 @@ public void readComplete(int rc, LedgerHandle lh, Enumeration seq,
new MetaStore.MetaStoreCallback() {
@Override
public void operationComplete(MLDataFormats.ManagedCursorInfo info,
- MetaStore.Stat version) {
+ Stat stat) {
long cursorLedgerId = info.getCursorsLedgerId();
if (log.isDebugEnabled()) {
log.debug("[{}] Cursor {} meta-data read ledger id {}", managedLedgerName,
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java
index 72ef566a9cc7d..8b99203c91a57 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java
@@ -22,19 +22,13 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
+import org.apache.pulsar.metadata.api.Stat;
/**
* Interface that describes the operations that the ManagedLedger need to do on the metadata store.
*/
public interface MetaStore {
- @SuppressWarnings("checkstyle:javadoctype")
- interface Stat {
- int getVersion();
- long getCreationTimestamp();
- long getModificationTimestamp();
- }
-
@SuppressWarnings("checkstyle:javadoctype")
interface UpdateLedgersIdsCallback {
void updateLedgersIdsComplete(MetaStoreException status, Stat stat);
@@ -64,12 +58,12 @@ interface MetaStoreCallback {
* the name of the ManagedLedger
* @param mlInfo
* managed ledger info
- * @param version
+ * @param stat
* version object associated with current state
* @param callback
* callback object
*/
- void asyncUpdateLedgerIds(String ledgerName, ManagedLedgerInfo mlInfo, Stat version,
+ void asyncUpdateLedgerIds(String ledgerName, ManagedLedgerInfo mlInfo, Stat stat,
MetaStoreCallback callback);
/**
@@ -98,12 +92,12 @@ void asyncUpdateLedgerIds(String ledgerName, ManagedLedgerInfo mlInfo, Stat vers
* the name of the ManagedLedger
* @param cursorName
* @param info
- * @param version
+ * @param stat
* @param callback
* the callback
* @throws MetaStoreException
*/
- void asyncUpdateCursorInfo(String ledgerName, String cursorName, ManagedCursorInfo info, Stat version,
+ void asyncUpdateCursorInfo(String ledgerName, String cursorName, ManagedCursorInfo info, Stat stat,
MetaStoreCallback callback);
/**
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
new file mode 100644
index 0000000000000..8f229fdb478e9
--- /dev/null
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
@@ -0,0 +1,286 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletionException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
+import org.apache.bookkeeper.mledger.ManagedLedgerException.MetadataNotFoundException;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
+import org.apache.bookkeeper.util.SafeRunnable;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.Stat;
+
+@Slf4j
+public class MetaStoreImpl implements MetaStore {
+
+ private static final String BASE_NODE = "/managed-ledgers";
+ private static final String PREFIX = BASE_NODE + "/";
+
+ private final MetadataStore store;
+ private final OrderedExecutor executor;
+
+ public MetaStoreImpl(MetadataStore store, OrderedExecutor executor) {
+ this.store = store;
+ this.executor = executor;
+ }
+
+ @Override
+ public void getManagedLedgerInfo(String ledgerName, boolean createIfMissing,
+ MetaStoreCallback callback) {
+ // Try to get the content or create an empty node
+ String path = PREFIX + ledgerName;
+ store.get(path)
+ .thenAcceptAsync(optResult -> {
+ if (optResult.isPresent()) {
+ ManagedLedgerInfo info;
+ try {
+ info = ManagedLedgerInfo.parseFrom(optResult.get().getValue());
+ info = updateMLInfoTimestamp(info);
+ callback.operationComplete(info, optResult.get().getStat());
+ } catch (InvalidProtocolBufferException e) {
+ callback.operationFailed(getException(e));
+ }
+ } else {
+ // Z-node doesn't exist
+ if (createIfMissing) {
+ log.info("Creating '{}'", path);
+
+ store.put(path, new byte[0], Optional.of(-1L))
+ .thenAccept(stat -> {
+ ManagedLedgerInfo info = ManagedLedgerInfo.getDefaultInstance();
+ callback.operationComplete(info, stat);
+ }).exceptionally(ex -> {
+ callback.operationFailed(getException(ex));
+ return null;
+ });
+ } else {
+ // Tried to open a managed ledger but it doesn't exist and we shouldn't creating it at this
+ // point
+ callback.operationFailed(new MetadataNotFoundException("Managed ledger not found"));
+ }
+ }
+ }, executor.chooseThread(ledgerName))
+ .exceptionally(ex -> {
+ executor.executeOrdered(ledgerName, SafeRunnable.safeRun(() -> {
+ callback.operationFailed(getException(ex));
+ }));
+ return null;
+ });
+ }
+
+ @Override
+ public void asyncUpdateLedgerIds(String ledgerName, ManagedLedgerInfo mlInfo, Stat stat,
+ MetaStoreCallback callback) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Updating metadata version={} with content={}", ledgerName, stat, mlInfo);
+ }
+
+ byte[] serializedMlInfo = mlInfo.toByteArray(); // Binary format
+ String path = PREFIX + ledgerName;
+ store.put(path, serializedMlInfo, Optional.of(stat.getVersion()))
+ .thenAcceptAsync(newVersion -> {
+ callback.operationComplete(null, newVersion);
+ }, executor.chooseThread(ledgerName))
+ .exceptionally(ex -> {
+ executor.executeOrdered(ledgerName, SafeRunnable.safeRun(() -> {
+ callback.operationFailed(getException(ex));
+ }));
+ return null;
+ });
+ }
+
+ @Override
+ public void getCursors(String ledgerName, MetaStoreCallback> callback) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Get cursors list", ledgerName);
+ }
+
+ String path = PREFIX + ledgerName;
+ store.getChildren(path)
+ .thenAcceptAsync(cursors -> {
+ callback.operationComplete(cursors, null);
+ }, executor.chooseThread(ledgerName))
+ .exceptionally(ex -> {
+ executor.executeOrdered(ledgerName, SafeRunnable.safeRun(() -> {
+ callback.operationFailed(getException(ex));
+ }));
+ return null;
+ });
+ }
+
+ @Override
+ public void asyncGetCursorInfo(String ledgerName, String cursorName,
+ MetaStoreCallback callback) {
+ String path = PREFIX + ledgerName + "/" + cursorName;
+ if (log.isDebugEnabled()) {
+ log.debug("Reading from {}", path);
+ }
+
+ store.get(path)
+ .thenAcceptAsync(optRes -> {
+ if (optRes.isPresent()) {
+ try {
+ ManagedCursorInfo info = ManagedCursorInfo.parseFrom(optRes.get().getValue());
+ callback.operationComplete(info, optRes.get().getStat());
+ } catch (InvalidProtocolBufferException e) {
+ callback.operationFailed(getException(e));
+ }
+ } else {
+ callback.operationFailed(new MetadataNotFoundException("Cursor metadata not found"));
+ }
+ }, executor.chooseThread(ledgerName))
+ .exceptionally(ex -> {
+ executor.executeOrdered(ledgerName, SafeRunnable.safeRun(() -> {
+ callback.operationFailed(getException(ex));
+ }));
+ return null;
+ });
+ }
+
+ @Override
+ public void asyncUpdateCursorInfo(String ledgerName, String cursorName, ManagedCursorInfo info, Stat stat,
+ MetaStoreCallback callback) {
+ log.info("[{}] [{}] Updating cursor info ledgerId={} mark-delete={}:{}", ledgerName, cursorName,
+ info.getCursorsLedgerId(), info.getMarkDeleteLedgerId(), info.getMarkDeleteEntryId());
+
+ String path = PREFIX + ledgerName + "/" + cursorName;
+ byte[] content = info.toByteArray(); // Binary format
+
+ long expectedVersion;
+
+ if (stat != null) {
+ expectedVersion = stat.getVersion();
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Creating consumer {} on meta-data store with {}", ledgerName, cursorName, info);
+ }
+ } else {
+ expectedVersion = -1;
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Updating consumer {} on meta-data store with {}", ledgerName, cursorName, info);
+ }
+ }
+
+ store.put(path, content, Optional.of(expectedVersion))
+ .thenAcceptAsync(optStat -> {
+ callback.operationComplete(null, optStat);
+ }, executor.chooseThread(ledgerName))
+ .exceptionally(ex -> {
+ executor.executeOrdered(ledgerName, SafeRunnable.safeRun(() -> {
+ callback.operationFailed(getException(ex));
+ }));
+ return null;
+ });
+ }
+
+ @Override
+ public void asyncRemoveCursor(String ledgerName, String cursorName, MetaStoreCallback callback) {
+ String path = PREFIX + ledgerName + "/" + cursorName;
+ log.info("[{}] Remove consumer={}", ledgerName, cursorName);
+
+ store.delete(path, Optional.empty())
+ .thenAcceptAsync(v -> {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] [{}] cursor delete done", ledgerName, cursorName);
+ }
+ callback.operationComplete(null, null);
+ }, executor.chooseThread(ledgerName))
+ .exceptionally(ex -> {
+ executor.executeOrdered(ledgerName, SafeRunnable.safeRun(() -> {
+ callback.operationFailed(getException(ex));
+ }));
+ return null;
+ });
+ }
+
+ @Override
+ public void removeManagedLedger(String ledgerName, MetaStoreCallback callback) {
+ log.info("[{}] Remove ManagedLedger", ledgerName);
+
+ String path = PREFIX + ledgerName;
+ store.delete(path, Optional.empty())
+ .thenAcceptAsync(v -> {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] managed ledger delete done", ledgerName);
+ }
+ callback.operationComplete(null, null);
+ }, executor.chooseThread(ledgerName))
+ .exceptionally(ex -> {
+ executor.executeOrdered(ledgerName, SafeRunnable.safeRun(() -> {
+ callback.operationFailed(getException(ex));
+ }));
+ return null;
+ });
+ }
+
+ @Override
+ public Iterable getManagedLedgers() throws MetaStoreException {
+ try {
+ return store.getChildren(BASE_NODE).join();
+ } catch (CompletionException e) {
+ throw getException(e);
+ }
+ }
+
+ //
+ // update timestamp if missing or 0
+ // 3 cases - timestamp does not exist for ledgers serialized before
+ // - timestamp is 0 for a ledger in recovery
+ // - ledger has timestamp which is the normal case now
+
+ private static ManagedLedgerInfo updateMLInfoTimestamp(ManagedLedgerInfo info) {
+ List infoList = new ArrayList<>(info.getLedgerInfoCount());
+ long currentTime = System.currentTimeMillis();
+
+ for (ManagedLedgerInfo.LedgerInfo ledgerInfo : info.getLedgerInfoList()) {
+ if (!ledgerInfo.hasTimestamp() || ledgerInfo.getTimestamp() == 0) {
+ ManagedLedgerInfo.LedgerInfo.Builder singleInfoBuilder = ledgerInfo.toBuilder();
+ singleInfoBuilder.setTimestamp(currentTime);
+ infoList.add(singleInfoBuilder.build());
+ } else {
+ infoList.add(ledgerInfo);
+ }
+ }
+ ManagedLedgerInfo.Builder mlInfo = ManagedLedgerInfo.newBuilder();
+ mlInfo.addAllLedgerInfo(infoList);
+ if (info.hasTerminatedPosition()) {
+ mlInfo.setTerminatedPosition(info.getTerminatedPosition());
+ }
+ return mlInfo.build();
+ }
+
+ private static MetaStoreException getException(Throwable t) {
+ if (t.getCause() instanceof MetadataStoreException.BadVersionException) {
+ return new ManagedLedgerException.BadVersionException(t.getMessage());
+ } else {
+ return new MetaStoreException(t);
+ }
+ }
+}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java
deleted file mode 100644
index 2e69614f65000..0000000000000
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java
+++ /dev/null
@@ -1,422 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.bookkeeper.mledger.impl;
-
-import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
-
-import com.google.common.base.Charsets;
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.TextFormat;
-import com.google.protobuf.TextFormat.ParseException;
-
-import java.io.File;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ForkJoinPool;
-import java.util.function.Consumer;
-
-import org.apache.bookkeeper.common.util.OrderedExecutor;
-import org.apache.bookkeeper.mledger.ManagedLedgerException;
-import org.apache.bookkeeper.mledger.ManagedLedgerException.BadVersionException;
-import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
-import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
-import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
-import org.apache.zookeeper.AsyncCallback.StringCallback;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.Code;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.ACL;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@SuppressWarnings("checkstyle:javadoctype")
-public class MetaStoreImplZookeeper implements MetaStore {
-
- private static final Charset Encoding = Charsets.UTF_8;
- private static final List Acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
-
- private static final String prefixName = "/managed-ledgers";
- private static final String prefix = prefixName + "/";
-
- private final ZooKeeper zk;
- private final OrderedExecutor executor;
-
- private static class ZKStat implements Stat {
- private final int version;
- private final long creationTimestamp;
- private final long modificationTimestamp;
-
- ZKStat(org.apache.zookeeper.data.Stat stat) {
- this.version = stat.getVersion();
- this.creationTimestamp = stat.getCtime();
- this.modificationTimestamp = stat.getMtime();
- }
-
- ZKStat() {
- this.version = 0;
- this.creationTimestamp = System.currentTimeMillis();
- this.modificationTimestamp = System.currentTimeMillis();
- }
-
- @Override
- public int getVersion() {
- return version;
- }
-
- @Override
- public long getCreationTimestamp() {
- return creationTimestamp;
- }
-
- @Override
- public long getModificationTimestamp() {
- return modificationTimestamp;
- }
- }
-
- public MetaStoreImplZookeeper(ZooKeeper zk, OrderedExecutor executor)
- throws Exception {
- this.zk = zk;
- this.executor = executor;
- }
-
- //
- // update timestamp if missing or 0
- // 3 cases - timestamp does not exist for ledgers serialized before
- // - timestamp is 0 for a ledger in recovery
- // - ledger has timestamp which is the normal case now
-
- private ManagedLedgerInfo updateMLInfoTimestamp(ManagedLedgerInfo info) {
- List infoList = new ArrayList<>(info.getLedgerInfoCount());
- long currentTime = System.currentTimeMillis();
-
- for (ManagedLedgerInfo.LedgerInfo ledgerInfo : info.getLedgerInfoList()) {
- if (!ledgerInfo.hasTimestamp() || ledgerInfo.getTimestamp() == 0) {
- ManagedLedgerInfo.LedgerInfo.Builder singleInfoBuilder = ledgerInfo.toBuilder();
- singleInfoBuilder.setTimestamp(currentTime);
- infoList.add(singleInfoBuilder.build());
- } else {
- infoList.add(ledgerInfo);
- }
- }
- ManagedLedgerInfo.Builder mlInfo = ManagedLedgerInfo.newBuilder();
- mlInfo.addAllLedgerInfo(infoList);
- if (info.hasTerminatedPosition()) {
- mlInfo.setTerminatedPosition(info.getTerminatedPosition());
- }
- return mlInfo.build();
- }
-
- @Override
- public void getManagedLedgerInfo(final String ledgerName, boolean createIfMissing,
- final MetaStoreCallback callback) {
- // Try to get the content or create an empty node
- zk.getData(prefix + ledgerName, false,
- (rc, path, ctx, readData, stat) -> executor.executeOrdered(ledgerName, safeRun(() -> {
- if (rc == Code.OK.intValue()) {
- try {
- ManagedLedgerInfo info = parseManagedLedgerInfo(readData);
- info = updateMLInfoTimestamp(info);
- callback.operationComplete(info, new ZKStat(stat));
- } catch (ParseException | InvalidProtocolBufferException e) {
- callback.operationFailed(new MetaStoreException(e));
- }
- } else if (rc == Code.NONODE.intValue()) {
- // Z-node doesn't exist
- if (createIfMissing) {
- log.info("Creating '{}{}'", prefix, ledgerName);
-
- StringCallback createcb = (rc1, path1, ctx1, name) -> {
- if (rc1 == Code.OK.intValue()) {
- ManagedLedgerInfo info = ManagedLedgerInfo.getDefaultInstance();
- callback.operationComplete(info, new ZKStat());
- } else {
- callback.operationFailed(
- new MetaStoreException(KeeperException.create(Code.get(rc1))));
- }
- };
-
- asyncCreateFullPathOptimistic(prefixName, ledgerName, new byte[0], Acl,
- CreateMode.PERSISTENT, createcb);
- } else {
- // Tried to open a managed ledger but it doesn't exist and we shouldn't creating it at this
- // point
-
- callback.operationFailed(new ManagedLedgerException.MetadataNotFoundException(
- KeeperException.create(Code.get(rc))));
- }
- } else {
- // Other ZK error
- callback.operationFailed(new MetaStoreException(KeeperException.create(Code.get(rc))));
- }
- })), null);
- }
-
- @Override
- public void asyncUpdateLedgerIds(String ledgerName, ManagedLedgerInfo mlInfo, Stat stat,
- final MetaStoreCallback callback) {
-
- ZKStat zkStat = (ZKStat) stat;
- if (log.isDebugEnabled()) {
- log.debug("[{}] Updating metadata version={} with content={}", ledgerName, zkStat.version, mlInfo);
- }
-
- byte[] serializedMlInfo = mlInfo.toByteArray(); // Binary format
-
- zk.setData(prefix + ledgerName, serializedMlInfo, zkStat.getVersion(),
- (rc, path, zkCtx, stat1) -> executor.executeOrdered(ledgerName, safeRun(() -> {
- if (log.isDebugEnabled()) {
- log.debug("[{}] UpdateLedgersIdsCallback.processResult rc={} newVersion={}", ledgerName,
- Code.get(rc), stat != null ? stat.getVersion() : "null");
- }
- MetaStoreException status = null;
- if (rc == Code.BADVERSION.intValue()) {
- // Content has been modified on ZK since our last read
- status = new BadVersionException(KeeperException.create(Code.get(rc)));
- callback.operationFailed(status);
- } else if (rc != Code.OK.intValue()) {
- status = new MetaStoreException(KeeperException.create(Code.get(rc)));
- callback.operationFailed(status);
- } else {
- callback.operationComplete(null, new ZKStat(stat1));
- }
- })), null);
- }
-
- @Override
- public void getCursors(final String ledgerName, final MetaStoreCallback> callback) {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Get cursors list", ledgerName);
- }
- zk.getChildren(prefix + ledgerName, false,
- (rc, path, ctx, children, stat) -> executor.executeOrdered(ledgerName, safeRun(() -> {
- if (log.isDebugEnabled()) {
- log.debug("[{}] getConsumers complete rc={} children={}", ledgerName, Code.get(rc), children);
- }
- if (rc != Code.OK.intValue()) {
- callback.operationFailed(new MetaStoreException(KeeperException.create(Code.get(rc))));
- return;
- }
-
- if (log.isDebugEnabled()) {
- log.debug("[{}] Get childrend completed version={}", ledgerName, stat.getVersion());
- }
- callback.operationComplete(children, new ZKStat(stat));
- })), null);
- }
-
- @Override
- public void asyncGetCursorInfo(String ledgerName, String consumerName,
- final MetaStoreCallback callback) {
- String path = prefix + ledgerName + "/" + consumerName;
- if (log.isDebugEnabled()) {
- log.debug("Reading from {}", path);
- }
-
- zk.getData(path, false, (rc, path1, ctx, data, stat) -> executor.executeOrdered(ledgerName, safeRun(() -> {
- if (rc != Code.OK.intValue()) {
- callback.operationFailed(new MetaStoreException(KeeperException.create(Code.get(rc))));
- } else {
- try {
- ManagedCursorInfo info = parseManagedCursorInfo(data);
- callback.operationComplete(info, new ZKStat(stat));
- } catch (ParseException | InvalidProtocolBufferException e) {
- callback.operationFailed(new MetaStoreException(e));
- }
- }
- })), null);
-
- if (log.isDebugEnabled()) {
- log.debug("Reading from {} ok", path);
- }
- }
-
- @Override
- public void asyncUpdateCursorInfo(final String ledgerName, final String cursorName, final ManagedCursorInfo info,
- Stat stat, final MetaStoreCallback callback) {
- log.info("[{}] [{}] Updating cursor info ledgerId={} mark-delete={}:{}", ledgerName, cursorName,
- info.getCursorsLedgerId(), info.getMarkDeleteLedgerId(), info.getMarkDeleteEntryId());
-
- String path = prefix + ledgerName + "/" + cursorName;
- byte[] content = info.toByteArray(); // Binary format
-
- if (stat == null) {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Creating consumer {} on meta-data store with {}", ledgerName, cursorName, info);
- }
- zk.create(path, content, Acl, CreateMode.PERSISTENT,
- (rc, path1, ctx, name) -> executor.executeOrdered(ledgerName, safeRun(() -> {
- if (rc != Code.OK.intValue()) {
- log.warn("[{}] Error creating cosumer {} node on meta-data store with {}: ", ledgerName,
- cursorName, info, Code.get(rc));
- callback.operationFailed(new MetaStoreException(KeeperException.create(Code.get(rc))));
- } else {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Created consumer {} on meta-data store with {}", ledgerName, cursorName,
- info);
- }
- callback.operationComplete(null, new ZKStat());
- }
- })), null);
- } else {
- ZKStat zkStat = (ZKStat) stat;
- if (log.isDebugEnabled()) {
- log.debug("[{}] Updating consumer {} on meta-data store with {}", ledgerName, cursorName, info);
- }
- zk.setData(path, content, zkStat.getVersion(),
- (rc, path1, ctx, stat1) -> executor.executeOrdered(ledgerName, safeRun(() -> {
- if (rc == Code.BADVERSION.intValue()) {
- callback.operationFailed(new BadVersionException(KeeperException.create(Code.get(rc))));
- } else if (rc != Code.OK.intValue()) {
- callback.operationFailed(new MetaStoreException(KeeperException.create(Code.get(rc))));
- } else {
- callback.operationComplete(null, new ZKStat(stat1));
- }
- })), null);
- }
- }
-
- @Override
- public void asyncRemoveCursor(final String ledgerName, final String consumerName,
- final MetaStoreCallback callback) {
- log.info("[{}] Remove consumer={}", ledgerName, consumerName);
- zk.delete(prefix + ledgerName + "/" + consumerName, -1,
- (rc, path, ctx) -> executor.executeOrdered(ledgerName, safeRun(() -> {
- if (log.isDebugEnabled()) {
- log.debug("[{}] [{}] zk delete done. rc={}", ledgerName, consumerName, Code.get(rc));
- }
- if (rc == Code.OK.intValue()) {
- callback.operationComplete(null, null);
- } else {
- callback.operationFailed(new MetaStoreException(KeeperException.create(Code.get(rc))));
- }
- })), null);
- }
-
- @Override
- public void removeManagedLedger(String ledgerName, MetaStoreCallback callback) {
- log.info("[{}] Remove ManagedLedger", ledgerName);
- zk.delete(prefix + ledgerName, -1, (rc, path, ctx) -> executor.executeOrdered(ledgerName, safeRun(() -> {
- if (log.isDebugEnabled()) {
- log.debug("[{}] zk delete done. rc={}", ledgerName, Code.get(rc));
- }
- if (rc == Code.OK.intValue()) {
- callback.operationComplete(null, null);
- } else {
- callback.operationFailed(new MetaStoreException(KeeperException.create(Code.get(rc))));
- }
- })), null);
- }
-
- @Override
- public Iterable getManagedLedgers() throws MetaStoreException {
- try {
- return zk.getChildren(prefixName, false);
- } catch (Exception e) {
- throw new MetaStoreException(e);
- }
- }
-
- private ManagedLedgerInfo parseManagedLedgerInfo(byte[] data)
- throws ParseException, InvalidProtocolBufferException {
- // First try binary format, then fallback to text
- try {
- return ManagedLedgerInfo.parseFrom(data);
- } catch (InvalidProtocolBufferException e) {
- // Fallback to parsing protobuf text format
- ManagedLedgerInfo.Builder builder = ManagedLedgerInfo.newBuilder();
- TextFormat.merge(new String(data, Encoding), builder);
- return builder.build();
- }
- }
-
- private ManagedCursorInfo parseManagedCursorInfo(byte[] data)
- throws ParseException, InvalidProtocolBufferException {
- // First try binary format, then fallback to text
- try {
- return ManagedCursorInfo.parseFrom(data);
- } catch (InvalidProtocolBufferException e) {
- // Fallback to parsing protobuf text format
- ManagedCursorInfo.Builder builder = ManagedCursorInfo.newBuilder();
- TextFormat.merge(new String(data, Encoding), builder);
- return builder.build();
- }
-
- }
-
- void asyncCreateFullPathOptimistic(
- final String basePath, final String nodePath, final byte[] data,
- final List acl, final CreateMode createMode, final StringCallback callback) {
- String fullPath = basePath + "/" + nodePath;
-
- zk.create(fullPath, data, acl, createMode,
- (rc, path, ignoreCtx1, name) -> {
- Runnable retry = () -> {
- asyncCreateFullPathOptimistic(basePath, nodePath, data,
- acl, createMode, callback);
- };
-
- Consumer complete = (finalrc) -> {
- executor.executeOrdered(nodePath, safeRun(() -> {
- callback.processResult(finalrc, path, null, name);
- }));
- };
-
- if (rc != Code.NONODE.intValue()) {
- complete.accept(rc);
- return;
- }
-
- // Since I got a nonode, it means that my parents don't exist
- // create mode is persistent since ephemeral nodes can't be
- // parents
- String nodeParent = new File(nodePath).getParent();
- if (nodeParent == null) {
- zk.exists(basePath, false,
- (existsRc, existsPath, ignoreCtx2, stat) -> {
- if (existsRc == Code.OK.intValue()) {
- if (stat != null) {
- retry.run();
- } else {
- complete.accept(Code.NONODE.intValue());
- }
- } else {
- complete.accept(existsRc);
- }
- }, null);
- } else {
- nodeParent = nodeParent.replace("\\", "/");
- asyncCreateFullPathOptimistic(
- basePath, nodeParent, new byte[0], acl, CreateMode.PERSISTENT,
- (parentRc, parentPath, ignoreCtx3, parentName) -> {
- if (parentRc == Code.OK.intValue() || parentRc == Code.NODEEXISTS.intValue()) {
- retry.run();
- } else {
- complete.accept(parentRc);
- }
- });
- }
- }, null);
- }
-
- private static final Logger log = LoggerFactory.getLogger(MetaStoreImplZookeeper.class);
-}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java
index 9721b15bd4280..621131752720a 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java
@@ -34,9 +34,9 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException.MetadataNotFoundException;
import org.apache.bookkeeper.mledger.ReadOnlyCursor;
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
-import org.apache.bookkeeper.mledger.impl.MetaStore.Stat;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
+import org.apache.pulsar.metadata.api.Stat;
@Slf4j
public class ReadOnlyManagedLedgerImpl extends ManagedLedgerImpl {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index b0db9264d8f91..c3a1acd42b326 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -77,10 +77,10 @@
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback;
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
-import org.apache.bookkeeper.mledger.impl.MetaStore.Stat;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+import org.apache.pulsar.metadata.api.Stat;
import org.apache.zookeeper.KeeperException.Code;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -92,7 +92,7 @@
public class ManagedCursorTest extends MockedBookKeeperTestCase {
private static final Charset Encoding = Charsets.UTF_8;
-
+
@DataProvider(name = "useOpenRangeSet")
public static Object[][] useOpenRangeSet() {
return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
@@ -2873,7 +2873,7 @@ public void testRecoverCursorAheadOfLastPosition() throws Exception {
final long markDeleteLedgerId = 2L;
final long markDeleteEntryId = -1L;
- MetaStoreImplZookeeper mockMetaStore = mock(MetaStoreImplZookeeper.class);
+ MetaStore mockMetaStore = mock(MetaStore.class);
doAnswer(new Answer