
Optimization for adding new vertex and updating vertex properties
Signed-off-by: ntisseyre <ntisseyre@apple.com>
ntisseyre committed Mar 7, 2024
1 parent c9e0e27 commit c7cd4f7
Showing 21 changed files with 753 additions and 82 deletions.
@@ -100,6 +100,7 @@
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.MANAGEMENT_LOG;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.METRICS_MERGE_STORES;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.METRICS_PREFIX;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.NUM_MUTATIONS_PARALLEL_THRESHOLD;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.PAGE_SIZE;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.PARALLEL_BACKEND_EXECUTOR_SERVICE_CLASS;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.PARALLEL_BACKEND_EXECUTOR_SERVICE_CORE_POOL_SIZE;
@@ -221,6 +222,7 @@ public Locker apply(String lockerName) {
private final Map<String, IndexProvider> indexes;

private final int bufferSize;
private final int numMutationsParallelThreshold;
private final Duration maxWriteTime;
private final Duration maxReadTime;
private final boolean allowCustomVertexIdType;
@@ -259,6 +261,9 @@ public Backend(Configuration configuration) {
bufferSize = Integer.MAX_VALUE;
} else bufferSize = bufferSizeTmp;

this.numMutationsParallelThreshold = configuration.get(NUM_MUTATIONS_PARALLEL_THRESHOLD);
Preconditions.checkArgument(numMutationsParallelThreshold > 0, "Parallel-threshold for number of mutations should be positive");

maxWriteTime = configuration.get(STORAGE_WRITE_WAITTIME);
maxReadTime = configuration.get(STORAGE_READ_WAITTIME);

@@ -477,6 +482,14 @@ public KCVSConfiguration getUserConfiguration() {
return userConfig;
}

public int getBufferSize() {
return this.bufferSize;
}

public int getNumMutationsParallelThreshold() {
return this.numMutationsParallelThreshold;
}

private String getMetricsCacheName(String storeName) {
if (!configuration.get(BASIC_METRICS)) return null;
return configuration.get(METRICS_MERGE_STORES) ? METRICS_MERGED_CACHE : storeName + METRICS_CACHE_SUFFIX;
@@ -593,7 +606,8 @@ public BackendTransaction beginTransaction(TransactionConfiguration configuratio
StoreTransaction tx = storeManagerLocking.beginTransaction(configuration);

// Cache
CacheTransaction cacheTx = new CacheTransaction(tx, storeManagerLocking, bufferSize, maxWriteTime, configuration.hasEnabledBatchLoading());
CacheTransaction cacheTx = new CacheTransaction(tx, storeManagerLocking, bufferSize, numMutationsParallelThreshold,
maxWriteTime, configuration.hasEnabledBatchLoading());

// Index transactions
final Map<String, IndexTransaction> indexTx = new HashMap<>(indexes.size());
@@ -28,6 +28,8 @@
import org.janusgraph.diskstorage.util.BufferUtil;
import org.janusgraph.graphdb.database.idhandling.VariableLong;
import org.janusgraph.graphdb.database.serialize.DataOutput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.ArrayList;
@@ -42,28 +44,35 @@
*/
public class CacheTransaction implements StoreTransaction, LoggableTransaction {

private final Logger log = LoggerFactory.getLogger(CacheTransaction.class);

private final StoreTransaction tx;
private final KeyColumnValueStoreManager manager;
private final boolean batchLoading;
private final int persistChunkSize;

private final int numMutationsParallelThreshold;
private final Duration maxWriteTime;

private int numMutations;
private final Map<KCVSCache, Map<StaticBuffer, KCVEntryMutation>> mutations;

public CacheTransaction(StoreTransaction tx, KeyColumnValueStoreManager manager,
int persistChunkSize, Duration maxWriteTime, boolean batchLoading) {
this(tx, manager, persistChunkSize, maxWriteTime, batchLoading, 2);
int persistChunkSize, int numMutationsParallelThreshold,
Duration maxWriteTime, boolean batchLoading) {
this(tx, manager, persistChunkSize, numMutationsParallelThreshold, maxWriteTime, batchLoading, 2);
}

public CacheTransaction(StoreTransaction tx, KeyColumnValueStoreManager manager, int persistChunkSize,
public CacheTransaction(StoreTransaction tx, KeyColumnValueStoreManager manager,
int persistChunkSize, int numMutationsParallelThreshold,
Duration maxWriteTime, boolean batchLoading, int expectedNumStores) {
Preconditions.checkArgument(tx != null && manager != null && persistChunkSize > 0);
this.tx = tx;
this.manager = manager;
this.batchLoading = batchLoading;
this.numMutations = 0;
this.persistChunkSize = persistChunkSize;
this.numMutationsParallelThreshold = numMutationsParallelThreshold;
this.maxWriteTime = maxWriteTime;
this.mutations = new HashMap<>(expectedNumStores);
}
@@ -88,6 +97,7 @@ void mutate(KCVSCache store, StaticBuffer key, List<Entry> additions, List<Entry
numMutations += m.getTotalMutations();

if (batchLoading && numMutations >= persistChunkSize) {
log.debug("Flushing mutations to storage numMutations={}, persistChunkSize={}", numMutations, persistChunkSize);
flushInternal();
}
}
@@ -129,8 +139,12 @@ private KCVMutation convert(KCVEntryMutation mutation) {
private void flushInternal() throws BackendException {
if (numMutations > 0) {
//Consolidate all mutations prior to persistence to ensure that no addition accidentally gets swallowed by a delete
for (Map<StaticBuffer, KCVEntryMutation> store : mutations.values()) {
for (KCVEntryMutation mut : store.values()) mut.consolidate();
if(numMutations < numMutationsParallelThreshold) {
for (Map<StaticBuffer, KCVEntryMutation> store : mutations.values()) {
for (KCVEntryMutation mut : store.values()) mut.consolidate();
}
} else {
mutations.values().parallelStream().forEach(store -> store.values().forEach(KCVEntryMutation::consolidate));
}

//Chunk up mutations
@@ -604,6 +604,15 @@ public boolean apply(@Nullable String s) {
"Size of the batch in which mutations are persisted",
ConfigOption.Type.MASKABLE, 1024, ConfigOption.positiveInt());

/**
 * Number of mutations in a transaction after which parallel processing is used for transaction aggregations.
 * This might give a boost in transaction commit time.
 * Default value is 100.
 */
public static final ConfigOption<Integer> NUM_MUTATIONS_PARALLEL_THRESHOLD = new ConfigOption<>(STORAGE_NS,"num-mutations-parallel-threshold",
"Number of mutations after which parallel processing is used for transaction aggregations.",
ConfigOption.Type.MASKABLE, 100, ConfigOption.positiveInt());

public static final ConfigOption<Duration> STORAGE_WRITE_WAITTIME = new ConfigOption<>(STORAGE_NS,"write-time",
"Maximum time (in ms) to wait for a backend write operation to complete successfully. If a backend write operation " +
"fails temporarily, JanusGraph will backoff exponentially and retry the operation until the wait time has been exhausted. ",
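The NUM_MUTATIONS_PARALLEL_THRESHOLD option above sits in the storage namespace next to buffer-size and is MASKABLE, so it can be overridden per instance. As a rough sketch of tuning it when opening a graph, assuming the full key resolves to storage.num-mutations-parallel-threshold (an inference from STORAGE_NS, not something shown in this diff):

import org.janusgraph.core.JanusGraph;
import org.janusgraph.core.JanusGraphFactory;

public class ParallelThresholdExample {
    public static void main(String[] args) {
        // Assumed key: STORAGE_NS ("storage") + "num-mutations-parallel-threshold".
        // Transactions with fewer mutations than this keep the sequential path;
        // larger ones consolidate their mutation maps with a parallel stream.
        JanusGraph graph = JanusGraphFactory.build()
                .set("storage.backend", "inmemory")
                .set("storage.num-mutations-parallel-threshold", 500)
                .open();
        graph.close();
    }
}

Whether the parallel path pays off depends on transaction size, so raising the threshold is usually the safer adjustment for small transactions.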
@@ -129,6 +129,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.script.Bindings;
import javax.script.ScriptException;

@@ -141,6 +142,8 @@ public class StandardJanusGraph extends JanusGraphBlueprintsGraph {
private static final Logger log =
LoggerFactory.getLogger(StandardJanusGraph.class);

private static final Logger logForPrepareCommit =
LoggerFactory.getLogger(StandardJanusGraph.class.getName()+".prepareCommit");

static {
TraversalStrategies graphStrategies =
@@ -696,14 +699,48 @@ public ModificationSummary prepareCommit(final Collection<InternalRelation> adde
ListMultimap<InternalVertex, InternalRelation> mutatedProperties = ArrayListMultimap.create();
List<IndexUpdate> indexUpdates = new ArrayList<>();

long startTimeStamp = System.currentTimeMillis();
int totalMutations = addedRelations.size() + deletedRelations.size();
boolean isBigDataSetLoggingEnabled = logForPrepareCommit.isDebugEnabled() && totalMutations >= backend.getBufferSize();
if (isBigDataSetLoggingEnabled) {
logForPrepareCommit.debug("0. Prepare commit for mutations count={}", totalMutations);
}

if (isBigDataSetLoggingEnabled) {
logForPrepareCommit.debug("1. Collect deleted edges and their index updates and acquire edge locks");
}
prepareCommitDeletes(deletedRelations, filter, mutator, tx, acquireLocks, mutations, mutatedProperties, indexUpdates);

if (isBigDataSetLoggingEnabled) {
logForPrepareCommit.debug("2. Collect added edges and their index updates and acquire edge locks");
}
prepareCommitAdditions(addedRelations, filter, mutator, tx, acquireLocks, mutations, mutatedProperties, indexUpdates);

if (isBigDataSetLoggingEnabled) {
logForPrepareCommit.debug("3. Collect all index update for vertices");
}
prepareCommitVertexIndexUpdates(mutatedProperties, indexUpdates);

if (isBigDataSetLoggingEnabled) {
logForPrepareCommit.debug("4. Acquire index locks (deletions first)");
}
prepareCommitAcquireIndexLocks(indexUpdates, mutator, acquireLocks);

if (isBigDataSetLoggingEnabled) {
logForPrepareCommit.debug("5. Add relation mutations");
}
prepareCommitAddRelationMutations(mutations, mutator, tx);

if (isBigDataSetLoggingEnabled) {
logForPrepareCommit.debug("6. Add index updates");
}
boolean has2iMods = prepareCommitIndexUpdatesAndCheckIfAnyMixedIndexUsed(indexUpdates, mutator);

return new ModificationSummary(!mutations.isEmpty(),has2iMods);
if (isBigDataSetLoggingEnabled) {
long duration = System.currentTimeMillis() - startTimeStamp;
logForPrepareCommit.debug("7. Prepare commit is done with mutated vertex count={} in duration={}", mutations.size(), duration);
}
return new ModificationSummary(!mutations.isEmpty(), has2iMods);
}
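
The numbered messages above come from a dedicated logger named after the StandardJanusGraph class with a ".prepareCommit" suffix, and they are only emitted when that logger is at DEBUG and the mutation count reaches the backend buffer size. A minimal sketch of switching the logger on programmatically, assuming a Log4j 2 binding behind SLF4J (with Logback the same logger name would be configured in logback.xml instead):

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.core.config.Configurator;

public class EnablePrepareCommitLogging {
    public static void main(String[] args) {
        // Logger name mirrors StandardJanusGraph.class.getName() + ".prepareCommit";
        // adjust the package if it differs in your JanusGraph version.
        Configurator.setLevel(
                "org.janusgraph.graphdb.database.StandardJanusGraph.prepareCommit",
                Level.DEBUG);
    }
}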

/**
@@ -772,10 +809,14 @@ private void prepareCommitAdditions(final Collection<InternalRelation> addedRela
* Collect all index update for vertices
*/
private void prepareCommitVertexIndexUpdates(final ListMultimap<InternalVertex, InternalRelation> mutatedProperties,
final List<IndexUpdate> indexUpdates){
for (InternalVertex v : mutatedProperties.keySet()) {
indexUpdates.addAll(indexSerializer.getIndexUpdates(v,mutatedProperties.get(v)));
}
final List<IndexUpdate> indexUpdates) {
mutatedProperties.keySet().parallelStream()
.map(v -> indexSerializer.getIndexUpdates(v, mutatedProperties.get(v)))
// Note: due to usage of parallel stream, the collector is used to synchronize insertions
// into `indexUpdates` for thread safety reasons.
// Using `forEach` directly isn't thread safe.
.collect(Collectors.toList())
.forEach(indexUpdates::addAll);
}

/**
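The collect-then-forEach shape in prepareCommitVertexIndexUpdates is what keeps the shared indexUpdates list single-threaded: the parallel stream builds per-vertex results in isolation, and only the final addAll runs on the calling thread. A standalone sketch of the same pattern, with illustrative names only (none of this is JanusGraph API):

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ParallelCollectSketch {
    public static void main(String[] args) {
        List<Integer> results = new ArrayList<>();

        // Unsafe: ArrayList is not thread safe, so a parallel forEach can drop
        // elements or throw ArrayIndexOutOfBoundsException under contention.
        // IntStream.range(0, 100_000).parallel().boxed().forEach(results::add);

        // Safe: let the stream combine per-thread partial results, then append
        // the combined list on the calling thread only.
        List<Integer> combined = IntStream.range(0, 100_000).parallel()
                .boxed()
                .collect(Collectors.toList());
        results.addAll(combined);

        System.out.println(results.size()); // always 100000
    }
}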
@@ -16,6 +16,7 @@

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import org.apache.commons.lang.StringUtils;
import org.janusgraph.core.Cardinality;
import org.janusgraph.core.JanusGraphElement;
@@ -60,6 +61,7 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import static org.janusgraph.util.encoding.LongEncoding.STRING_ENCODING_MARKER;
@@ -248,18 +250,19 @@ public static void indexMatches(JanusGraphVertex vertex, IndexRecordEntry[] curr
values = ImmutableList.of(replaceValue);
} else {
values = new ArrayList<>();
Iterable<JanusGraphVertexProperty> props;
Iterator<JanusGraphVertexProperty> props;
if (onlyLoaded ||
(!vertex.isNew() && IDManager.VertexIDType.PartitionedVertex.is(vertex.id()))) {
//going through transaction so we can query deleted vertices
final VertexCentricQueryBuilder qb = ((InternalVertex)vertex).tx().query(vertex);
qb.noPartitionRestriction().type(key);
if (onlyLoaded) qb.queryOnlyLoaded();
props = qb.properties();
props = qb.properties().iterator();
} else {
props = vertex.query().keys(key.name()).properties();
props = Iterators.transform(vertex.properties(key.name()), i -> (JanusGraphVertexProperty) i);
}
for (final JanusGraphVertexProperty p : props) {
while (props.hasNext()) {
JanusGraphVertexProperty p = props.next();
assert !onlyLoaded || p.isLoaded() || p.isRemoved();
assert key.dataType().equals(p.value().getClass()) : key + " -> " + p;
values.add(new IndexRecordEntry(p));
@@ -16,6 +16,7 @@

import com.google.common.base.Predicate;
import org.janusgraph.core.JanusGraphVertex;
import org.janusgraph.core.PropertyKey;
import org.janusgraph.diskstorage.EntryList;
import org.janusgraph.diskstorage.keycolumnvalue.SliceQuery;
import org.janusgraph.graphdb.query.vertex.VertexCentricQueryBuilder;
@@ -57,6 +58,14 @@ public interface InternalVertex extends JanusGraphVertex, InternalElement {
*/
Iterable<InternalRelation> getAddedRelations(Predicate<InternalRelation> query);

Iterable<InternalRelation> findPreviousRelation(long id);

Iterable<InternalRelation> findAddedProperty(Predicate<InternalRelation> query);

default Iterable<InternalRelation> getDuplicatedAddedRelation(PropertyKey key, Object value) {
return this.getAddedRelations(p -> p.getType().equals(key));
}

/**
* Returns all relations that match the given query. If these matching relations are not currently
* held in memory, it uses the given {@link Retriever} to retrieve the edges from backend storage.
@@ -26,11 +26,9 @@
import org.janusgraph.graphdb.internal.InternalRelation;
import org.janusgraph.graphdb.internal.InternalVertex;
import org.janusgraph.graphdb.transaction.RelationConstructor;
import org.janusgraph.graphdb.transaction.StandardJanusGraphTx;
import org.janusgraph.graphdb.types.system.ImplicitKey;

import java.util.ArrayList;
import java.util.List;

/**
* @author Matthias Broecheler (me@matthiasb.com)
*/
@@ -62,8 +60,7 @@ public InternalRelation it() {
if (startVertex.hasAddedRelations() && startVertex.hasRemovedRelations()) {
//Test whether this relation has been replaced
final long id = super.longId();
final Iterable<InternalRelation> added = startVertex.getAddedRelations(
internalRelation -> (internalRelation instanceof StandardEdge) && ((StandardEdge) internalRelation).getPreviousID() == id);
final Iterable<InternalRelation> added = startVertex.findPreviousRelation(id);
assert Iterables.size(added) <= 1 || (isLoop() && Iterables.size(added) == 2);
it = Iterables.getFirst(added, null);
}
@@ -88,8 +85,7 @@ private synchronized InternalRelation update() {
copy.remove();

Long id = type.getConsistencyModifier() != ConsistencyModifier.FORK ? longId() : null;
StandardEdge u = (StandardEdge) tx().addEdge(id, getVertex(0), getVertex(1), edgeLabel());
u.setPreviousID(longId());
StandardEdge u = (StandardEdge) tx().addEdge(id, getVertex(0), getVertex(1), edgeLabel(), longId());
copyProperties(u);
return u;
}
@@ -110,13 +106,8 @@ public <O> O getValueDirect(PropertyKey key) {
@Override
public Iterable<PropertyKey> getPropertyKeysDirect() {
RelationCache map = getPropertyMap();
List<PropertyKey> types = new ArrayList<>(map.numProperties());

for (LongObjectCursor<Object> entry : map) {
types.add(tx().getExistingPropertyKey(entry.key));
}

return types;
StandardJanusGraphTx currentTx = tx();
return Iterables.transform(map, entry -> currentTx.getExistingPropertyKey(entry.key));
}

@Override
@@ -23,11 +23,9 @@
import org.janusgraph.graphdb.internal.InternalRelation;
import org.janusgraph.graphdb.internal.InternalVertex;
import org.janusgraph.graphdb.transaction.RelationConstructor;
import org.janusgraph.graphdb.transaction.StandardJanusGraphTx;
import org.janusgraph.graphdb.types.system.ImplicitKey;

import java.util.ArrayList;
import java.util.List;

/**
* @author Matthias Broecheler (me@matthiasb.com)
*/
@@ -51,8 +49,7 @@ public InternalRelation it() {
if (startVertex.hasAddedRelations() && startVertex.hasRemovedRelations()) {
//Test whether this relation has been replaced
final long id = longId();
it = Iterables.getOnlyElement(startVertex.getAddedRelations(
internalRelation -> (internalRelation instanceof StandardVertexProperty) && ((StandardVertexProperty) internalRelation).getPreviousID() == id), null);
it = Iterables.getOnlyElement(startVertex.findPreviousRelation(id), null);
}

return (it != null) ? it : super.it();
@@ -72,8 +69,7 @@ private synchronized InternalRelation update() {
copy.remove();

Long id = type.getConsistencyModifier() != ConsistencyModifier.FORK ? longId() : null;
StandardVertexProperty u = (StandardVertexProperty) tx().addProperty(getVertex(0), propertyKey(), value(), id);
u.setPreviousID(longId());
StandardVertexProperty u = (StandardVertexProperty) tx().addProperty(getVertex(0), propertyKey(), value(), id, longId());
copyProperties(u);
return u;
}
@@ -94,12 +90,8 @@ public <O> O getValueDirect(PropertyKey key) {
@Override
public Iterable<PropertyKey> getPropertyKeysDirect() {
RelationCache map = getPropertyMap();
List<PropertyKey> types = new ArrayList<>(map.numProperties());

for (LongObjectCursor<Object> entry : map) {
types.add(tx().getExistingPropertyKey(entry.key));
}
return types;
StandardJanusGraphTx currentTx = tx();
return Iterables.transform(map, entry -> currentTx.getExistingPropertyKey(entry.key));
}

@Override
