Skip to content

Commit

Permalink
Revert "Handle interruption for BerkeleyJE backend [tp-tests]"
Browse files Browse the repository at this point in the history
This reverts commit cdea0d7.
  • Loading branch information
li-boxuan authored Oct 6, 2023
1 parent cdea0d7 commit 20f4c94
Show file tree
Hide file tree
Showing 10 changed files with 164 additions and 250 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.Put;
import com.sleepycat.je.ReadOptions;
import com.sleepycat.je.ThreadInterruptedException;
import com.sleepycat.je.Transaction;
import com.sleepycat.je.WriteOptions;
import org.janusgraph.diskstorage.BackendException;
Expand All @@ -49,11 +48,8 @@
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

import static org.janusgraph.diskstorage.berkeleyje.BerkeleyJEStoreManager.convertThreadInterruptedException;

public class BerkeleyJEKeyValueStore implements OrderedKeyValueStore {

private static final Logger log = LoggerFactory.getLogger(BerkeleyJEKeyValueStore.class);
Expand All @@ -64,21 +60,21 @@ public class BerkeleyJEKeyValueStore implements OrderedKeyValueStore {
public static Function<Integer, Integer> ttlConverter = ttl -> (int) Math.max(1, Duration.of(ttl, ChronoUnit.SECONDS).toHours());


private final AtomicReference<Database> db = new AtomicReference<>();
private final Database db;
private final String name;
private final BerkeleyJEStoreManager manager;
private boolean isOpen;

public BerkeleyJEKeyValueStore(String n, Database data, BerkeleyJEStoreManager m) {
db.set(data);
db = data;
name = n;
manager = m;
isOpen = true;
}

public DatabaseConfig getConfiguration() throws BackendException {
try {
return db.get().getConfig();
return db.getConfig();
} catch (DatabaseException e) {
throw new PermanentBackendException(e);
}
Expand All @@ -96,24 +92,18 @@ private static Transaction getTransaction(StoreTransaction txh) {

private Cursor openCursor(StoreTransaction txh) throws BackendException {
Preconditions.checkArgument(txh!=null);
return ((BerkeleyJETx) txh).openCursor(db.get());
return ((BerkeleyJETx) txh).openCursor(db);
}

private static void closeCursor(StoreTransaction txh, Cursor cursor) {
Preconditions.checkArgument(txh!=null);
((BerkeleyJETx) txh).closeCursor(cursor);
}

void reopen(final Database db) {
this.db.set(db);
}

@Override
public synchronized void close() throws BackendException {
try {
if (isOpen) db.get().close();
} catch (ThreadInterruptedException ignored) {
// environment will be closed
if(isOpen) db.close();
} catch (DatabaseException e) {
throw new PermanentBackendException(e);
}
Expand All @@ -130,7 +120,7 @@ public StaticBuffer get(StaticBuffer key, StoreTransaction txh) throws BackendEx

log.trace("db={}, op=get, tx={}", name, txh);

OperationResult result = db.get().get(tx, databaseKey, data, Get.SEARCH, getReadOptions(txh));
OperationResult result = db.get(tx, databaseKey, data, Get.SEARCH, getReadOptions(txh));

if (result != null) {
return getBuffer(data);
Expand Down Expand Up @@ -163,7 +153,6 @@ public RecordIterator<KeyValueEntry> getSlice(KVQuery query, StoreTransaction tx
final DatabaseEntry foundKey = keyStart.as(ENTRY_FACTORY);
final DatabaseEntry foundData = new DatabaseEntry();
final Cursor cursor = openCursor(txh);
final ReadOptions readOptions = getReadOptions(txh);

return new RecordIterator<KeyValueEntry>() {
private OperationStatus status;
Expand Down Expand Up @@ -193,9 +182,9 @@ private KeyValueEntry getNextEntry() {
}
while (!selector.reachedLimit()) {
if (status == null) {
status = get(Get.SEARCH_GTE, readOptions);
status = cursor.get(foundKey, foundData, Get.SEARCH_GTE, getReadOptions(txh)) == null ? OperationStatus.NOTFOUND : OperationStatus.SUCCESS;
} else {
status = get(Get.NEXT, readOptions);
status = cursor.get(foundKey, foundData, Get.NEXT, getReadOptions(txh)) == null ? OperationStatus.NOTFOUND : OperationStatus.SUCCESS;
}
if (status != OperationStatus.SUCCESS) {
break;
Expand All @@ -214,16 +203,6 @@ private KeyValueEntry getNextEntry() {
return null;
}

private OperationStatus get(Get get, ReadOptions readOptions) {
try {
return cursor.get(foundKey, foundData, get, readOptions) == null
? OperationStatus.NOTFOUND
: OperationStatus.SUCCESS;
} catch (ThreadInterruptedException e) {
throw convertThreadInterruptedException(e);
}
}

@Override
public void close() {
closeCursor(txh, cursor);
Expand Down Expand Up @@ -258,17 +237,13 @@ public void insert(StaticBuffer key, StaticBuffer value, StoreTransaction txh, b
int convertedTtl = ttlConverter.apply(ttl);
writeOptions.setTTL(convertedTtl, TimeUnit.HOURS);
}
try {
if (allowOverwrite) {
OperationResult result = db.get().put(tx, key.as(ENTRY_FACTORY), value.as(ENTRY_FACTORY), Put.OVERWRITE, writeOptions);
EnvironmentFailureException.assertState(result != null);
status = OperationStatus.SUCCESS;
} else {
OperationResult result = db.get().put(tx, key.as(ENTRY_FACTORY), value.as(ENTRY_FACTORY), Put.NO_OVERWRITE, writeOptions);
status = result == null ? OperationStatus.KEYEXIST : OperationStatus.SUCCESS;
}
} catch (ThreadInterruptedException e) {
throw convertThreadInterruptedException(e);
if (allowOverwrite) {
OperationResult result = db.put(tx, key.as(ENTRY_FACTORY), value.as(ENTRY_FACTORY), Put.OVERWRITE, writeOptions);
EnvironmentFailureException.assertState(result != null);
status = OperationStatus.SUCCESS;
} else {
OperationResult result = db.put(tx, key.as(ENTRY_FACTORY), value.as(ENTRY_FACTORY), Put.NO_OVERWRITE, writeOptions);
status = result == null ? OperationStatus.KEYEXIST : OperationStatus.SUCCESS;
}

if (status != OperationStatus.SUCCESS) {
Expand All @@ -282,12 +257,10 @@ public void delete(StaticBuffer key, StoreTransaction txh) throws BackendExcepti
Transaction tx = getTransaction(txh);
try {
log.trace("db={}, op=delete, tx={}", name, txh);
OperationStatus status = db.get().delete(tx, key.as(ENTRY_FACTORY));
OperationStatus status = db.delete(tx, key.as(ENTRY_FACTORY));
if (status != OperationStatus.SUCCESS && status != OperationStatus.NOTFOUND) {
throw new PermanentBackendException("Could not remove: " + status);
}
} catch (ThreadInterruptedException e) {
throw convertThreadInterruptedException(e);
} catch (DatabaseException e) {
throw new PermanentBackendException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package org.janusgraph.diskstorage.berkeleyje;


import com.google.common.base.Preconditions;
import com.sleepycat.je.CacheMode;
import com.sleepycat.je.Database;
Expand All @@ -22,10 +23,8 @@
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.ThreadInterruptedException;
import com.sleepycat.je.Transaction;
import com.sleepycat.je.TransactionConfig;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
import org.janusgraph.diskstorage.BackendException;
import org.janusgraph.diskstorage.BaseTransactionConfig;
import org.janusgraph.diskstorage.PermanentBackendException;
Expand All @@ -52,7 +51,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

import static org.janusgraph.diskstorage.configuration.ConfigOption.disallowEmpty;

Expand Down Expand Up @@ -92,14 +90,17 @@ public class BerkeleyJEStoreManager extends LocalStoreManager implements Ordered

private final Map<String, BerkeleyJEKeyValueStore> stores;

protected AtomicReference<Environment> environment = new AtomicReference<>();
protected Environment environment;
protected final StoreFeatures features;

public BerkeleyJEStoreManager(Configuration configuration) throws BackendException {
super(configuration);
stores = new HashMap<>();

initialize(configuration);
int cachePercentage = configuration.get(JVM_CACHE);
boolean sharedCache = configuration.get(SHARED_CACHE);
CacheMode cacheMode = ConfigOption.getEnumValue(configuration.get(CACHE_MODE), CacheMode.class);
initialize(cachePercentage, sharedCache, cacheMode);

features = new StandardStoreFeatures.Builder()
.orderedScan(true)
Expand All @@ -116,10 +117,7 @@ public BerkeleyJEStoreManager(Configuration configuration) throws BackendExcepti
.build();
}

private void initialize(Configuration configuration) throws BackendException {
int cachePercent = configuration.get(JVM_CACHE);
boolean sharedCache = configuration.get(SHARED_CACHE);
CacheMode cacheMode = ConfigOption.getEnumValue(configuration.get(CACHE_MODE), CacheMode.class);
private void initialize(int cachePercent, final boolean sharedCache, final CacheMode cacheMode) throws BackendException {
try {
EnvironmentConfig envConfig = new EnvironmentConfig();
envConfig.setAllowCreate(true);
Expand All @@ -134,7 +132,7 @@ private void initialize(Configuration configuration) throws BackendException {
}

//Open the environment
environment.set(new Environment(directory, envConfig));
environment = new Environment(directory, envConfig);

} catch (DatabaseException e) {
throw new PermanentBackendException("Error during BerkeleyJE initialization: ", e);
Expand All @@ -154,56 +152,39 @@ public List<KeyRange> getLocalKeyPartition() throws BackendException {

@Override
public BerkeleyJETx beginTransaction(final BaseTransactionConfig txCfg) throws BackendException {
boolean interrupted = false;
do {
try {
Transaction tx = null;
try {
Transaction tx = null;

Configuration effectiveCfg =
Configuration effectiveCfg =
new MergedConfiguration(txCfg.getCustomOptions(), getStorageConfig());

if (transactional) {
TransactionConfig txnConfig = new TransactionConfig();
ConfigOption.getEnumValue(effectiveCfg.get(ISOLATION_LEVEL), IsolationLevel.class).configure(txnConfig);
tx = environment.get().beginTransaction(null, txnConfig);
} else {
if (txCfg instanceof TransactionConfiguration) {
if (!((TransactionConfiguration) txCfg).isSingleThreaded()) {
// Non-transactional cursors can't shared between threads, more info ThreadLocker.checkState
throw new PermanentBackendException("BerkeleyJE does not support non-transactional for multi threaded tx");
}
if (transactional) {
TransactionConfig txnConfig = new TransactionConfig();
ConfigOption.getEnumValue(effectiveCfg.get(ISOLATION_LEVEL), IsolationLevel.class).configure(txnConfig);
tx = environment.beginTransaction(null, txnConfig);
} else {
if (txCfg instanceof TransactionConfiguration) {
if (!((TransactionConfiguration) txCfg).isSingleThreaded()) {
// Non-transactional cursors can't shared between threads, more info ThreadLocker.checkState
throw new PermanentBackendException("BerkeleyJE does not support non-transactional for multi threaded tx");
}
}
BerkeleyJETx btx =
new BerkeleyJETx(
tx,
ConfigOption.getEnumValue(effectiveCfg.get(LOCK_MODE), LockMode.class),
ConfigOption.getEnumValue(effectiveCfg.get(CACHE_MODE), CacheMode.class),
txCfg);

if (log.isTraceEnabled()) {
log.trace("Berkeley tx created", new TransactionBegin(btx.toString()));
}

return btx;
} catch (ThreadInterruptedException e) {
log.error("BerkeleyJE backend is interrupted! Try to recreate environment", e);
environment.get().close();
initialize(storageConfig);
for (Map.Entry<String, BerkeleyJEKeyValueStore> entry : stores.entrySet()) {
final String name = entry.getKey();
final BerkeleyJEKeyValueStore store = entry.getValue();
store.reopen(openDb(name));
}
if (!interrupted) {
interrupted = true;
} else {
throw new PermanentBackendException("Could not start BerkeleyJE transaction", e);
}
} catch (DatabaseException e) {
throw new PermanentBackendException("Could not start BerkeleyJE transaction", e);
}
} while (true);
BerkeleyJETx btx =
new BerkeleyJETx(
tx,
ConfigOption.getEnumValue(effectiveCfg.get(LOCK_MODE), LockMode.class),
ConfigOption.getEnumValue(effectiveCfg.get(CACHE_MODE), CacheMode.class),
txCfg);

if (log.isTraceEnabled()) {
log.trace("Berkeley tx created", new TransactionBegin(btx.toString()));
}

return btx;
} catch (DatabaseException e) {
throw new PermanentBackendException("Could not start BerkeleyJE transaction", e);
}
}

@Override
Expand All @@ -213,8 +194,19 @@ public BerkeleyJEKeyValueStore openDatabase(String name) throws BackendException
return stores.get(name);
}
try {
Database db = openDb(name);
log.trace("Opened database {}", name);
DatabaseConfig dbConfig = new DatabaseConfig();
dbConfig.setReadOnly(false);
dbConfig.setAllowCreate(true);
dbConfig.setTransactional(transactional);
dbConfig.setKeyPrefixing(true);

if (batchLoading) {
dbConfig.setDeferredWrite(true);
}

Database db = environment.openDatabase(null, name, dbConfig);

log.debug("Opened database {}", name);

BerkeleyJEKeyValueStore store = new BerkeleyJEKeyValueStore(name, db, this);
stores.put(name, store);
Expand All @@ -224,20 +216,6 @@ public BerkeleyJEKeyValueStore openDatabase(String name) throws BackendException
}
}

private Database openDb(String name) {
DatabaseConfig dbConfig = new DatabaseConfig();
dbConfig.setReadOnly(false);
dbConfig.setAllowCreate(true);
dbConfig.setTransactional(transactional);
dbConfig.setKeyPrefixing(true);

if (batchLoading) {
dbConfig.setDeferredWrite(true);
}

return environment.get().openDatabase(null, name, dbConfig);
}

@Override
public void mutateMany(Map<String, KVMutation> mutations, StoreTransaction txh) throws BackendException {
for (Map.Entry<String,KVMutation> mutation : mutations.entrySet()) {
Expand Down Expand Up @@ -288,7 +266,7 @@ public void close() throws BackendException {
//Ignore
}
try {
environment.get().close();
environment.close();
} catch (DatabaseException e) {
throw new PermanentBackendException("Could not close BerkeleyJE database", e);
}
Expand All @@ -304,8 +282,8 @@ public void clearStorage() throws BackendException {
throw new IllegalStateException("Cannot delete store, since database is open: " + stores.keySet());
}

for (final String db : environment.get().getDatabaseNames()) {
environment.get().removeDatabase(NULL_TRANSACTION, db);
for (final String db : environment.getDatabaseNames()) {
environment.removeDatabase(NULL_TRANSACTION, db);
log.debug("Removed database {} (clearStorage)", db);
}
close();
Expand All @@ -314,7 +292,7 @@ public void clearStorage() throws BackendException {

@Override
public boolean exists() throws BackendException {
return !environment.get().getDatabaseNames().isEmpty();
return !environment.getDatabaseNames().isEmpty();
}

@Override
Expand Down Expand Up @@ -357,10 +335,4 @@ private TransactionBegin(String msg) {
super(msg);
}
}

static TraversalInterruptedException convertThreadInterruptedException(final ThreadInterruptedException e) {
final TraversalInterruptedException ex = new TraversalInterruptedException();
ex.initCause(e);
return ex;
}
}
Loading

1 comment on commit 20f4c94

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmark

Benchmark suite Current: 20f4c94 Previous: 68f49a1 Ratio
org.janusgraph.JanusGraphSpeedBenchmark.basicAddAndDelete 14934.679793904561 ms/op 14750.912757292574 ms/op 1.01
org.janusgraph.GraphCentricQueryBenchmark.getVertices 1428.1873661281045 ms/op 1346.1948471823591 ms/op 1.06
org.janusgraph.MgmtOlapJobBenchmark.runClearIndex 222.93320469565214 ms/op 221.0470166869565 ms/op 1.01
org.janusgraph.MgmtOlapJobBenchmark.runReindex 470.4419414733334 ms/op 463.2053476121212 ms/op 1.02
org.janusgraph.JanusGraphSpeedBenchmark.basicCount 497.88158612277726 ms/op 473.90261975756744 ms/op 1.05
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesAllPropertiesWithAllMultiQuerySlicesUnderMaxRequestsPerConnection 9057.625873561452 ms/op 8776.908210320456 ms/op 1.03
org.janusgraph.CQLMultiQueryBenchmark.getElementsWithUsingEmitRepeatSteps 36780.610213832384 ms/op 29926.15626700794 ms/op 1.23
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesMultiplePropertiesWithSmallBatch 35036.15464053095 ms/op 32526.844515654997 ms/op 1.08
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.vertexCentricPropertiesFetching 70731.1898656 ms/op 59550.41166966667 ms/op 1.19
org.janusgraph.CQLMultiQueryBenchmark.getAllElementsTraversedFromOuterVertex 18496.81956011055 ms/op 15006.601503888596 ms/op 1.23
org.janusgraph.CQLMultiQueryBenchmark.getVerticesWithDoubleUnion 618.4323577149672 ms/op 610.4456304918209 ms/op 1.01
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesAllPropertiesWithUnlimitedBatch 8985.753287520201 ms/op 8351.21971247489 ms/op 1.08
org.janusgraph.CQLMultiQueryBenchmark.getNames 16996.07324085399 ms/op 14737.422644903432 ms/op 1.15
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesThreePropertiesWithAllMultiQuerySlicesUnderMaxRequestsPerConnection 12150.044089459476 ms/op 10903.708326936881 ms/op 1.11
org.janusgraph.CQLMultiQueryBenchmark.getLabels 15119.950477315078 ms/op 13305.68422999396 ms/op 1.14
org.janusgraph.CQLMultiQueryBenchmark.getVerticesFilteredByAndStep 658.3156636333395 ms/op 672.9530191288869 ms/op 0.98
org.janusgraph.CQLMultiQueryBenchmark.getVerticesFromMultiNestedRepeatStepStartingFromSingleVertex 23132.486862596827 ms/op 21151.69997790227 ms/op 1.09
org.janusgraph.CQLMultiQueryBenchmark.getVerticesWithCoalesceUsage 599.3944353408849 ms/op 566.2220666199714 ms/op 1.06
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesMultiplePropertiesWithAllMultiQuerySlicesUnderMaxRequestsPerConnection 30735.764473079522 ms/op 25822.55861829052 ms/op 1.19
org.janusgraph.CQLMultiQueryBenchmark.getIdToOutVerticesProjection 405.6192508098821 ms/op 411.4887692411499 ms/op 0.99
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesMultiplePropertiesWithUnlimitedBatch 36103.92531241318 ms/op 29794.881563733332 ms/op 1.21
org.janusgraph.CQLMultiQueryBenchmark.getNeighborNames 16766.190522448793 ms/op 14451.119747276667 ms/op 1.16
org.janusgraph.CQLMultiQueryBenchmark.getElementsWithUsingRepeatUntilSteps 18889.653544606255 ms/op 16086.213028835715 ms/op 1.17
org.janusgraph.CQLMultiQueryBenchmark.getAdjacentVerticesLocalCounts 18242.807930044797 ms/op 14981.480702516508 ms/op 1.22

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.