Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Mergeability column to support automatic merges #5187

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.client.admin;

import java.io.Serializable;
import java.time.Duration;
import java.util.Objects;

import com.google.common.base.Preconditions;

public class TabletMergeability implements Serializable {
private static final long serialVersionUID = 1L;

public static final TabletMergeability NEVER = new TabletMergeability(Duration.ofNanos(-1));
public static final TabletMergeability NOW = new TabletMergeability(Duration.ofNanos(0));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public static final TabletMergeability NOW = new TabletMergeability(Duration.ofNanos(0));
public static final TabletMergeability NOW = new TabletMergeability(Duration.ZERO);


private final Duration delay;

private TabletMergeability(Duration delay) {
this.delay = Objects.requireNonNull(delay);
}

public boolean isNever() {
return this.delay.isNegative();
}

public boolean isNow() {
return this.delay.isZero();
}

public Duration getDelay() {
return delay;
}

@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}
TabletMergeability that = (TabletMergeability) o;
return Objects.equals(delay, that.delay);
}

@Override
public int hashCode() {
return Objects.hashCode(delay);
}

@Override
public String toString() {
if (isNow()) {
return "TabletMergeability=NOW";
} else if (isNever()) {
return "TabletMergeability=NEVER";
}
return "TabletMergeability=AFTER:" + delay.toNanos();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may want to convert this to ms or s in the toString method for ease of understanding.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, not a bad idea, nanoseconds would be pretty tough to understand in a log.

}

public static TabletMergeability from(Duration delay) {
Preconditions.checkArgument(delay.toNanos() >= -1,
"Duration of delay must be -1, 0, or a positive offset.");
return new TabletMergeability(delay);
}

public static TabletMergeability after(Duration delay) {
Preconditions.checkArgument(delay.toNanos() > 0, "Duration of delay must be greater than 0.");
return new TabletMergeability(delay);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.metadata;

import java.time.Duration;

import org.apache.accumulo.core.client.admin.TabletMergeability;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.util.time.SteadyTime;

public class TabletMergeabilityUtil {

public static Value toValue(TabletMergeability tabletMergeability) {
return new Value(Long.toString(tabletMergeability.getDelay().toNanos()));
}

public static TabletMergeability fromValue(Value value) {
return TabletMergeability.from(Duration.ofNanos(Long.parseLong(value.toString())));
}

public boolean isMergeable(TabletMergeability mergeability, SteadyTime currentTime) {
if (mergeability.isNever()) {
return false;
}
return currentTime.getDuration().compareTo(mergeability.getDelay()) >= 0;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import org.apache.accumulo.core.client.ConditionalWriter;
import org.apache.accumulo.core.client.admin.TabletAvailability;
import org.apache.accumulo.core.client.admin.TabletMergeability;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.TableId;
Expand Down Expand Up @@ -392,6 +393,8 @@ interface TabletUpdates<T> {

T putCloned();

T putTabletMergeability(TabletMergeability tabletMergeability);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eventually we will need a steady time to evaluate if something should merge. Seems like the steady time should always be set when the mergability is set. Although its only needed when the duration is > 0. The steady time could be encoded in the same columns value.

Suggested change
T putTabletMergeability(TabletMergeability tabletMergeability);
T putTabletMergeability(TabletMergeability tabletMergeability, SteadyTime steadTime);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Steady time could be added in later PRs. Just wondering how it will be persisted.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My original plan was to persist everything as one value by adding the current manager time to the value. So basically to compute the mergability duration you would read the current SteadyTime then add the offset to it and store it and then later you could compare to the current time by taking a diff.

However, I was thinking more about it and I think that storing it as two separate values makes sense because it allows you to do better debugging (logigng) plus you can see extra information such as when the value was created. It also allows doing other things like update/replacing the original SteadyTime, etc and other metric calculations if we keep it separate so I'll change it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My original plan was to persist everything as one value by adding the current manager time to the value.

Ok I had thought this would store the exact duration specified by the user. Summing the steady time and duration from the user would work. There may be a slight advantage for keeping them separate for debugging purposes as mentioned.


/**
* By default the server lock is automatically added to mutations unless this method is set to
* false.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,10 @@ public static class TabletColumnFamily {
public static final String REQUESTED_QUAL = "requestToHost";
public static final ColumnFQ REQUESTED_COLUMN = new ColumnFQ(NAME, new Text(REQUESTED_QUAL));

public static final String MERGEABILITY_QUAL = "mergeability";
public static final ColumnFQ MERGEABILITY_COLUMN =
new ColumnFQ(NAME, new Text(MERGEABILITY_QUAL));

public static Value encodePrevEndRow(Text per) {
if (per == null) {
return new Value(new byte[] {0});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.SELECTED_QUAL;
import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.TIME_QUAL;
import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.AVAILABILITY_QUAL;
import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.MERGEABILITY_QUAL;
import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_QUAL;
import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.REQUESTED_QUAL;

Expand All @@ -41,6 +42,7 @@
import java.util.Set;

import org.apache.accumulo.core.client.admin.TabletAvailability;
import org.apache.accumulo.core.client.admin.TabletMergeability;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.clientImpl.TabletAvailabilityUtil;
import org.apache.accumulo.core.data.ByteSequence;
Expand All @@ -58,6 +60,7 @@
import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.core.metadata.SuspendingTServer;
import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.TabletMergeabilityUtil;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ClonedColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CompactedColumnFamily;
Expand Down Expand Up @@ -123,6 +126,7 @@ public class TabletMetadata {
private final Set<FateId> compacted;
private final Set<FateId> userCompactionsRequested;
private final UnSplittableMetadata unSplittableMetadata;
private final TabletMergeability mergeability;
private final Supplier<Long> fileSize;

private TabletMetadata(Builder tmBuilder) {
Expand Down Expand Up @@ -155,6 +159,7 @@ private TabletMetadata(Builder tmBuilder) {
this.compacted = tmBuilder.compacted.build();
this.userCompactionsRequested = tmBuilder.userCompactionsRequested.build();
this.unSplittableMetadata = tmBuilder.unSplittableMetadata;
this.mergeability = Objects.requireNonNull(tmBuilder.mergeability);
this.fileSize = Suppliers.memoize(() -> {
// This code was using a java stream. While profiling SplitMillionIT, the stream was showing
// up as hot when scanning 1 million tablets. Converted to a for loop to improve performance.
Expand Down Expand Up @@ -198,7 +203,8 @@ public enum ColumnType {
SELECTED,
COMPACTED,
USER_COMPACTION_REQUESTED,
UNSPLITTABLE
UNSPLITTABLE,
MERGEABILITY
}

public static class Location {
Expand Down Expand Up @@ -439,6 +445,11 @@ public UnSplittableMetadata getUnSplittable() {
return unSplittableMetadata;
}

public TabletMergeability getTabletMergeability() {
ensureFetched(ColumnType.MERGEABILITY);
return mergeability;
}

@Override
public String toString() {
return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("tableId", tableId)
Expand All @@ -453,7 +464,8 @@ public String toString() {
.append("operationId", operationId).append("selectedFiles", selectedFiles)
.append("futureAndCurrentLocationSet", futureAndCurrentLocationSet)
.append("userCompactionsRequested", userCompactionsRequested)
.append("unSplittableMetadata", unSplittableMetadata).toString();
.append("unSplittableMetadata", unSplittableMetadata).append("mergeability", mergeability)
.toString();
}

public List<Entry<Key,Value>> getKeyValues() {
Expand Down Expand Up @@ -527,6 +539,9 @@ public static <E extends Entry<Key,Value>> TabletMetadata convertRow(Iterator<E>
case REQUESTED_QUAL:
tmBuilder.onDemandHostingRequested(true);
break;
case MERGEABILITY_QUAL:
tmBuilder.mergeability(TabletMergeabilityUtil.fromValue(kv.getValue()));
break;
default:
throw new IllegalStateException("Unexpected TabletColumnFamily qualifier: " + qual);
}
Expand Down Expand Up @@ -689,7 +704,7 @@ static class Builder {
private final ImmutableSet.Builder<FateId> compacted = ImmutableSet.builder();
private final ImmutableSet.Builder<FateId> userCompactionsRequested = ImmutableSet.builder();
private UnSplittableMetadata unSplittableMetadata;
// private Supplier<Long> fileSize;
private TabletMergeability mergeability = TabletMergeability.NEVER;

void table(TableId tableId) {
this.tableId = tableId;
Expand Down Expand Up @@ -799,6 +814,10 @@ void unSplittableMetadata(UnSplittableMetadata unSplittableMetadata) {
this.unSplittableMetadata = unSplittableMetadata;
}

void mergeability(TabletMergeability mergeability) {
this.mergeability = mergeability;
}

void keyValue(Entry<Key,Value> kv) {
if (this.keyValues == null) {
this.keyValues = ImmutableList.builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOADED;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOGS;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.MERGEABILITY;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.MERGED;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
Expand All @@ -48,6 +49,7 @@
import java.util.TreeMap;

import org.apache.accumulo.core.client.admin.TabletAvailability;
import org.apache.accumulo.core.client.admin.TabletMergeability;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
Expand Down Expand Up @@ -312,6 +314,13 @@ public TabletMetadataBuilder putCloned() {
return this;
}

@Override
public TabletMetadataBuilder putTabletMergeability(TabletMergeability tabletMergeability) {
fetched.add(MERGEABILITY);
internalBuilder.putTabletMergeability(tabletMergeability);
return this;
}

@Override
public TabletMetadataBuilder automaticallyPutServerLock(boolean b) {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Map;

import org.apache.accumulo.core.client.admin.TabletAvailability;
import org.apache.accumulo.core.client.admin.TabletMergeability;
import org.apache.accumulo.core.clientImpl.TabletAvailabilityUtil;
import org.apache.accumulo.core.data.ArrayByteSequence;
import org.apache.accumulo.core.data.ByteSequence;
Expand All @@ -35,6 +36,7 @@
import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.core.metadata.SuspendingTServer;
import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.TabletMergeabilityUtil;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ClonedColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CompactedColumnFamily;
Expand Down Expand Up @@ -390,6 +392,13 @@ public T automaticallyPutServerLock(boolean b) {
return getThis();
}

@Override
public T putTabletMergeability(TabletMergeability tabletMergeability) {
TabletColumnFamily.MERGEABILITY_COLUMN.put(mutation,
TabletMergeabilityUtil.toValue(tabletMergeability));
return getThis();
}

public void setCloseAfterMutate(AutoCloseable closeable) {
this.closeAfterMutate = closeable;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SplitColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.UserCompactionRequestedColumnFamily;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
import org.apache.accumulo.core.metadata.schema.filters.TabletMetadataFilter;
Expand Down Expand Up @@ -394,6 +395,9 @@ public Options fetch(ColumnType... colsToFetch) {
case UNSPLITTABLE:
qualifiers.add(SplitColumnFamily.UNSPLITTABLE_COLUMN);
break;
case MERGEABILITY:
qualifiers.add(TabletColumnFamily.MERGEABILITY_COLUMN);
break;
default:
throw new IllegalArgumentException("Unknown col type " + colToFetch);
}
Expand Down
Loading
Loading