Skip to content

Commit

Permalink
Optimization for adding new vertex and updating vertex properties
Browse files Browse the repository at this point in the history
Signed-off-by: ntisseyre <ntisseyre@apple.com>
  • Loading branch information
ntisseyre committed Mar 7, 2024
1 parent c9e0e27 commit 9d463b1
Show file tree
Hide file tree
Showing 25 changed files with 763 additions and 86 deletions.
1 change: 1 addition & 0 deletions docs/configs/janusgraph-cfg.md
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,7 @@ Configuration options for the storage backend. Some options are applicable only
| storage.directory | Storage directory for those storage backends that require local storage. | String | (no default value) | LOCAL |
| storage.drop-on-clear | Whether to drop the graph database (true) or delete rows (false) when clearing storage. Note that some backends always drop the graph database when clearing storage. Also note that indices are always dropped when clearing storage. | Boolean | true | MASKABLE |
| storage.hostname | The hostname or comma-separated list of hostnames of storage backend servers. This is only applicable to some storage backends, such as cassandra and hbase. | String[] | 127.0.0.1 | LOCAL |
| storage.num-mutations-parallel-threshold | This parameter determines the minimum number of mutations a transaction must have before parallel processing is applied during aggregation. Leveraging parallel processing can enhance the commit times for transactions involving a large number of mutations. However, it is advisable not to set the threshold too low (e.g., 0 or 1) due to the overhead associated with parallelism synchronization. This overhead is more efficiently offset in the context of larger transactions. | Integer | 100 | MASKABLE |
| storage.page-size | JanusGraph break requests that may return many results from distributed storage backends into a series of requests for small chunks/pages of results, where each chunk contains up to this many elements. | Integer | 100 | MASKABLE |
| storage.parallel-backend-ops | Whether JanusGraph should attempt to parallelize storage operations | Boolean | true | MASKABLE |
| storage.password | Password to authenticate against backend | String | (no default value) | LOCAL |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ public abstract class MultiWriteKeyColumnValueStoreTest extends AbstractKCVSTest

final int bufferSize = 20;

final int numMutationsParallelThreshold = 20;

protected final String storeName1 = "testStore1";
private KCVSCache store1;
protected final String storeName2 = "testStore2";
Expand Down Expand Up @@ -84,7 +86,7 @@ public void tearDown() throws Exception {

public void open() throws BackendException {
manager = openStorageManager();
tx = new CacheTransaction(manager.beginTransaction(getTxConfig()), manager, bufferSize, Duration.ofMillis(100), true);
tx = new CacheTransaction(manager.beginTransaction(getTxConfig()), manager, bufferSize, numMutationsParallelThreshold, Duration.ofMillis(100), true);
store1 = new NoKCVSCache(manager.openDatabase(storeName1));
store2 = new NoKCVSCache(manager.openDatabase(storeName2));

Expand All @@ -104,7 +106,7 @@ public void clopen() throws BackendException {

public void newTx() throws BackendException {
if (tx!=null) tx.commit();
tx = new CacheTransaction(manager.beginTransaction(getTxConfig()), manager, bufferSize, Duration.ofMillis(100), true);
tx = new CacheTransaction(manager.beginTransaction(getTxConfig()), manager, bufferSize, numMutationsParallelThreshold, Duration.ofMillis(100), true);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.MANAGEMENT_LOG;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.METRICS_MERGE_STORES;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.METRICS_PREFIX;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.NUM_MUTATIONS_PARALLEL_THRESHOLD;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.PAGE_SIZE;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.PARALLEL_BACKEND_EXECUTOR_SERVICE_CLASS;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.PARALLEL_BACKEND_EXECUTOR_SERVICE_CORE_POOL_SIZE;
Expand Down Expand Up @@ -221,6 +222,7 @@ public Locker apply(String lockerName) {
private final Map<String, IndexProvider> indexes;

private final int bufferSize;
private final int numMutationsParallelThreshold;
private final Duration maxWriteTime;
private final Duration maxReadTime;
private final boolean allowCustomVertexIdType;
Expand Down Expand Up @@ -259,6 +261,9 @@ public Backend(Configuration configuration) {
bufferSize = Integer.MAX_VALUE;
} else bufferSize = bufferSizeTmp;

this.numMutationsParallelThreshold = configuration.get(NUM_MUTATIONS_PARALLEL_THRESHOLD);
Preconditions.checkArgument(numMutationsParallelThreshold > 0, "Parallel-threshold for number of mutations should be positive");

maxWriteTime = configuration.get(STORAGE_WRITE_WAITTIME);
maxReadTime = configuration.get(STORAGE_READ_WAITTIME);

Expand Down Expand Up @@ -477,6 +482,14 @@ public KCVSConfiguration getUserConfiguration() {
return userConfig;
}

public int getBufferSize() {
return this.bufferSize;

Check warning on line 486 in janusgraph-core/src/main/java/org/janusgraph/diskstorage/Backend.java

View check run for this annotation

Codecov / codecov/patch

janusgraph-core/src/main/java/org/janusgraph/diskstorage/Backend.java#L486

Added line #L486 was not covered by tests
}

public int getNumMutationsParallelThreshold() {
return this.numMutationsParallelThreshold;

Check warning on line 490 in janusgraph-core/src/main/java/org/janusgraph/diskstorage/Backend.java

View check run for this annotation

Codecov / codecov/patch

janusgraph-core/src/main/java/org/janusgraph/diskstorage/Backend.java#L490

Added line #L490 was not covered by tests
}

private String getMetricsCacheName(String storeName) {
if (!configuration.get(BASIC_METRICS)) return null;
return configuration.get(METRICS_MERGE_STORES) ? METRICS_MERGED_CACHE : storeName + METRICS_CACHE_SUFFIX;
Expand Down Expand Up @@ -593,7 +606,8 @@ public BackendTransaction beginTransaction(TransactionConfiguration configuratio
StoreTransaction tx = storeManagerLocking.beginTransaction(configuration);

// Cache
CacheTransaction cacheTx = new CacheTransaction(tx, storeManagerLocking, bufferSize, maxWriteTime, configuration.hasEnabledBatchLoading());
CacheTransaction cacheTx = new CacheTransaction(tx, storeManagerLocking, bufferSize, numMutationsParallelThreshold,
maxWriteTime, configuration.hasEnabledBatchLoading());

// Index transactions
final Map<String, IndexTransaction> indexTx = new HashMap<>(indexes.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.janusgraph.diskstorage.util.BufferUtil;
import org.janusgraph.graphdb.database.idhandling.VariableLong;
import org.janusgraph.graphdb.database.serialize.DataOutput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.ArrayList;
Expand All @@ -42,28 +44,35 @@
*/
public class CacheTransaction implements StoreTransaction, LoggableTransaction {

private final Logger log = LoggerFactory.getLogger(CacheTransaction.class);

private final StoreTransaction tx;
private final KeyColumnValueStoreManager manager;
private final boolean batchLoading;
private final int persistChunkSize;

private final int numMutationsParallelThreshold;
private final Duration maxWriteTime;

private int numMutations;
private final Map<KCVSCache, Map<StaticBuffer, KCVEntryMutation>> mutations;

public CacheTransaction(StoreTransaction tx, KeyColumnValueStoreManager manager,
int persistChunkSize, Duration maxWriteTime, boolean batchLoading) {
this(tx, manager, persistChunkSize, maxWriteTime, batchLoading, 2);
int persistChunkSize, int numMutationsParallelThreshold,
Duration maxWriteTime, boolean batchLoading) {
this(tx, manager, persistChunkSize, numMutationsParallelThreshold, maxWriteTime, batchLoading, 2);
}

public CacheTransaction(StoreTransaction tx, KeyColumnValueStoreManager manager, int persistChunkSize,
public CacheTransaction(StoreTransaction tx, KeyColumnValueStoreManager manager,
int persistChunkSize, int numMutationsParallelThreshold,
Duration maxWriteTime, boolean batchLoading, int expectedNumStores) {
Preconditions.checkArgument(tx != null && manager != null && persistChunkSize > 0);
this.tx = tx;
this.manager = manager;
this.batchLoading = batchLoading;
this.numMutations = 0;
this.persistChunkSize = persistChunkSize;
this.numMutationsParallelThreshold = numMutationsParallelThreshold;
this.maxWriteTime = maxWriteTime;
this.mutations = new HashMap<>(expectedNumStores);
}
Expand All @@ -88,6 +97,7 @@ void mutate(KCVSCache store, StaticBuffer key, List<Entry> additions, List<Entry
numMutations += m.getTotalMutations();

if (batchLoading && numMutations >= persistChunkSize) {
log.debug("Flushing mutations to storage numMutations={}, persistChunkSize={}", numMutations, persistChunkSize);
flushInternal();
}
}
Expand Down Expand Up @@ -129,8 +139,12 @@ private KCVMutation convert(KCVEntryMutation mutation) {
private void flushInternal() throws BackendException {
if (numMutations > 0) {
//Consolidate all mutations prior to persistence to ensure that no addition accidentally gets swallowed by a delete
for (Map<StaticBuffer, KCVEntryMutation> store : mutations.values()) {
for (KCVEntryMutation mut : store.values()) mut.consolidate();
if(numMutations < numMutationsParallelThreshold) {
for (Map<StaticBuffer, KCVEntryMutation> store : mutations.values()) {
for (KCVEntryMutation mut : store.values()) mut.consolidate();
}
} else {
mutations.values().parallelStream().forEach(store -> store.values().forEach(KCVEntryMutation::consolidate));
}

//Chunk up mutations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,18 @@ public boolean apply(@Nullable String s) {
"Size of the batch in which mutations are persisted",
ConfigOption.Type.MASKABLE, 1024, ConfigOption.positiveInt());

/**
* Number of mutations in a transaction after which use parallel processing for transaction aggregations.
* This might give a boost in transaction commit time.
* Default value is 100.
*/
public static final ConfigOption<Integer> NUM_MUTATIONS_PARALLEL_THRESHOLD = new ConfigOption<>(STORAGE_NS,"num-mutations-parallel-threshold",
"This parameter determines the minimum number of mutations a transaction must have before parallel processing is applied during aggregation. " +
"Leveraging parallel processing can enhance the commit times for transactions involving a large number of mutations. " +
"However, it is advisable not to set the threshold too low (e.g., 0 or 1) due to the overhead associated with parallelism synchronization. " +
"This overhead is more efficiently offset in the context of larger transactions.",
ConfigOption.Type.MASKABLE, 100, ConfigOption.positiveInt());

public static final ConfigOption<Duration> STORAGE_WRITE_WAITTIME = new ConfigOption<>(STORAGE_NS,"write-time",
"Maximum time (in ms) to wait for a backend write operation to complete successfully. If a backend write operation " +
"fails temporarily, JanusGraph will backoff exponentially and retry the operation until the wait time has been exhausted. ",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.script.Bindings;
import javax.script.ScriptException;

Expand All @@ -141,6 +142,8 @@ public class StandardJanusGraph extends JanusGraphBlueprintsGraph {
private static final Logger log =
LoggerFactory.getLogger(StandardJanusGraph.class);

private static final Logger logForPrepareCommit =
LoggerFactory.getLogger(StandardJanusGraph.class.getName()+".prepareCommit");

static {
TraversalStrategies graphStrategies =
Expand Down Expand Up @@ -696,14 +699,48 @@ public ModificationSummary prepareCommit(final Collection<InternalRelation> adde
ListMultimap<InternalVertex, InternalRelation> mutatedProperties = ArrayListMultimap.create();
List<IndexUpdate> indexUpdates = new ArrayList<>();

long startTimeStamp = System.currentTimeMillis();
int totalMutations = addedRelations.size() + deletedRelations.size();
boolean isBigDataSetLoggingEnabled = logForPrepareCommit.isDebugEnabled() && totalMutations >= backend.getBufferSize();
if (isBigDataSetLoggingEnabled) {
logForPrepareCommit.debug("0. Prepare commit for mutations count={}", totalMutations);

Check warning on line 706 in janusgraph-core/src/main/java/org/janusgraph/graphdb/database/StandardJanusGraph.java

View check run for this annotation

Codecov / codecov/patch

janusgraph-core/src/main/java/org/janusgraph/graphdb/database/StandardJanusGraph.java#L706

Added line #L706 was not covered by tests
}

if (isBigDataSetLoggingEnabled) {
logForPrepareCommit.debug("1. Collect deleted edges and their index updates and acquire edge locks");

Check warning on line 710 in janusgraph-core/src/main/java/org/janusgraph/graphdb/database/StandardJanusGraph.java

View check run for this annotation

Codecov / codecov/patch

janusgraph-core/src/main/java/org/janusgraph/graphdb/database/StandardJanusGraph.java#L710

Added line #L710 was not covered by tests
}
prepareCommitDeletes(deletedRelations, filter, mutator, tx, acquireLocks, mutations, mutatedProperties, indexUpdates);

if (isBigDataSetLoggingEnabled) {
logForPrepareCommit.debug("2. Collect added edges and their index updates and acquire edge locks");

Check warning on line 715 in janusgraph-core/src/main/java/org/janusgraph/graphdb/database/StandardJanusGraph.java

View check run for this annotation

Codecov / codecov/patch

janusgraph-core/src/main/java/org/janusgraph/graphdb/database/StandardJanusGraph.java#L715

Added line #L715 was not covered by tests
}
prepareCommitAdditions(addedRelations, filter, mutator, tx, acquireLocks, mutations, mutatedProperties, indexUpdates);

if (isBigDataSetLoggingEnabled) {
logForPrepareCommit.debug("3. Collect all index update for vertices");

Check warning on line 720 in janusgraph-core/src/main/java/org/janusgraph/graphdb/database/StandardJanusGraph.java

View check run for this annotation

Codecov / codecov/patch

janusgraph-core/src/main/java/org/janusgraph/graphdb/database/StandardJanusGraph.java#L720

Added line #L720 was not covered by tests
}
prepareCommitVertexIndexUpdates(mutatedProperties, indexUpdates);

if (isBigDataSetLoggingEnabled) {
logForPrepareCommit.debug("4. Acquire index locks (deletions first)");

Check warning on line 725 in janusgraph-core/src/main/java/org/janusgraph/graphdb/database/StandardJanusGraph.java

View check run for this annotation

Codecov / codecov/patch

janusgraph-core/src/main/java/org/janusgraph/graphdb/database/StandardJanusGraph.java#L725

Added line #L725 was not covered by tests
}
prepareCommitAcquireIndexLocks(indexUpdates, mutator, acquireLocks);

if (isBigDataSetLoggingEnabled) {
logForPrepareCommit.debug("5. Add relation mutations");

Check warning on line 730 in janusgraph-core/src/main/java/org/janusgraph/graphdb/database/StandardJanusGraph.java

View check run for this annotation

Codecov / codecov/patch

janusgraph-core/src/main/java/org/janusgraph/graphdb/database/StandardJanusGraph.java#L730

Added line #L730 was not covered by tests
}
prepareCommitAddRelationMutations(mutations, mutator, tx);

if (isBigDataSetLoggingEnabled) {
logForPrepareCommit.debug("6. Add index updates");

Check warning on line 735 in janusgraph-core/src/main/java/org/janusgraph/graphdb/database/StandardJanusGraph.java

View check run for this annotation

Codecov / codecov/patch

janusgraph-core/src/main/java/org/janusgraph/graphdb/database/StandardJanusGraph.java#L735

Added line #L735 was not covered by tests
}
boolean has2iMods = prepareCommitIndexUpdatesAndCheckIfAnyMixedIndexUsed(indexUpdates, mutator);

return new ModificationSummary(!mutations.isEmpty(),has2iMods);
if (isBigDataSetLoggingEnabled) {
long duration = System.currentTimeMillis() - startTimeStamp;
logForPrepareCommit.debug("7. Prepare commit is done with mutated vertex count={} in duration={}", mutations.size(), duration);

Check warning on line 741 in janusgraph-core/src/main/java/org/janusgraph/graphdb/database/StandardJanusGraph.java

View check run for this annotation

Codecov / codecov/patch

janusgraph-core/src/main/java/org/janusgraph/graphdb/database/StandardJanusGraph.java#L740-L741

Added lines #L740 - L741 were not covered by tests
}
return new ModificationSummary(!mutations.isEmpty(), has2iMods);
}

/**
Expand Down Expand Up @@ -772,10 +809,14 @@ private void prepareCommitAdditions(final Collection<InternalRelation> addedRela
* Collect all index update for vertices
*/
private void prepareCommitVertexIndexUpdates(final ListMultimap<InternalVertex, InternalRelation> mutatedProperties,
final List<IndexUpdate> indexUpdates){
for (InternalVertex v : mutatedProperties.keySet()) {
indexUpdates.addAll(indexSerializer.getIndexUpdates(v,mutatedProperties.get(v)));
}
final List<IndexUpdate> indexUpdates) {
mutatedProperties.keySet().parallelStream()
.map(v -> indexSerializer.getIndexUpdates(v, mutatedProperties.get(v)))
// Note: due to usage of parallel stream, the collector is used to synchronize insertions
// into `indexUpdates` for thread safety reasons.
// Using `forEach` directly isn't thread safe.
.collect(Collectors.toList())
.forEach(indexUpdates::addAll);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import org.apache.commons.lang.StringUtils;
import org.janusgraph.core.Cardinality;
import org.janusgraph.core.JanusGraphElement;
Expand Down Expand Up @@ -60,6 +61,7 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import static org.janusgraph.util.encoding.LongEncoding.STRING_ENCODING_MARKER;
Expand Down Expand Up @@ -248,18 +250,19 @@ public static void indexMatches(JanusGraphVertex vertex, IndexRecordEntry[] curr
values = ImmutableList.of(replaceValue);
} else {
values = new ArrayList<>();
Iterable<JanusGraphVertexProperty> props;
Iterator<JanusGraphVertexProperty> props;
if (onlyLoaded ||
(!vertex.isNew() && IDManager.VertexIDType.PartitionedVertex.is(vertex.id()))) {
//going through transaction so we can query deleted vertices
final VertexCentricQueryBuilder qb = ((InternalVertex)vertex).tx().query(vertex);
qb.noPartitionRestriction().type(key);
if (onlyLoaded) qb.queryOnlyLoaded();
props = qb.properties();
props = qb.properties().iterator();
} else {
props = vertex.query().keys(key.name()).properties();
props = Iterators.transform(vertex.properties(key.name()), i -> (JanusGraphVertexProperty) i);
}
for (final JanusGraphVertexProperty p : props) {
while (props.hasNext()) {
JanusGraphVertexProperty p = props.next();
assert !onlyLoaded || p.isLoaded() || p.isRemoved();
assert key.dataType().equals(p.value().getClass()) : key + " -> " + p;
values.add(new IndexRecordEntry(p));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import com.google.common.base.Predicate;
import org.janusgraph.core.JanusGraphVertex;
import org.janusgraph.core.PropertyKey;
import org.janusgraph.diskstorage.EntryList;
import org.janusgraph.diskstorage.keycolumnvalue.SliceQuery;
import org.janusgraph.graphdb.query.vertex.VertexCentricQueryBuilder;
Expand Down Expand Up @@ -57,6 +58,14 @@ public interface InternalVertex extends JanusGraphVertex, InternalElement {
*/
Iterable<InternalRelation> getAddedRelations(Predicate<InternalRelation> query);

Iterable<InternalRelation> findPreviousRelation(long id);

Iterable<InternalRelation> findAddedProperty(Predicate<InternalRelation> query);

default Iterable<InternalRelation> getDuplicatedAddedRelation(PropertyKey key, Object value) {
return this.getAddedRelations(p -> p.getType().equals(key));

Check warning on line 66 in janusgraph-core/src/main/java/org/janusgraph/graphdb/internal/InternalVertex.java

View check run for this annotation

Codecov / codecov/patch

janusgraph-core/src/main/java/org/janusgraph/graphdb/internal/InternalVertex.java#L66

Added line #L66 was not covered by tests
}

/**
* Returns all relations that match the given query. If these matching relations are not currently
* held in memory, it uses the given {@link Retriever} to retrieve the edges from backend storage.
Expand Down
Loading

0 comments on commit 9d463b1

Please sign in to comment.