Skip to content

Commit

Permalink
fix: handle BerkeleyJE DB interruption [tp-tests]
Browse files Browse the repository at this point in the history
  • Loading branch information
tien committed Apr 29, 2024
1 parent 1c53402 commit d17a1fe
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.ThreadInterruptedException;
import com.sleepycat.je.Transaction;
import com.sleepycat.je.TransactionConfig;
import org.janusgraph.diskstorage.BackendException;
Expand Down Expand Up @@ -97,10 +98,7 @@ public BerkeleyJEStoreManager(Configuration configuration) throws BackendExcepti
super(configuration);
stores = new HashMap<>();

int cachePercentage = configuration.get(JVM_CACHE);
boolean sharedCache = configuration.get(SHARED_CACHE);
CacheMode cacheMode = ConfigOption.getEnumValue(configuration.get(CACHE_MODE), CacheMode.class);
initialize(cachePercentage, sharedCache, cacheMode);
initialize();

features = new StandardStoreFeatures.Builder()
.orderedScan(true)
Expand All @@ -117,8 +115,22 @@ public BerkeleyJEStoreManager(Configuration configuration) throws BackendExcepti
.build();
}

private void initialize(int cachePercent, final boolean sharedCache, final CacheMode cacheMode) throws BackendException {
private synchronized void initialize() throws BackendException {
try {
if (environment != null) {
if (environment.isValid()) {
return;
}

if (!environment.isClosed()) {
environment.close();
}
}

int cachePercent = storageConfig.get(JVM_CACHE);
boolean sharedCache = storageConfig.get(SHARED_CACHE);
CacheMode cacheMode = ConfigOption.getEnumValue(storageConfig.get(CACHE_MODE), CacheMode.class);

EnvironmentConfig envConfig = new EnvironmentConfig();
envConfig.setAllowCreate(true);
envConfig.setTransactional(transactional);
Expand All @@ -134,6 +146,9 @@ private void initialize(int cachePercent, final boolean sharedCache, final Cache
//Open the environment
environment = new Environment(directory, envConfig);

for (Map.Entry<String, BerkeleyJEKeyValueStore> store : stores.entrySet()) {
openDatabase(store.getKey(), true);
}
} catch (DatabaseException e) {
throw new PermanentBackendException("Error during BerkeleyJE initialization: ", e);
}
Expand All @@ -150,13 +165,12 @@ public List<KeyRange> getLocalKeyPartition() throws BackendException {
throw new UnsupportedOperationException();
}

@Override
public BerkeleyJETx beginTransaction(final BaseTransactionConfig txCfg) throws BackendException {
public BerkeleyJETx beginTransaction(final BaseTransactionConfig txCfg, boolean retry) throws BackendException {
try {
Transaction tx = null;

Configuration effectiveCfg =
new MergedConfiguration(txCfg.getCustomOptions(), getStorageConfig());
new MergedConfiguration(txCfg.getCustomOptions(), getStorageConfig());

if (transactional) {
TransactionConfig txnConfig = new TransactionConfig();
Expand All @@ -182,15 +196,32 @@ public BerkeleyJETx beginTransaction(final BaseTransactionConfig txCfg) throws B
}

return btx;
} catch (ThreadInterruptedException e) {
if (!retry) {
throw e;
}

initialize();

return beginTransaction(txCfg, false);
} catch (DatabaseException e) {
throw new PermanentBackendException("Could not start BerkeleyJE transaction", e);
}
}

@Override
public BerkeleyJETx beginTransaction(final BaseTransactionConfig txCfg) throws BackendException {
return beginTransaction(txCfg, true);
}

@Override
public BerkeleyJEKeyValueStore openDatabase(String name) throws BackendException {
return openDatabase(name, false);
}

private BerkeleyJEKeyValueStore openDatabase(String name, boolean force) throws BackendException {
Preconditions.checkNotNull(name);
if (stores.containsKey(name)) {
if (!force && stores.containsKey(name)) {
return stores.get(name);
}
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Copyright 2022 JanusGraph Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.janusgraph;

import org.apache.commons.io.FileUtils;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
import org.janusgraph.core.JanusGraph;
import org.janusgraph.core.JanusGraphException;
import org.janusgraph.core.JanusGraphFactory;
import org.janusgraph.diskstorage.configuration.ModifiableConfiguration;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.RepeatedTest;

import java.io.File;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.STORAGE_DIRECTORY;

@Disabled
public class BerkeleyInterruptionTest {
JanusGraph graph;

@BeforeEach
void setUp() {
final ModifiableConfiguration config = BerkeleyStorageSetup.getBerkeleyJEConfiguration();
final String dir = config.get(STORAGE_DIRECTORY);
FileUtils.deleteQuietly(new File(dir));
graph = JanusGraphFactory.open(config);
}

@AfterEach
void tearDown() {
graph.close();
}

@RepeatedTest(5)
public void test() throws InterruptedException {
for (int i = 0; i < 5000; i++) {
graph.traversal()
.addV("V")
.property("a", "bb" + i)
.property("b", "bb" + i)
.addV("V")
.property("a", "bb" + i)
.property("b", "bb" + i)
.addV("V")
.property("a", "bb" + i)
.property("b", "bb" + i)
.iterate();

if (i % 10_000 == 0) {
graph.tx().commit();
}
}
graph.tx().commit();

final ExecutorService executorService = Executors.newSingleThreadExecutor();
final CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
graph.traversal().V().elementMap().count().next();
}, executorService);

Thread.sleep(200);
executorService.shutdownNow();

try {
future.get();
} catch (ExecutionException e) {
Assertions.assertEquals(TraversalInterruptedException.class, e.getCause().getClass(), e.getMessage());
}

try {
Assertions.assertEquals(15000, graph.traversal().V().count().next());
} catch (JanusGraphException e) {
Assertions.fail("bdb should be reopened");
}

Assertions.assertEquals(15000, graph.traversal().V().count().next());
}
}

0 comments on commit d17a1fe

Please sign in to comment.