Skip to content

Commit

Permalink
Optimization for adding new vertex and updating vertex properties
Browse files Browse the repository at this point in the history
Signed-off-by: ntisseyre <ntisseyre@apple.com>
  • Loading branch information
ntisseyre committed Mar 6, 2024
1 parent c9e0e27 commit d3f2975
Show file tree
Hide file tree
Showing 20 changed files with 719 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,10 @@ public KCVSConfiguration getUserConfiguration() {
return userConfig;
}

public int getBufferSize() {
return this.bufferSize;
}

private String getMetricsCacheName(String storeName) {
if (!configuration.get(BASIC_METRICS)) return null;
return configuration.get(METRICS_MERGE_STORES) ? METRICS_MERGED_CACHE : storeName + METRICS_CACHE_SUFFIX;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.janusgraph.diskstorage.util.BufferUtil;
import org.janusgraph.graphdb.database.idhandling.VariableLong;
import org.janusgraph.graphdb.database.serialize.DataOutput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.ArrayList;
Expand All @@ -42,6 +44,8 @@
*/
public class CacheTransaction implements StoreTransaction, LoggableTransaction {

private final Logger log = LoggerFactory.getLogger(CacheTransaction.class);

private final StoreTransaction tx;
private final KeyColumnValueStoreManager manager;
private final boolean batchLoading;
Expand Down Expand Up @@ -88,6 +92,7 @@ void mutate(KCVSCache store, StaticBuffer key, List<Entry> additions, List<Entry
numMutations += m.getTotalMutations();

if (batchLoading && numMutations >= persistChunkSize) {
log.info("Flushing mutations to storage numMutations={}, persistChunkSize={}", numMutations, persistChunkSize);
flushInternal();
}
}
Expand Down Expand Up @@ -129,9 +134,7 @@ private KCVMutation convert(KCVEntryMutation mutation) {
private void flushInternal() throws BackendException {
if (numMutations > 0) {
//Consolidate all mutations prior to persistence to ensure that no addition accidentally gets swallowed by a delete
for (Map<StaticBuffer, KCVEntryMutation> store : mutations.values()) {
for (KCVEntryMutation mut : store.values()) mut.consolidate();
}
mutations.values().parallelStream().forEach(store -> store.values().forEach(KCVEntryMutation::consolidate));

//Chunk up mutations
final Map<String, Map<StaticBuffer, KCVMutation>> subMutations = new HashMap<>(mutations.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.script.Bindings;
import javax.script.ScriptException;

Expand All @@ -141,6 +142,8 @@ public class StandardJanusGraph extends JanusGraphBlueprintsGraph {
private static final Logger log =
LoggerFactory.getLogger(StandardJanusGraph.class);

private static final Logger logForPrepareCommit =
LoggerFactory.getLogger(StandardJanusGraph.class.getName()+".prepareCommit");

static {
TraversalStrategies graphStrategies =
Expand Down Expand Up @@ -696,13 +699,47 @@ public ModificationSummary prepareCommit(final Collection<InternalRelation> adde
ListMultimap<InternalVertex, InternalRelation> mutatedProperties = ArrayListMultimap.create();
List<IndexUpdate> indexUpdates = new ArrayList<>();

long startTimeStamp = System.currentTimeMillis();
int totalMutations = addedRelations.size() + deletedRelations.size();
boolean isBigDataSet = totalMutations >= backend.getBufferSize();
if(isBigDataSet) {
logForPrepareCommit.debug("0. Prepare commit for mutations count=" + totalMutations);
}

if(isBigDataSet) {
logForPrepareCommit.debug("1. Collect deleted edges and their index updates and acquire edge locks");
}
prepareCommitDeletes(deletedRelations, filter, mutator, tx, acquireLocks, mutations, mutatedProperties, indexUpdates);

if(isBigDataSet) {
logForPrepareCommit.debug("2. Collect added edges and their index updates and acquire edge locks");
}
prepareCommitAdditions(addedRelations, filter, mutator, tx, acquireLocks, mutations, mutatedProperties, indexUpdates);

if(isBigDataSet) {
logForPrepareCommit.debug("3. Collect all index update for vertices");
}
prepareCommitVertexIndexUpdates(mutatedProperties, indexUpdates);

if(isBigDataSet) {
logForPrepareCommit.debug("4. Acquire index locks (deletions first)");
}
prepareCommitAcquireIndexLocks(indexUpdates, mutator, acquireLocks);

if(isBigDataSet) {
logForPrepareCommit.debug("5. Add relation mutations");
}
prepareCommitAddRelationMutations(mutations, mutator, tx);

if(isBigDataSet) {
logForPrepareCommit.debug("6. Add index updates");
}
boolean has2iMods = prepareCommitIndexUpdatesAndCheckIfAnyMixedIndexUsed(indexUpdates, mutator);

if(isBigDataSet) {
long duration = System.currentTimeMillis() - startTimeStamp;
logForPrepareCommit.debug("7. Prepare commit is done with mutated vertex count=" + mutations.size() + " in duration=" + duration);
}
return new ModificationSummary(!mutations.isEmpty(),has2iMods);
}

Expand Down Expand Up @@ -772,10 +809,11 @@ private void prepareCommitAdditions(final Collection<InternalRelation> addedRela
* Collect all index update for vertices
*/
private void prepareCommitVertexIndexUpdates(final ListMultimap<InternalVertex, InternalRelation> mutatedProperties,
final List<IndexUpdate> indexUpdates){
for (InternalVertex v : mutatedProperties.keySet()) {
indexUpdates.addAll(indexSerializer.getIndexUpdates(v,mutatedProperties.get(v)));
}
final List<IndexUpdate> indexUpdates) {
mutatedProperties.keySet().parallelStream()
.map(v -> indexSerializer.getIndexUpdates(v, mutatedProperties.get(v)))
.collect(Collectors.toList())
.forEach(indexUpdates::addAll);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import org.apache.commons.lang.StringUtils;
import org.janusgraph.core.Cardinality;
import org.janusgraph.core.JanusGraphElement;
Expand Down Expand Up @@ -60,6 +61,7 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import static org.janusgraph.util.encoding.LongEncoding.STRING_ENCODING_MARKER;
Expand Down Expand Up @@ -248,18 +250,19 @@ public static void indexMatches(JanusGraphVertex vertex, IndexRecordEntry[] curr
values = ImmutableList.of(replaceValue);
} else {
values = new ArrayList<>();
Iterable<JanusGraphVertexProperty> props;
Iterator<JanusGraphVertexProperty> props;
if (onlyLoaded ||
(!vertex.isNew() && IDManager.VertexIDType.PartitionedVertex.is(vertex.id()))) {
//going through transaction so we can query deleted vertices
final VertexCentricQueryBuilder qb = ((InternalVertex)vertex).tx().query(vertex);
qb.noPartitionRestriction().type(key);
if (onlyLoaded) qb.queryOnlyLoaded();
props = qb.properties();
props = qb.properties().iterator();
} else {
props = vertex.query().keys(key.name()).properties();
props = Iterators.transform(vertex.properties(key.name()), i -> (JanusGraphVertexProperty) i);
}
for (final JanusGraphVertexProperty p : props) {
while (props.hasNext()) {
JanusGraphVertexProperty p = props.next();
assert !onlyLoaded || p.isLoaded() || p.isRemoved();
assert key.dataType().equals(p.value().getClass()) : key + " -> " + p;
values.add(new IndexRecordEntry(p));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import com.google.common.base.Predicate;
import org.janusgraph.core.JanusGraphVertex;
import org.janusgraph.core.PropertyKey;
import org.janusgraph.diskstorage.EntryList;
import org.janusgraph.diskstorage.keycolumnvalue.SliceQuery;
import org.janusgraph.graphdb.query.vertex.VertexCentricQueryBuilder;
Expand Down Expand Up @@ -57,6 +58,14 @@ public interface InternalVertex extends JanusGraphVertex, InternalElement {
*/
Iterable<InternalRelation> getAddedRelations(Predicate<InternalRelation> query);

Iterable<InternalRelation> findPreviousRelation(long id);

Iterable<InternalRelation> findAddedProperty(Predicate<InternalRelation> query);

default Iterable<InternalRelation> getDuplicatedAddedRelation(PropertyKey key, Object value) {
return this.getAddedRelations(p -> p.getType().equals(key));
}

/**
* Returns all relations that match the given query. If these matching relations are not currently
* held in memory, it uses the given {@link Retriever} to retrieve the edges from backend storage.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,9 @@
import org.janusgraph.graphdb.internal.InternalRelation;
import org.janusgraph.graphdb.internal.InternalVertex;
import org.janusgraph.graphdb.transaction.RelationConstructor;
import org.janusgraph.graphdb.transaction.StandardJanusGraphTx;
import org.janusgraph.graphdb.types.system.ImplicitKey;

import java.util.ArrayList;
import java.util.List;

/**
* @author Matthias Broecheler (me@matthiasb.com)
*/
Expand Down Expand Up @@ -62,8 +60,7 @@ public InternalRelation it() {
if (startVertex.hasAddedRelations() && startVertex.hasRemovedRelations()) {
//Test whether this relation has been replaced
final long id = super.longId();
final Iterable<InternalRelation> added = startVertex.getAddedRelations(
internalRelation -> (internalRelation instanceof StandardEdge) && ((StandardEdge) internalRelation).getPreviousID() == id);
final Iterable<InternalRelation> added = startVertex.findPreviousRelation(id);
assert Iterables.size(added) <= 1 || (isLoop() && Iterables.size(added) == 2);
it = Iterables.getFirst(added, null);
}
Expand All @@ -88,8 +85,7 @@ private synchronized InternalRelation update() {
copy.remove();

Long id = type.getConsistencyModifier() != ConsistencyModifier.FORK ? longId() : null;
StandardEdge u = (StandardEdge) tx().addEdge(id, getVertex(0), getVertex(1), edgeLabel());
u.setPreviousID(longId());
StandardEdge u = (StandardEdge) tx().addEdge(id, getVertex(0), getVertex(1), edgeLabel(), longId());
copyProperties(u);
return u;
}
Expand All @@ -110,13 +106,8 @@ public <O> O getValueDirect(PropertyKey key) {
@Override
public Iterable<PropertyKey> getPropertyKeysDirect() {
RelationCache map = getPropertyMap();
List<PropertyKey> types = new ArrayList<>(map.numProperties());

for (LongObjectCursor<Object> entry : map) {
types.add(tx().getExistingPropertyKey(entry.key));
}

return types;
StandardJanusGraphTx currentTx = tx();
return Iterables.transform(map, entry -> currentTx.getExistingPropertyKey(entry.key));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,9 @@
import org.janusgraph.graphdb.internal.InternalRelation;
import org.janusgraph.graphdb.internal.InternalVertex;
import org.janusgraph.graphdb.transaction.RelationConstructor;
import org.janusgraph.graphdb.transaction.StandardJanusGraphTx;
import org.janusgraph.graphdb.types.system.ImplicitKey;

import java.util.ArrayList;
import java.util.List;

/**
* @author Matthias Broecheler (me@matthiasb.com)
*/
Expand All @@ -51,8 +49,7 @@ public InternalRelation it() {
if (startVertex.hasAddedRelations() && startVertex.hasRemovedRelations()) {
//Test whether this relation has been replaced
final long id = longId();
it = Iterables.getOnlyElement(startVertex.getAddedRelations(
internalRelation -> (internalRelation instanceof StandardVertexProperty) && ((StandardVertexProperty) internalRelation).getPreviousID() == id), null);
it = Iterables.getOnlyElement(startVertex.findPreviousRelation(id), null);
}

return (it != null) ? it : super.it();
Expand All @@ -72,8 +69,7 @@ private synchronized InternalRelation update() {
copy.remove();

Long id = type.getConsistencyModifier() != ConsistencyModifier.FORK ? longId() : null;
StandardVertexProperty u = (StandardVertexProperty) tx().addProperty(getVertex(0), propertyKey(), value(), id);
u.setPreviousID(longId());
StandardVertexProperty u = (StandardVertexProperty) tx().addProperty(getVertex(0), propertyKey(), value(), id, longId());
copyProperties(u);
return u;
}
Expand All @@ -94,12 +90,8 @@ public <O> O getValueDirect(PropertyKey key) {
@Override
public Iterable<PropertyKey> getPropertyKeysDirect() {
RelationCache map = getPropertyMap();
List<PropertyKey> types = new ArrayList<>(map.numProperties());

for (LongObjectCursor<Object> entry : map) {
types.add(tx().getExistingPropertyKey(entry.key));
}
return types;
StandardJanusGraphTx currentTx = tx();
return Iterables.transform(map, entry -> currentTx.getExistingPropertyKey(entry.key));
}

@Override
Expand Down
Loading

0 comments on commit d3f2975

Please sign in to comment.