Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Throw AlreadyClosedException while request to a closed metadata store #19055

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,14 @@ public CompletableFuture<Void> closeAsync() {
offloadersCache.close();

if (coordinationService != null) {
coordinationService.close();
try {
coordinationService.close();
} catch (Exception e) {
Throwable cause = FutureUtil.unwrapCompletionException(e);
if (!(cause instanceof MetadataStoreException.AlreadyClosedException)) {
throw e;
}
}
}

closeLocalMetadataStore();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@ public LockBusyException(String msg) {
* The store was already closed.
*/
public static class AlreadyClosedException extends MetadataStoreException {

public AlreadyClosedException() {
super("The metadata store is closed");
}
public AlreadyClosedException(Throwable t) {
super(t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.common.util.SafeRunnable;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataCache;
Expand Down Expand Up @@ -178,10 +177,8 @@ public CompletableFuture<Void> asyncClose() {
this.state = State.Closed;
}

return FutureUtils.collect(
locks.values().stream()
.map(ResourceLock::release)
.collect(Collectors.toList()))
.thenApply(x -> null);
return FutureUtil.waitForAll(locks.values().stream()
.map(ResourceLock::release)
.collect(Collectors.toList()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import lombok.Getter;
Expand Down Expand Up @@ -79,6 +80,8 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
@Getter
private boolean isConnected = true;

protected final AtomicBoolean isClosed = new AtomicBoolean(false);

protected abstract CompletableFuture<List<String>> getChildrenFromStore(String path);

protected abstract CompletableFuture<Boolean> existsFromStore(String path);
Expand Down Expand Up @@ -240,6 +243,10 @@ public <T> MetadataCache<T> getMetadataCache(MetadataSerde<T> serde, MetadataCac

@Override
public CompletableFuture<Optional<GetResult>> get(String path) {
if (isClosed()) {
return FutureUtil.failedFuture(
new MetadataStoreException.AlreadyClosedException());
}
long start = System.currentTimeMillis();
if (!isValidPath(path)) {
metadataStoreStats.recordGetOpsFailed(System.currentTimeMillis() - start);
Expand All @@ -265,6 +272,10 @@ public CompletableFuture<Stat> put(String path, byte[] value, Optional<Long> exp

@Override
public final CompletableFuture<List<String>> getChildren(String path) {
if (isClosed()) {
return FutureUtil.failedFuture(
new MetadataStoreException.AlreadyClosedException());
}
if (!isValidPath(path)) {
return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path));
}
Expand All @@ -273,6 +284,10 @@ public final CompletableFuture<List<String>> getChildren(String path) {

@Override
public final CompletableFuture<Boolean> exists(String path) {
if (isClosed()) {
return FutureUtil.failedFuture(
new MetadataStoreException.AlreadyClosedException());
}
if (!isValidPath(path)) {
return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path));
}
Expand All @@ -281,7 +296,10 @@ public final CompletableFuture<Boolean> exists(String path) {

@Override
public void registerListener(Consumer<Notification> listener) {
listeners.add(listener);
// If the metadata store is closed, do nothing here.
if (!isClosed()) {
listeners.add(listener);
}
}

protected CompletableFuture<Void> receivedNotification(Notification notification) {
Expand Down Expand Up @@ -328,6 +346,10 @@ public void accept(Notification n) {

@Override
public final CompletableFuture<Void> delete(String path, Optional<Long> expectedVersion) {
if (isClosed()) {
return FutureUtil.failedFuture(
new MetadataStoreException.AlreadyClosedException());
}
long start = System.currentTimeMillis();
if (!isValidPath(path)) {
metadataStoreStats.recordDelOpsFailed(System.currentTimeMillis() - start);
Expand Down Expand Up @@ -373,6 +395,10 @@ private CompletableFuture<Void> deleteInternal(String path, Optional<Long> expec

@Override
public CompletableFuture<Void> deleteRecursive(String path) {
if (isClosed()) {
return FutureUtil.failedFuture(
new MetadataStoreException.AlreadyClosedException());
}
return getChildren(path)
.thenCompose(children -> FutureUtil.waitForAll(
children.stream()
Expand All @@ -394,6 +420,10 @@ protected abstract CompletableFuture<Stat> storePut(String path, byte[] data, Op
@Override
public final CompletableFuture<Stat> put(String path, byte[] data, Optional<Long> optExpectedVersion,
EnumSet<CreateOption> options) {
if (isClosed()) {
return FutureUtil.failedFuture(
new MetadataStoreException.AlreadyClosedException());
}
long start = System.currentTimeMillis();
if (!isValidPath(path)) {
metadataStoreStats.recordPutOpsFailed(System.currentTimeMillis() - start);
Expand Down Expand Up @@ -472,6 +502,10 @@ protected void receivedSessionEvent(SessionEvent event) {
}
}

private boolean isClosed() {
return isClosed.get();
}

@Override
public void close() throws Exception {
executor.shutdownNow();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,22 +159,24 @@ private File readFile(String path) {

@Override
public void close() throws Exception {
super.close();
if (isClosed.compareAndSet(false, true)) {
super.close();

if (sessionWatcher != null) {
sessionWatcher.close();
}
if (sessionWatcher != null) {
sessionWatcher.close();
}

if (leaseClient != null) {
leaseClient.close();
}
if (leaseClient != null) {
leaseClient.close();
}

if (leaseId != 0) {
client.getLeaseClient().revoke(leaseId);
}
if (leaseId != 0) {
client.getLeaseClient().revoke(leaseId);
}

kv.close();
client.close();
kv.close();
client.close();
}
}

private static final GetOption EXISTS_GET_OPTION = GetOption.newBuilder().withCountOnly(true).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,4 +229,11 @@ public CompletableFuture<Void> storeDelete(String path, Optional<Long> optExpect
public Optional<MetadataEventSynchronizer> getMetadataEventSynchronizer() {
return Optional.ofNullable(synchronizer);
}

@Override
public void close() throws Exception {
if (isClosed.compareAndSet(false, true)) {
super.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,17 +85,10 @@ public class RocksdbMetadataStore extends AbstractMetadataStore {

private final TransactionDB db;
private final ReentrantReadWriteLock dbStateLock;
private volatile State state;

private final WriteOptions writeOptions;
private final ReadOptions optionCache;
private final ReadOptions optionDontCache;
private MetadataEventSynchronizer synchronizer;

enum State {
RUNNING, CLOSED
}

private int referenceCount = 1;

private static final Map<String, RocksdbMetadataStore> instancesCache = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -246,7 +239,6 @@ private RocksdbMetadataStore(String metadataURL, MetadataStoreConfig metadataSto
close();
throw new MetadataStoreException("Error init metastore state", exception);
}
state = State.RUNNING;
dbStateLock = new ReentrantReadWriteLock();
log.info("new RocksdbMetadataStore,url={},instanceId={}", metadataStoreConfig, instanceId);
}
Expand Down Expand Up @@ -357,24 +349,20 @@ public synchronized void close() throws MetadataStoreException {
}

instancesCache.remove(this.metadataUrl, this);

if (state == State.CLOSED) {
//already closed.
return;
}
try {
dbStateLock.writeLock().lock();
state = State.CLOSED;
log.info("close.instanceId={}", instanceId);
db.close();
writeOptions.close();
optionCache.close();
optionDontCache.close();
super.close();
} catch (Throwable throwable) {
throw MetadataStoreException.wrap(throwable);
} finally {
dbStateLock.writeLock().unlock();
if (isClosed.compareAndSet(false, true)) {
try {
dbStateLock.writeLock().lock();
log.info("close.instanceId={}", instanceId);
db.close();
writeOptions.close();
optionCache.close();
optionDontCache.close();
super.close();
} catch (Throwable throwable) {
throw MetadataStoreException.wrap(throwable);
} finally {
dbStateLock.writeLock().unlock();
}
}
}

Expand All @@ -385,9 +373,6 @@ public CompletableFuture<Optional<GetResult>> storeGet(String path) {
}
try {
dbStateLock.readLock().lock();
if (state == State.CLOSED) {
throw new MetadataStoreException.AlreadyClosedException("");
}
byte[] value = db.get(optionCache, toBytes(path));
if (value == null) {
return CompletableFuture.completedFuture(Optional.empty());
Expand Down Expand Up @@ -420,9 +405,6 @@ protected CompletableFuture<List<String>> getChildrenFromStore(String path) {
}
try {
dbStateLock.readLock().lock();
if (state == State.CLOSED) {
throw new MetadataStoreException.AlreadyClosedException("");
}
try (RocksIterator iterator = db.newIterator(optionDontCache)) {
Set<String> result = new HashSet<>();
String firstKey = path.equals("/") ? path : path + "/";
Expand Down Expand Up @@ -465,9 +447,6 @@ protected CompletableFuture<Boolean> existsFromStore(String path) {
}
try {
dbStateLock.readLock().lock();
if (state == State.CLOSED) {
throw new MetadataStoreException.AlreadyClosedException("");
}
byte[] value = db.get(optionDontCache, toBytes(path));
if (log.isDebugEnabled()) {
if (value != null) {
Expand All @@ -490,9 +469,6 @@ protected CompletableFuture<Void> storeDelete(String path, Optional<Long> expect
}
try {
dbStateLock.readLock().lock();
if (state == State.CLOSED) {
throw new MetadataStoreException.AlreadyClosedException("");
}
try (Transaction transaction = db.beginTransaction(writeOptions)) {
byte[] pathBytes = toBytes(path);
byte[] oldValueData = transaction.getForUpdate(optionDontCache, pathBytes, true);
Expand Down Expand Up @@ -529,9 +505,6 @@ protected CompletableFuture<Stat> storePut(String path, byte[] data, Optional<Lo
}
try {
dbStateLock.readLock().lock();
if (state == State.CLOSED) {
throw new MetadataStoreException.AlreadyClosedException("");
}
try (Transaction transaction = db.beginTransaction(writeOptions)) {
byte[] pathBytes = toBytes(path);
byte[] oldValueData = transaction.getForUpdate(optionDontCache, pathBytes, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -424,13 +424,15 @@ private void internalStorePut(OpPut opPut) {

@Override
public void close() throws Exception {
if (isZkManaged) {
zkc.close();
}
if (sessionWatcher != null) {
sessionWatcher.close();
if (isClosed.compareAndSet(false, true)) {
if (isZkManaged) {
zkc.close();
}
if (sessionWatcher != null) {
sessionWatcher.close();
}
super.close();
}
super.close();
}

private Stat getStat(String path, org.apache.zookeeper.data.Stat zkStat) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -543,4 +543,48 @@ public void testGetChildren(String provider, Supplier<String> urlSupplier) throw
assertTrue(expectedSet3.contains(subPath));
}
}

@Test(dataProvider = "impl")
public void testClosedMetadataStore(String provider, Supplier<String> urlSupplier) throws Exception {
@Cleanup
MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(),
MetadataStoreConfig.builder().fsyncEnable(false).build());
store.close();
try {
store.get("/a").get();
fail();
} catch (Exception e) {
assertTrue(e.getCause() instanceof MetadataStoreException.AlreadyClosedException);
}
try {
store.put("/a", new byte[0], Optional.empty()).get();
fail();
} catch (Exception e) {
assertTrue(e.getCause() instanceof MetadataStoreException.AlreadyClosedException);
}
try {
store.delete("/a", Optional.empty()).get();
fail();
} catch (Exception e) {
assertTrue(e.getCause() instanceof MetadataStoreException.AlreadyClosedException);
}
try {
store.deleteRecursive("/a").get();
fail();
} catch (Exception e) {
assertTrue(e.getCause() instanceof MetadataStoreException.AlreadyClosedException);
}
try {
store.getChildren("/a").get();
fail();
} catch (Exception e) {
assertTrue(e.getCause() instanceof MetadataStoreException.AlreadyClosedException);
}
try {
store.exists("/a").get();
fail();
} catch (Exception e) {
assertTrue(e.getCause() instanceof MetadataStoreException.AlreadyClosedException);
}
}
}