Skip to content

Commit

Permalink
Handle interruption for BerkeleyJE backend [tp-tests]
Browse files Browse the repository at this point in the history
Fixes #2120

Signed-off-by: Pavel Ershov <owner.mad.epa@gmail.com>
  • Loading branch information
mad committed Sep 18, 2023
1 parent 6681799 commit 088e37d
Show file tree
Hide file tree
Showing 10 changed files with 250 additions and 164 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.Put;
import com.sleepycat.je.ReadOptions;
import com.sleepycat.je.ThreadInterruptedException;
import com.sleepycat.je.Transaction;
import com.sleepycat.je.WriteOptions;
import org.janusgraph.diskstorage.BackendException;
Expand All @@ -48,8 +49,11 @@
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

import static org.janusgraph.diskstorage.berkeleyje.BerkeleyJEStoreManager.convertThreadInterruptedException;

public class BerkeleyJEKeyValueStore implements OrderedKeyValueStore {

private static final Logger log = LoggerFactory.getLogger(BerkeleyJEKeyValueStore.class);
Expand All @@ -60,21 +64,21 @@ public class BerkeleyJEKeyValueStore implements OrderedKeyValueStore {
public static Function<Integer, Integer> ttlConverter = ttl -> (int) Math.max(1, Duration.of(ttl, ChronoUnit.SECONDS).toHours());


private final Database db;
private final AtomicReference<Database> db = new AtomicReference<>();
private final String name;
private final BerkeleyJEStoreManager manager;
private boolean isOpen;

public BerkeleyJEKeyValueStore(String n, Database data, BerkeleyJEStoreManager m) {
db = data;
db.set(data);
name = n;
manager = m;
isOpen = true;
}

public DatabaseConfig getConfiguration() throws BackendException {
try {
return db.getConfig();
return db.get().getConfig();
} catch (DatabaseException e) {
throw new PermanentBackendException(e);
}
Expand All @@ -92,18 +96,24 @@ private static Transaction getTransaction(StoreTransaction txh) {

private Cursor openCursor(StoreTransaction txh) throws BackendException {
Preconditions.checkArgument(txh!=null);
return ((BerkeleyJETx) txh).openCursor(db);
return ((BerkeleyJETx) txh).openCursor(db.get());
}

private static void closeCursor(StoreTransaction txh, Cursor cursor) {
Preconditions.checkArgument(txh!=null);
((BerkeleyJETx) txh).closeCursor(cursor);
}

void reopen(final Database db) {
this.db.set(db);
}

@Override
public synchronized void close() throws BackendException {
try {
if(isOpen) db.close();
if (isOpen) db.get().close();
} catch (ThreadInterruptedException ignored) {
// environment will be closed
} catch (DatabaseException e) {
throw new PermanentBackendException(e);
}
Expand All @@ -120,7 +130,7 @@ public StaticBuffer get(StaticBuffer key, StoreTransaction txh) throws BackendEx

log.trace("db={}, op=get, tx={}", name, txh);

OperationResult result = db.get(tx, databaseKey, data, Get.SEARCH, getReadOptions(txh));
OperationResult result = db.get().get(tx, databaseKey, data, Get.SEARCH, getReadOptions(txh));

if (result != null) {
return getBuffer(data);
Expand Down Expand Up @@ -153,6 +163,7 @@ public RecordIterator<KeyValueEntry> getSlice(KVQuery query, StoreTransaction tx
final DatabaseEntry foundKey = keyStart.as(ENTRY_FACTORY);
final DatabaseEntry foundData = new DatabaseEntry();
final Cursor cursor = openCursor(txh);
final ReadOptions readOptions = getReadOptions(txh);

return new RecordIterator<KeyValueEntry>() {
private OperationStatus status;
Expand Down Expand Up @@ -182,9 +193,9 @@ private KeyValueEntry getNextEntry() {
}
while (!selector.reachedLimit()) {
if (status == null) {
status = cursor.get(foundKey, foundData, Get.SEARCH_GTE, getReadOptions(txh)) == null ? OperationStatus.NOTFOUND : OperationStatus.SUCCESS;
status = get(Get.SEARCH_GTE, readOptions);
} else {
status = cursor.get(foundKey, foundData, Get.NEXT, getReadOptions(txh)) == null ? OperationStatus.NOTFOUND : OperationStatus.SUCCESS;
status = get(Get.NEXT, readOptions);
}
if (status != OperationStatus.SUCCESS) {
break;
Expand All @@ -203,6 +214,16 @@ private KeyValueEntry getNextEntry() {
return null;
}

private OperationStatus get(Get get, ReadOptions readOptions) {
try {
return cursor.get(foundKey, foundData, get, readOptions) == null
? OperationStatus.NOTFOUND
: OperationStatus.SUCCESS;
} catch (ThreadInterruptedException e) {
throw convertThreadInterruptedException(e);
}
}

@Override
public void close() {
closeCursor(txh, cursor);
Expand Down Expand Up @@ -237,13 +258,17 @@ public void insert(StaticBuffer key, StaticBuffer value, StoreTransaction txh, b
int convertedTtl = ttlConverter.apply(ttl);
writeOptions.setTTL(convertedTtl, TimeUnit.HOURS);
}
if (allowOverwrite) {
OperationResult result = db.put(tx, key.as(ENTRY_FACTORY), value.as(ENTRY_FACTORY), Put.OVERWRITE, writeOptions);
EnvironmentFailureException.assertState(result != null);
status = OperationStatus.SUCCESS;
} else {
OperationResult result = db.put(tx, key.as(ENTRY_FACTORY), value.as(ENTRY_FACTORY), Put.NO_OVERWRITE, writeOptions);
status = result == null ? OperationStatus.KEYEXIST : OperationStatus.SUCCESS;
try {
if (allowOverwrite) {
OperationResult result = db.get().put(tx, key.as(ENTRY_FACTORY), value.as(ENTRY_FACTORY), Put.OVERWRITE, writeOptions);
EnvironmentFailureException.assertState(result != null);
status = OperationStatus.SUCCESS;
} else {
OperationResult result = db.get().put(tx, key.as(ENTRY_FACTORY), value.as(ENTRY_FACTORY), Put.NO_OVERWRITE, writeOptions);
status = result == null ? OperationStatus.KEYEXIST : OperationStatus.SUCCESS;
}
} catch (ThreadInterruptedException e) {
throw convertThreadInterruptedException(e);
}

if (status != OperationStatus.SUCCESS) {
Expand All @@ -257,10 +282,12 @@ public void delete(StaticBuffer key, StoreTransaction txh) throws BackendExcepti
Transaction tx = getTransaction(txh);
try {
log.trace("db={}, op=delete, tx={}", name, txh);
OperationStatus status = db.delete(tx, key.as(ENTRY_FACTORY));
OperationStatus status = db.get().delete(tx, key.as(ENTRY_FACTORY));
if (status != OperationStatus.SUCCESS && status != OperationStatus.NOTFOUND) {
throw new PermanentBackendException("Could not remove: " + status);
}
} catch (ThreadInterruptedException e) {
throw convertThreadInterruptedException(e);
} catch (DatabaseException e) {
throw new PermanentBackendException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

package org.janusgraph.diskstorage.berkeleyje;


import com.google.common.base.Preconditions;
import com.sleepycat.je.CacheMode;
import com.sleepycat.je.Database;
Expand All @@ -23,8 +22,10 @@
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.ThreadInterruptedException;
import com.sleepycat.je.Transaction;
import com.sleepycat.je.TransactionConfig;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
import org.janusgraph.diskstorage.BackendException;
import org.janusgraph.diskstorage.BaseTransactionConfig;
import org.janusgraph.diskstorage.PermanentBackendException;
Expand All @@ -51,6 +52,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

import static org.janusgraph.diskstorage.configuration.ConfigOption.disallowEmpty;

Expand Down Expand Up @@ -90,17 +92,14 @@ public class BerkeleyJEStoreManager extends LocalStoreManager implements Ordered

private final Map<String, BerkeleyJEKeyValueStore> stores;

protected Environment environment;
protected AtomicReference<Environment> environment = new AtomicReference<>();
protected final StoreFeatures features;

public BerkeleyJEStoreManager(Configuration configuration) throws BackendException {
super(configuration);
stores = new HashMap<>();

int cachePercentage = configuration.get(JVM_CACHE);
boolean sharedCache = configuration.get(SHARED_CACHE);
CacheMode cacheMode = ConfigOption.getEnumValue(configuration.get(CACHE_MODE), CacheMode.class);
initialize(cachePercentage, sharedCache, cacheMode);
initialize(configuration);

features = new StandardStoreFeatures.Builder()
.orderedScan(true)
Expand All @@ -117,7 +116,10 @@ public BerkeleyJEStoreManager(Configuration configuration) throws BackendExcepti
.build();
}

private void initialize(int cachePercent, final boolean sharedCache, final CacheMode cacheMode) throws BackendException {
private void initialize(Configuration configuration) throws BackendException {
int cachePercent = configuration.get(JVM_CACHE);
boolean sharedCache = configuration.get(SHARED_CACHE);
CacheMode cacheMode = ConfigOption.getEnumValue(configuration.get(CACHE_MODE), CacheMode.class);
try {
EnvironmentConfig envConfig = new EnvironmentConfig();
envConfig.setAllowCreate(true);
Expand All @@ -132,7 +134,7 @@ private void initialize(int cachePercent, final boolean sharedCache, final Cache
}

//Open the environment
environment = new Environment(directory, envConfig);
environment.set(new Environment(directory, envConfig));

} catch (DatabaseException e) {
throw new PermanentBackendException("Error during BerkeleyJE initialization: ", e);
Expand All @@ -152,39 +154,56 @@ public List<KeyRange> getLocalKeyPartition() throws BackendException {

@Override
public BerkeleyJETx beginTransaction(final BaseTransactionConfig txCfg) throws BackendException {
try {
Transaction tx = null;
boolean interrupted = false;
do {
try {
Transaction tx = null;

Configuration effectiveCfg =
Configuration effectiveCfg =
new MergedConfiguration(txCfg.getCustomOptions(), getStorageConfig());

if (transactional) {
TransactionConfig txnConfig = new TransactionConfig();
ConfigOption.getEnumValue(effectiveCfg.get(ISOLATION_LEVEL), IsolationLevel.class).configure(txnConfig);
tx = environment.beginTransaction(null, txnConfig);
} else {
if (txCfg instanceof TransactionConfiguration) {
if (!((TransactionConfiguration) txCfg).isSingleThreaded()) {
// Non-transactional cursors can't shared between threads, more info ThreadLocker.checkState
throw new PermanentBackendException("BerkeleyJE does not support non-transactional for multi threaded tx");
if (transactional) {
TransactionConfig txnConfig = new TransactionConfig();
ConfigOption.getEnumValue(effectiveCfg.get(ISOLATION_LEVEL), IsolationLevel.class).configure(txnConfig);
tx = environment.get().beginTransaction(null, txnConfig);
} else {
if (txCfg instanceof TransactionConfiguration) {
if (!((TransactionConfiguration) txCfg).isSingleThreaded()) {
// Non-transactional cursors can't shared between threads, more info ThreadLocker.checkState
throw new PermanentBackendException("BerkeleyJE does not support non-transactional for multi threaded tx");
}
}
}
}
BerkeleyJETx btx =
new BerkeleyJETx(
tx,
ConfigOption.getEnumValue(effectiveCfg.get(LOCK_MODE), LockMode.class),
ConfigOption.getEnumValue(effectiveCfg.get(CACHE_MODE), CacheMode.class),
txCfg);

if (log.isTraceEnabled()) {
log.trace("Berkeley tx created", new TransactionBegin(btx.toString()));
}
BerkeleyJETx btx =
new BerkeleyJETx(
tx,
ConfigOption.getEnumValue(effectiveCfg.get(LOCK_MODE), LockMode.class),
ConfigOption.getEnumValue(effectiveCfg.get(CACHE_MODE), CacheMode.class),
txCfg);

if (log.isTraceEnabled()) {
log.trace("Berkeley tx created", new TransactionBegin(btx.toString()));
}

return btx;
} catch (DatabaseException e) {
throw new PermanentBackendException("Could not start BerkeleyJE transaction", e);
}
return btx;
} catch (ThreadInterruptedException e) {
log.error("BerkeleyJE backend is interrupted! Try to recreate environment", e);
environment.get().close();
initialize(storageConfig);
for (Map.Entry<String, BerkeleyJEKeyValueStore> entry : stores.entrySet()) {
final String name = entry.getKey();
final BerkeleyJEKeyValueStore store = entry.getValue();
store.reopen(openDb(name));
}
if (!interrupted) {
interrupted = true;
} else {
throw new PermanentBackendException("Could not start BerkeleyJE transaction", e);
}
} catch (DatabaseException e) {
throw new PermanentBackendException("Could not start BerkeleyJE transaction", e);
}
} while (true);
}

@Override
Expand All @@ -194,19 +213,8 @@ public BerkeleyJEKeyValueStore openDatabase(String name) throws BackendException
return stores.get(name);
}
try {
DatabaseConfig dbConfig = new DatabaseConfig();
dbConfig.setReadOnly(false);
dbConfig.setAllowCreate(true);
dbConfig.setTransactional(transactional);
dbConfig.setKeyPrefixing(true);

if (batchLoading) {
dbConfig.setDeferredWrite(true);
}

Database db = environment.openDatabase(null, name, dbConfig);

log.debug("Opened database {}", name);
Database db = openDb(name);
log.trace("Opened database {}", name);

BerkeleyJEKeyValueStore store = new BerkeleyJEKeyValueStore(name, db, this);
stores.put(name, store);
Expand All @@ -216,6 +224,20 @@ public BerkeleyJEKeyValueStore openDatabase(String name) throws BackendException
}
}

private Database openDb(String name) {
DatabaseConfig dbConfig = new DatabaseConfig();
dbConfig.setReadOnly(false);
dbConfig.setAllowCreate(true);
dbConfig.setTransactional(transactional);
dbConfig.setKeyPrefixing(true);

if (batchLoading) {
dbConfig.setDeferredWrite(true);
}

return environment.get().openDatabase(null, name, dbConfig);
}

@Override
public void mutateMany(Map<String, KVMutation> mutations, StoreTransaction txh) throws BackendException {
for (Map.Entry<String,KVMutation> mutation : mutations.entrySet()) {
Expand Down Expand Up @@ -266,7 +288,7 @@ public void close() throws BackendException {
//Ignore
}
try {
environment.close();
environment.get().close();
} catch (DatabaseException e) {
throw new PermanentBackendException("Could not close BerkeleyJE database", e);
}
Expand All @@ -282,8 +304,8 @@ public void clearStorage() throws BackendException {
throw new IllegalStateException("Cannot delete store, since database is open: " + stores.keySet());
}

for (final String db : environment.getDatabaseNames()) {
environment.removeDatabase(NULL_TRANSACTION, db);
for (final String db : environment.get().getDatabaseNames()) {
environment.get().removeDatabase(NULL_TRANSACTION, db);
log.debug("Removed database {} (clearStorage)", db);
}
close();
Expand All @@ -292,7 +314,7 @@ public void clearStorage() throws BackendException {

@Override
public boolean exists() throws BackendException {
return !environment.getDatabaseNames().isEmpty();
return !environment.get().getDatabaseNames().isEmpty();
}

@Override
Expand Down Expand Up @@ -335,4 +357,10 @@ private TransactionBegin(String msg) {
super(msg);
}
}

static TraversalInterruptedException convertThreadInterruptedException(final ThreadInterruptedException e) {
final TraversalInterruptedException ex = new TraversalInterruptedException();
ex.initCause(e);
return ex;
}
}
Loading

0 comments on commit 088e37d

Please sign in to comment.