diff --git a/janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEKeyValueStore.java b/janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEKeyValueStore.java index 9df3d9b802..d0befe1195 100644 --- a/janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEKeyValueStore.java +++ b/janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEKeyValueStore.java @@ -27,6 +27,7 @@ import com.sleepycat.je.OperationStatus; import com.sleepycat.je.Put; import com.sleepycat.je.ReadOptions; +import com.sleepycat.je.ThreadInterruptedException; import com.sleepycat.je.Transaction; import com.sleepycat.je.WriteOptions; import org.janusgraph.diskstorage.BackendException; @@ -48,8 +49,11 @@ import java.util.Map; import java.util.NoSuchElementException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; +import static org.janusgraph.diskstorage.berkeleyje.BerkeleyJEStoreManager.convertThreadInterruptedException; + public class BerkeleyJEKeyValueStore implements OrderedKeyValueStore { private static final Logger log = LoggerFactory.getLogger(BerkeleyJEKeyValueStore.class); @@ -60,13 +64,13 @@ public class BerkeleyJEKeyValueStore implements OrderedKeyValueStore { public static Function ttlConverter = ttl -> (int) Math.max(1, Duration.of(ttl, ChronoUnit.SECONDS).toHours()); - private final Database db; + private final AtomicReference db = new AtomicReference<>(); private final String name; private final BerkeleyJEStoreManager manager; private boolean isOpen; public BerkeleyJEKeyValueStore(String n, Database data, BerkeleyJEStoreManager m) { - db = data; + db.set(data); name = n; manager = m; isOpen = true; @@ -74,7 +78,7 @@ public BerkeleyJEKeyValueStore(String n, Database data, BerkeleyJEStoreManager m public DatabaseConfig getConfiguration() throws BackendException { try { - return db.getConfig(); + return db.get().getConfig(); } catch (DatabaseException e) { throw new PermanentBackendException(e); } @@ -92,7 +96,7 @@ private static Transaction getTransaction(StoreTransaction txh) { private Cursor openCursor(StoreTransaction txh) throws BackendException { Preconditions.checkArgument(txh!=null); - return ((BerkeleyJETx) txh).openCursor(db); + return ((BerkeleyJETx) txh).openCursor(db.get()); } private static void closeCursor(StoreTransaction txh, Cursor cursor) { @@ -100,10 +104,16 @@ private static void closeCursor(StoreTransaction txh, Cursor cursor) { ((BerkeleyJETx) txh).closeCursor(cursor); } + void reopen(final Database db) { + this.db.set(db); + } + @Override public synchronized void close() throws BackendException { try { - if(isOpen) db.close(); + if (isOpen) db.get().close(); + } catch (ThreadInterruptedException ignored) { + // environment will be closed } catch (DatabaseException e) { throw new PermanentBackendException(e); } @@ -120,7 +130,7 @@ public StaticBuffer get(StaticBuffer key, StoreTransaction txh) throws BackendEx log.trace("db={}, op=get, tx={}", name, txh); - OperationResult result = db.get(tx, databaseKey, data, Get.SEARCH, getReadOptions(txh)); + OperationResult result = db.get().get(tx, databaseKey, data, Get.SEARCH, getReadOptions(txh)); if (result != null) { return getBuffer(data); @@ -153,6 +163,7 @@ public RecordIterator getSlice(KVQuery query, StoreTransaction tx final DatabaseEntry foundKey = keyStart.as(ENTRY_FACTORY); final DatabaseEntry foundData = new DatabaseEntry(); final Cursor cursor = openCursor(txh); + final ReadOptions readOptions = getReadOptions(txh); return new RecordIterator() { private OperationStatus status; @@ -182,9 +193,9 @@ private KeyValueEntry getNextEntry() { } while (!selector.reachedLimit()) { if (status == null) { - status = cursor.get(foundKey, foundData, Get.SEARCH_GTE, getReadOptions(txh)) == null ? OperationStatus.NOTFOUND : OperationStatus.SUCCESS; + status = get(Get.SEARCH_GTE, readOptions); } else { - status = cursor.get(foundKey, foundData, Get.NEXT, getReadOptions(txh)) == null ? OperationStatus.NOTFOUND : OperationStatus.SUCCESS; + status = get(Get.NEXT, readOptions); } if (status != OperationStatus.SUCCESS) { break; @@ -203,6 +214,16 @@ private KeyValueEntry getNextEntry() { return null; } + private OperationStatus get(Get get, ReadOptions readOptions) { + try { + return cursor.get(foundKey, foundData, get, readOptions) == null + ? OperationStatus.NOTFOUND + : OperationStatus.SUCCESS; + } catch (ThreadInterruptedException e) { + throw convertThreadInterruptedException(e); + } + } + @Override public void close() { closeCursor(txh, cursor); @@ -237,13 +258,17 @@ public void insert(StaticBuffer key, StaticBuffer value, StoreTransaction txh, b int convertedTtl = ttlConverter.apply(ttl); writeOptions.setTTL(convertedTtl, TimeUnit.HOURS); } - if (allowOverwrite) { - OperationResult result = db.put(tx, key.as(ENTRY_FACTORY), value.as(ENTRY_FACTORY), Put.OVERWRITE, writeOptions); - EnvironmentFailureException.assertState(result != null); - status = OperationStatus.SUCCESS; - } else { - OperationResult result = db.put(tx, key.as(ENTRY_FACTORY), value.as(ENTRY_FACTORY), Put.NO_OVERWRITE, writeOptions); - status = result == null ? OperationStatus.KEYEXIST : OperationStatus.SUCCESS; + try { + if (allowOverwrite) { + OperationResult result = db.get().put(tx, key.as(ENTRY_FACTORY), value.as(ENTRY_FACTORY), Put.OVERWRITE, writeOptions); + EnvironmentFailureException.assertState(result != null); + status = OperationStatus.SUCCESS; + } else { + OperationResult result = db.get().put(tx, key.as(ENTRY_FACTORY), value.as(ENTRY_FACTORY), Put.NO_OVERWRITE, writeOptions); + status = result == null ? OperationStatus.KEYEXIST : OperationStatus.SUCCESS; + } + } catch (ThreadInterruptedException e) { + throw convertThreadInterruptedException(e); } if (status != OperationStatus.SUCCESS) { @@ -257,10 +282,12 @@ public void delete(StaticBuffer key, StoreTransaction txh) throws BackendExcepti Transaction tx = getTransaction(txh); try { log.trace("db={}, op=delete, tx={}", name, txh); - OperationStatus status = db.delete(tx, key.as(ENTRY_FACTORY)); + OperationStatus status = db.get().delete(tx, key.as(ENTRY_FACTORY)); if (status != OperationStatus.SUCCESS && status != OperationStatus.NOTFOUND) { throw new PermanentBackendException("Could not remove: " + status); } + } catch (ThreadInterruptedException e) { + throw convertThreadInterruptedException(e); } catch (DatabaseException e) { throw new PermanentBackendException(e); } diff --git a/janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEStoreManager.java b/janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEStoreManager.java index 632d37914e..e20d6cceb6 100644 --- a/janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEStoreManager.java +++ b/janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEStoreManager.java @@ -14,7 +14,6 @@ package org.janusgraph.diskstorage.berkeleyje; - import com.google.common.base.Preconditions; import com.sleepycat.je.CacheMode; import com.sleepycat.je.Database; @@ -23,8 +22,10 @@ import com.sleepycat.je.Environment; import com.sleepycat.je.EnvironmentConfig; import com.sleepycat.je.LockMode; +import com.sleepycat.je.ThreadInterruptedException; import com.sleepycat.je.Transaction; import com.sleepycat.je.TransactionConfig; +import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException; import org.janusgraph.diskstorage.BackendException; import org.janusgraph.diskstorage.BaseTransactionConfig; import org.janusgraph.diskstorage.PermanentBackendException; @@ -51,6 +52,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; import static org.janusgraph.diskstorage.configuration.ConfigOption.disallowEmpty; @@ -90,17 +92,14 @@ public class BerkeleyJEStoreManager extends LocalStoreManager implements Ordered private final Map stores; - protected Environment environment; + protected AtomicReference environment = new AtomicReference<>(); protected final StoreFeatures features; public BerkeleyJEStoreManager(Configuration configuration) throws BackendException { super(configuration); stores = new HashMap<>(); - int cachePercentage = configuration.get(JVM_CACHE); - boolean sharedCache = configuration.get(SHARED_CACHE); - CacheMode cacheMode = ConfigOption.getEnumValue(configuration.get(CACHE_MODE), CacheMode.class); - initialize(cachePercentage, sharedCache, cacheMode); + initialize(configuration); features = new StandardStoreFeatures.Builder() .orderedScan(true) @@ -117,7 +116,10 @@ public BerkeleyJEStoreManager(Configuration configuration) throws BackendExcepti .build(); } - private void initialize(int cachePercent, final boolean sharedCache, final CacheMode cacheMode) throws BackendException { + private void initialize(Configuration configuration) throws BackendException { + int cachePercent = configuration.get(JVM_CACHE); + boolean sharedCache = configuration.get(SHARED_CACHE); + CacheMode cacheMode = ConfigOption.getEnumValue(configuration.get(CACHE_MODE), CacheMode.class); try { EnvironmentConfig envConfig = new EnvironmentConfig(); envConfig.setAllowCreate(true); @@ -132,7 +134,7 @@ private void initialize(int cachePercent, final boolean sharedCache, final Cache } //Open the environment - environment = new Environment(directory, envConfig); + environment.set(new Environment(directory, envConfig)); } catch (DatabaseException e) { throw new PermanentBackendException("Error during BerkeleyJE initialization: ", e); @@ -152,39 +154,56 @@ public List getLocalKeyPartition() throws BackendException { @Override public BerkeleyJETx beginTransaction(final BaseTransactionConfig txCfg) throws BackendException { - try { - Transaction tx = null; + boolean interrupted = false; + do { + try { + Transaction tx = null; - Configuration effectiveCfg = + Configuration effectiveCfg = new MergedConfiguration(txCfg.getCustomOptions(), getStorageConfig()); - if (transactional) { - TransactionConfig txnConfig = new TransactionConfig(); - ConfigOption.getEnumValue(effectiveCfg.get(ISOLATION_LEVEL), IsolationLevel.class).configure(txnConfig); - tx = environment.beginTransaction(null, txnConfig); - } else { - if (txCfg instanceof TransactionConfiguration) { - if (!((TransactionConfiguration) txCfg).isSingleThreaded()) { - // Non-transactional cursors can't shared between threads, more info ThreadLocker.checkState - throw new PermanentBackendException("BerkeleyJE does not support non-transactional for multi threaded tx"); + if (transactional) { + TransactionConfig txnConfig = new TransactionConfig(); + ConfigOption.getEnumValue(effectiveCfg.get(ISOLATION_LEVEL), IsolationLevel.class).configure(txnConfig); + tx = environment.get().beginTransaction(null, txnConfig); + } else { + if (txCfg instanceof TransactionConfiguration) { + if (!((TransactionConfiguration) txCfg).isSingleThreaded()) { + // Non-transactional cursors can't shared between threads, more info ThreadLocker.checkState + throw new PermanentBackendException("BerkeleyJE does not support non-transactional for multi threaded tx"); + } } } - } - BerkeleyJETx btx = - new BerkeleyJETx( - tx, - ConfigOption.getEnumValue(effectiveCfg.get(LOCK_MODE), LockMode.class), - ConfigOption.getEnumValue(effectiveCfg.get(CACHE_MODE), CacheMode.class), - txCfg); - - if (log.isTraceEnabled()) { - log.trace("Berkeley tx created", new TransactionBegin(btx.toString())); - } + BerkeleyJETx btx = + new BerkeleyJETx( + tx, + ConfigOption.getEnumValue(effectiveCfg.get(LOCK_MODE), LockMode.class), + ConfigOption.getEnumValue(effectiveCfg.get(CACHE_MODE), CacheMode.class), + txCfg); + + if (log.isTraceEnabled()) { + log.trace("Berkeley tx created", new TransactionBegin(btx.toString())); + } - return btx; - } catch (DatabaseException e) { - throw new PermanentBackendException("Could not start BerkeleyJE transaction", e); - } + return btx; + } catch (ThreadInterruptedException e) { + log.error("BerkeleyJE backend is interrupted! Try to recreate environment", e); + environment.get().close(); + initialize(storageConfig); + for (Map.Entry entry : stores.entrySet()) { + final String name = entry.getKey(); + final BerkeleyJEKeyValueStore store = entry.getValue(); + store.reopen(openDb(name)); + } + if (!interrupted) { + interrupted = true; + } else { + throw new PermanentBackendException("Could not start BerkeleyJE transaction", e); + } + } catch (DatabaseException e) { + throw new PermanentBackendException("Could not start BerkeleyJE transaction", e); + } + } while (true); } @Override @@ -194,19 +213,8 @@ public BerkeleyJEKeyValueStore openDatabase(String name) throws BackendException return stores.get(name); } try { - DatabaseConfig dbConfig = new DatabaseConfig(); - dbConfig.setReadOnly(false); - dbConfig.setAllowCreate(true); - dbConfig.setTransactional(transactional); - dbConfig.setKeyPrefixing(true); - - if (batchLoading) { - dbConfig.setDeferredWrite(true); - } - - Database db = environment.openDatabase(null, name, dbConfig); - - log.debug("Opened database {}", name); + Database db = openDb(name); + log.trace("Opened database {}", name); BerkeleyJEKeyValueStore store = new BerkeleyJEKeyValueStore(name, db, this); stores.put(name, store); @@ -216,6 +224,20 @@ public BerkeleyJEKeyValueStore openDatabase(String name) throws BackendException } } + private Database openDb(String name) { + DatabaseConfig dbConfig = new DatabaseConfig(); + dbConfig.setReadOnly(false); + dbConfig.setAllowCreate(true); + dbConfig.setTransactional(transactional); + dbConfig.setKeyPrefixing(true); + + if (batchLoading) { + dbConfig.setDeferredWrite(true); + } + + return environment.get().openDatabase(null, name, dbConfig); + } + @Override public void mutateMany(Map mutations, StoreTransaction txh) throws BackendException { for (Map.Entry mutation : mutations.entrySet()) { @@ -266,7 +288,7 @@ public void close() throws BackendException { //Ignore } try { - environment.close(); + environment.get().close(); } catch (DatabaseException e) { throw new PermanentBackendException("Could not close BerkeleyJE database", e); } @@ -282,8 +304,8 @@ public void clearStorage() throws BackendException { throw new IllegalStateException("Cannot delete store, since database is open: " + stores.keySet()); } - for (final String db : environment.getDatabaseNames()) { - environment.removeDatabase(NULL_TRANSACTION, db); + for (final String db : environment.get().getDatabaseNames()) { + environment.get().removeDatabase(NULL_TRANSACTION, db); log.debug("Removed database {} (clearStorage)", db); } close(); @@ -292,7 +314,7 @@ public void clearStorage() throws BackendException { @Override public boolean exists() throws BackendException { - return !environment.getDatabaseNames().isEmpty(); + return !environment.get().getDatabaseNames().isEmpty(); } @Override @@ -335,4 +357,10 @@ private TransactionBegin(String msg) { super(msg); } } + + static TraversalInterruptedException convertThreadInterruptedException(final ThreadInterruptedException e) { + final TraversalInterruptedException ex = new TraversalInterruptedException(); + ex.initCause(e); + return ex; + } } diff --git a/janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJETx.java b/janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJETx.java index 43387f6e70..f4ed0bfd26 100644 --- a/janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJETx.java +++ b/janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJETx.java @@ -20,6 +20,7 @@ import com.sleepycat.je.Database; import com.sleepycat.je.DatabaseException; import com.sleepycat.je.LockMode; +import com.sleepycat.je.ThreadInterruptedException; import com.sleepycat.je.Transaction; import org.janusgraph.diskstorage.BackendException; import org.janusgraph.diskstorage.BaseTransactionConfig; @@ -31,6 +32,8 @@ import java.util.ArrayList; import java.util.List; +import static org.janusgraph.diskstorage.berkeleyje.BerkeleyJEStoreManager.convertThreadInterruptedException; + public class BerkeleyJETx extends AbstractStoreTransaction { private static final Logger log = LoggerFactory.getLogger(BerkeleyJETx.class); @@ -60,16 +63,25 @@ Cursor openCursor(Database db) throws BackendException { if (!isOpen) { throw new PermanentBackendException("Transaction already closed"); } - Cursor cursor = db.openCursor(tx, null); - openCursors.add(cursor); - return cursor; + try { + Cursor cursor = db.openCursor(tx, null); + openCursors.add(cursor); + return cursor; + } catch (ThreadInterruptedException e) { + throw convertThreadInterruptedException(e); + } } } void closeCursor(Cursor cursor) { synchronized (openCursors) { - cursor.close(); - openCursors.remove(cursor); + try { + cursor.close(); + } catch (ThreadInterruptedException e) { + throw convertThreadInterruptedException(e); + } finally { + openCursors.remove(cursor); + } } } @@ -98,6 +110,14 @@ public synchronized void rollback() throws BackendException { closeOpenCursors(); tx.abort(); tx = null; + } catch (ThreadInterruptedException ignored) { + // Ignore for avoid issues when backend is closing + } catch (IllegalStateException e) { + // Ignore for avoid issues when backend is closing + if (!"Database was closed.".equals(e.getMessage()) + && !"Environment is closed.".equals(e.getMessage())) { + throw e; + } } catch (DatabaseException e) { throw new PermanentBackendException(e); } @@ -114,6 +134,8 @@ public synchronized void commit() throws BackendException { closeOpenCursors(); tx.commit(); tx = null; + } catch (ThreadInterruptedException e) { + throw convertThreadInterruptedException(e); } catch (DatabaseException e) { throw new PermanentBackendException(e); } diff --git a/janusgraph-berkeleyje/src/test/java/org/janusgraph/BerkeleyInterruptionTest.java b/janusgraph-berkeleyje/src/test/java/org/janusgraph/BerkeleyInterruptionTest.java new file mode 100644 index 0000000000..d6e42a1486 --- /dev/null +++ b/janusgraph-berkeleyje/src/test/java/org/janusgraph/BerkeleyInterruptionTest.java @@ -0,0 +1,91 @@ +// Copyright 2022 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package org.janusgraph; + +import org.apache.commons.io.FileUtils; +import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException; +import org.janusgraph.core.JanusGraph; +import org.janusgraph.core.JanusGraphException; +import org.janusgraph.core.JanusGraphFactory; +import org.janusgraph.diskstorage.configuration.ModifiableConfiguration; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.RepeatedTest; + +import java.io.File; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.STORAGE_DIRECTORY; + +@Disabled +public class BerkeleyInterruptionTest { + JanusGraph graph; + + @BeforeEach + void setUp() { + final ModifiableConfiguration config = BerkeleyStorageSetup.getBerkeleyJEConfiguration(); + final String dir = config.get(STORAGE_DIRECTORY); + FileUtils.deleteQuietly(new File(dir)); + graph = JanusGraphFactory.open(config); + } + + @AfterEach + void tearDown() { + graph.close(); + } + + @RepeatedTest(5) + public void test() throws InterruptedException { + for (int i = 0; i < 5000; i++) { + graph.traversal() + .addV("V").property("a", "bb" + i).property("b", "bb" + i) + .addV("V").property("a", "bb" + i).property("b", "bb" + i) + .addV("V").property("a", "bb" + i).property("b", "bb" + i) + .iterate(); + if (i % 10_000 == 0) { + graph.tx().commit(); + } + } + graph.tx().commit(); + + final ExecutorService executorService = Executors.newSingleThreadExecutor(); + final CompletableFuture future = CompletableFuture.runAsync(() -> { + graph.traversal().V() + .elementMap() + .count().next(); + }, executorService); + + Thread.sleep(200); + executorService.shutdownNow(); + + try { + future.get(); + } catch (ExecutionException e) { + Assertions.assertEquals(TraversalInterruptedException.class, e.getCause().getClass(), e.getMessage()); + } + + try { + Assertions.assertEquals(15000, graph.traversal().V().count().next()); + } catch (JanusGraphException e) { + Assertions.fail("bdb should be reopened"); + } + + Assertions.assertEquals(15000, graph.traversal().V().count().next()); + } +} diff --git a/janusgraph-berkeleyje/src/test/java/org/janusgraph/blueprints/process/BerkeleyJanusGraphComputerTest.java b/janusgraph-berkeleyje/src/test/java/org/janusgraph/blueprints/process/BerkeleyJanusGraphComputerTest.java index 5e406cda19..9845ca484f 100644 --- a/janusgraph-berkeleyje/src/test/java/org/janusgraph/blueprints/process/BerkeleyJanusGraphComputerTest.java +++ b/janusgraph-berkeleyje/src/test/java/org/janusgraph/blueprints/process/BerkeleyJanusGraphComputerTest.java @@ -15,6 +15,7 @@ package org.janusgraph.blueprints.process; import org.apache.tinkerpop.gremlin.GraphProviderClass; +import org.apache.tinkerpop.gremlin.process.ProcessComputerSuite; import org.janusgraph.blueprints.BerkeleyGraphComputerProvider; import org.janusgraph.core.JanusGraph; import org.junit.runner.RunWith; @@ -22,7 +23,7 @@ /** * @author Matthias Broecheler (me@matthiasb.com) */ -@RunWith(BerkeleyProcessComputerSuite.class) +@RunWith(ProcessComputerSuite.class) @GraphProviderClass(provider = BerkeleyGraphComputerProvider.class, graph = JanusGraph.class) public class BerkeleyJanusGraphComputerTest { } diff --git a/janusgraph-berkeleyje/src/test/java/org/janusgraph/blueprints/process/BerkeleyJanusGraphProcessTest.java b/janusgraph-berkeleyje/src/test/java/org/janusgraph/blueprints/process/BerkeleyJanusGraphProcessTest.java index 46a05ea15b..0c80a3079a 100644 --- a/janusgraph-berkeleyje/src/test/java/org/janusgraph/blueprints/process/BerkeleyJanusGraphProcessTest.java +++ b/janusgraph-berkeleyje/src/test/java/org/janusgraph/blueprints/process/BerkeleyJanusGraphProcessTest.java @@ -15,6 +15,7 @@ package org.janusgraph.blueprints.process; import org.apache.tinkerpop.gremlin.GraphProviderClass; +import org.apache.tinkerpop.gremlin.process.ProcessStandardSuite; import org.janusgraph.blueprints.BerkeleyGraphProvider; import org.janusgraph.core.JanusGraph; import org.junit.runner.RunWith; @@ -22,7 +23,7 @@ /** * @author Matthias Broecheler (me@matthiasb.com) */ -@RunWith(BerkeleyProcessStandardSuite.class) +@RunWith(ProcessStandardSuite.class) @GraphProviderClass(provider = BerkeleyGraphProvider.class, graph = JanusGraph.class) public class BerkeleyJanusGraphProcessTest { } diff --git a/janusgraph-berkeleyje/src/test/java/org/janusgraph/blueprints/process/BerkeleyMultiQueryJanusGraphProcessTest.java b/janusgraph-berkeleyje/src/test/java/org/janusgraph/blueprints/process/BerkeleyMultiQueryJanusGraphProcessTest.java index b9b8507abe..81f00e9294 100644 --- a/janusgraph-berkeleyje/src/test/java/org/janusgraph/blueprints/process/BerkeleyMultiQueryJanusGraphProcessTest.java +++ b/janusgraph-berkeleyje/src/test/java/org/janusgraph/blueprints/process/BerkeleyMultiQueryJanusGraphProcessTest.java @@ -15,6 +15,7 @@ package org.janusgraph.blueprints.process; import org.apache.tinkerpop.gremlin.GraphProviderClass; +import org.apache.tinkerpop.gremlin.process.ProcessStandardSuite; import org.janusgraph.blueprints.BerkeleyMultiQueryGraphProvider; import org.janusgraph.core.JanusGraph; import org.junit.runner.RunWith; @@ -22,7 +23,7 @@ /** * @author Ted Wilmes (twilmes@gmail.com) */ -@RunWith(BerkeleyProcessStandardSuite.class) +@RunWith(ProcessStandardSuite.class) @GraphProviderClass(provider = BerkeleyMultiQueryGraphProvider.class, graph = JanusGraph.class) public class BerkeleyMultiQueryJanusGraphProcessTest { } diff --git a/janusgraph-berkeleyje/src/test/java/org/janusgraph/blueprints/process/BerkeleyProcessComputerSuite.java b/janusgraph-berkeleyje/src/test/java/org/janusgraph/blueprints/process/BerkeleyProcessComputerSuite.java deleted file mode 100644 index 632481024a..0000000000 --- a/janusgraph-berkeleyje/src/test/java/org/janusgraph/blueprints/process/BerkeleyProcessComputerSuite.java +++ /dev/null @@ -1,44 +0,0 @@ -// Copyright 2019 JanusGraph Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package org.janusgraph.blueprints.process; - -import org.apache.commons.lang3.ArrayUtils; -import org.apache.tinkerpop.gremlin.process.ProcessComputerSuite; -import org.apache.tinkerpop.gremlin.process.traversal.TraversalInterruptionComputerTest; -import org.junit.runners.model.InitializationError; -import org.junit.runners.model.RunnerBuilder; - -import java.lang.reflect.Field; - -/** - * Custom TinkerPop {@link ProcessComputerSuite} that excludes {@link TraversalInterruptionComputerTest} for compatibility with - * BerkeleyDB JE, which does not support thread interrupts. - */ -public class BerkeleyProcessComputerSuite extends ProcessComputerSuite { - - public BerkeleyProcessComputerSuite(final Class classToTest, final RunnerBuilder builder) throws InitializationError { - super(classToTest, builder, getTestList()); - } - - private static Class[] getTestList() throws InitializationError { - try { - final Field field = ProcessComputerSuite.class.getDeclaredField("allTests"); - field.setAccessible(true); - return (Class[]) ArrayUtils.removeElement((Class[]) field.get(null), TraversalInterruptionComputerTest.class); - } catch (ReflectiveOperationException e) { - throw new InitializationError("Unable to create test list"); - } - } -} diff --git a/janusgraph-berkeleyje/src/test/java/org/janusgraph/blueprints/process/BerkeleyProcessStandardSuite.java b/janusgraph-berkeleyje/src/test/java/org/janusgraph/blueprints/process/BerkeleyProcessStandardSuite.java deleted file mode 100644 index 554887de5e..0000000000 --- a/janusgraph-berkeleyje/src/test/java/org/janusgraph/blueprints/process/BerkeleyProcessStandardSuite.java +++ /dev/null @@ -1,44 +0,0 @@ -// Copyright 2017 JanusGraph Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package org.janusgraph.blueprints.process; - -import org.apache.commons.lang3.ArrayUtils; -import org.apache.tinkerpop.gremlin.process.ProcessStandardSuite; -import org.apache.tinkerpop.gremlin.process.traversal.TraversalInterruptionTest; -import org.junit.runners.model.InitializationError; -import org.junit.runners.model.RunnerBuilder; - -import java.lang.reflect.Field; - -/** - * Custom TinkerPop {@link ProcessStandardSuite} that excludes {@link TraversalInterruptionTest} for compatibility with - * BerkeleyDB JE, which does not support thread interrupts. - */ -public class BerkeleyProcessStandardSuite extends ProcessStandardSuite { - - public BerkeleyProcessStandardSuite(final Class classToTest, final RunnerBuilder builder) throws InitializationError { - super(classToTest, builder, getTestList()); - } - - private static Class[] getTestList() throws InitializationError { - try { - final Field field = ProcessStandardSuite.class.getDeclaredField("allTests"); - field.setAccessible(true); - return (Class[]) ArrayUtils.removeElement((Class[]) field.get(null), TraversalInterruptionTest.class); - } catch (ReflectiveOperationException e) { - throw new InitializationError("Unable to create test list"); - } - } -} diff --git a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/BackendOperation.java b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/BackendOperation.java index abc8cb4e39..8fbd9bcf39 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/BackendOperation.java +++ b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/BackendOperation.java @@ -21,6 +21,7 @@ import org.janusgraph.diskstorage.TemporaryBackendException; import org.janusgraph.diskstorage.keycolumnvalue.StoreTransaction; import org.janusgraph.diskstorage.util.time.TimestampProvider; +import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,6 +76,8 @@ public static V executeDirect(Callable exe, Duration totalWaitTime) throw lastException = storeEx; } else if (e instanceof BackendException) { throw (BackendException)e; + } else if (e instanceof TraversalInterruptedException) { + throw (TraversalInterruptedException)e; } else { throw new PermanentBackendException("Permanent exception while executing backend operation "+exe.toString(),e); }