Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-26473 Introduce db.hbase.container_operations span attribute #3951

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,8 @@ public CompletableFuture<Boolean> thenPut(Put put) {
validatePut(put, conn.connConf.getMaxKeyValueSize());
preCheck();
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
.setContainerOperations(put);
return tracedFuture(
() -> RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, put,
Expand All @@ -363,7 +364,8 @@ public CompletableFuture<Boolean> thenPut(Put put) {
public CompletableFuture<Boolean> thenDelete(Delete delete) {
preCheck();
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
.setContainerOperations(delete);
return tracedFuture(
() -> RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete,
Expand All @@ -379,7 +381,8 @@ public CompletableFuture<Boolean> thenMutate(RowMutations mutations) {
preCheck();
validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize());
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
.setContainerOperations(mutations);
return tracedFuture(
() -> RawAsyncTableImpl.this
.<Boolean> newCaller(row, mutations.getMaxPriority(), rpcTimeoutNs)
Expand Down Expand Up @@ -422,7 +425,8 @@ public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) {
public CompletableFuture<Boolean> thenPut(Put put) {
validatePut(put, conn.connConf.getMaxKeyValueSize());
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
.setContainerOperations(put);
return tracedFuture(
() -> RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc,
Expand All @@ -437,7 +441,8 @@ public CompletableFuture<Boolean> thenPut(Put put) {
@Override
public CompletableFuture<Boolean> thenDelete(Delete delete) {
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
.setContainerOperations(delete);
return tracedFuture(
() -> RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete,
Expand All @@ -452,7 +457,8 @@ public CompletableFuture<Boolean> thenDelete(Delete delete) {
public CompletableFuture<Boolean> thenMutate(RowMutations mutations) {
validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize());
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
.setContainerOperations(mutations);
return tracedFuture(
() -> RawAsyncTableImpl.this
.<Boolean> newCaller(row, mutations.getMaxPriority(), rpcTimeoutNs)
Expand All @@ -474,7 +480,8 @@ public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter)
@Override
public CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate checkAndMutate) {
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(checkAndMutate);
.setOperation(checkAndMutate)
.setContainerOperations(checkAndMutate.getAction());
return tracedFuture(() -> {
if (checkAndMutate.getAction() instanceof Put ||
checkAndMutate.getAction() instanceof Delete ||
Expand Down Expand Up @@ -527,7 +534,8 @@ public CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate che
public List<CompletableFuture<CheckAndMutateResult>>
checkAndMutate(List<CheckAndMutate> checkAndMutates) {
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(checkAndMutates);
.setOperation(checkAndMutates)
.setContainerOperations(checkAndMutates);
return tracedFutures(
() -> batch(checkAndMutates, rpcTimeoutNs).stream()
.map(f -> f.thenApply(r -> (CheckAndMutateResult) r)).collect(toList()),
Expand Down Expand Up @@ -583,7 +591,8 @@ public CompletableFuture<Result> mutateRow(RowMutations mutations) {
long nonceGroup = conn.getNonceGenerator().getNonceGroup();
long nonce = conn.getNonceGenerator().newNonce();
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(mutations);
.setOperation(mutations)
.setContainerOperations(mutations);
return tracedFuture(
() -> this
.<Result> newCaller(mutations.getRow(), mutations.getMaxPriority(), writeRpcTimeoutNs)
Expand Down Expand Up @@ -656,28 +665,32 @@ public void onComplete() {
@Override
public List<CompletableFuture<Result>> get(List<Get> gets) {
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(gets);
.setOperation(gets)
.setContainerOperations(HBaseSemanticAttributes.Operation.GET);
return tracedFutures(() -> batch(gets, readRpcTimeoutNs), supplier);
}

@Override
public List<CompletableFuture<Void>> put(List<Put> puts) {
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(puts);
.setOperation(puts)
.setContainerOperations(HBaseSemanticAttributes.Operation.PUT);
return tracedFutures(() -> voidMutate(puts), supplier);
}

@Override
public List<CompletableFuture<Void>> delete(List<Delete> deletes) {
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(deletes);
.setOperation(deletes)
.setContainerOperations(HBaseSemanticAttributes.Operation.DELETE);
return tracedFutures(() -> voidMutate(deletes), supplier);
}

@Override
public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) {
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(actions);
.setOperation(actions)
.setContainerOperations(actions);
return tracedFutures(() -> batch(actions, rpcTimeoutNs), supplier);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,22 @@

package org.apache.hadoop.hbase.client.trace;

import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.CONTAINER_DB_OPERATIONS_KEY;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DB_OPERATION;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.SpanKind;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.AsyncConnectionImpl;
Expand Down Expand Up @@ -85,6 +92,72 @@ public TableOperationSpanBuilder setOperation(final Operation operation) {
return this;
}

// `setContainerOperations` perform a recursive descent expansion of all the operations
// contained within the provided "batch" object.

public TableOperationSpanBuilder setContainerOperations(final RowMutations mutations) {
final Operation[] ops = mutations.getMutations()
.stream()
.flatMap(row -> Stream.concat(Stream.of(valueFrom(row)), unpackRowOperations(row).stream()))
.toArray(Operation[]::new);
return setContainerOperations(ops);
}

public TableOperationSpanBuilder setContainerOperations(final Row row) {
final Operation[] ops =
Stream.concat(Stream.of(valueFrom(row)), unpackRowOperations(row).stream())
.toArray(Operation[]::new);
return setContainerOperations(ops);
}

public TableOperationSpanBuilder setContainerOperations(
final Collection<? extends Row> operations
) {
final Operation[] ops = operations.stream()
.flatMap(row -> Stream.concat(Stream.of(valueFrom(row)), unpackRowOperations(row).stream()))
.toArray(Operation[]::new);
return setContainerOperations(ops);
}

private static Set<Operation> unpackRowOperations(final Row row) {
final Set<Operation> ops = new HashSet<>();
if (row instanceof CheckAndMutate) {
final CheckAndMutate cam = (CheckAndMutate) row;
ops.addAll(unpackRowOperations(cam));
}
if (row instanceof RowMutations) {
final RowMutations mutations = (RowMutations) row;
ops.addAll(unpackRowOperations(mutations));
}
return ops;
}

private static Set<Operation> unpackRowOperations(final CheckAndMutate cam) {
final Set<Operation> ops = new HashSet<>();
final Operation op = valueFrom(cam.getAction());
switch (op) {
case BATCH:
case CHECK_AND_MUTATE:
ops.addAll(unpackRowOperations(cam.getAction()));
break;
default:
ops.add(op);
}
return ops;
}

public TableOperationSpanBuilder setContainerOperations(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to store a key value pair here to also reflect the cout for each operation?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's an interesting idea! What do you have in mind, something like db.hbase.container_operations.get.count ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a simple idea, not sure what is the best way to implement it. Checked the code, seems not easy to have a key value structure for an attribute? If we just encode it with a String, like 'Put,1000', seems it will make searching this attribute a bit harder? But if we introduce another attribute to store the count, we need to make these two attributes aligned, which is also a pain...

Anyway, not a big problem for now, could consider it later when we learn opentelemetry more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's not a bad idea. I don't now how the telemetry services will handle these kinds of fields.

final Operation... operations
) {
final List<String> ops = Arrays.stream(operations)
.map(op -> op == null ? unknown : op.name())
.sorted()
.distinct()
.collect(Collectors.toList());
attributes.put(CONTAINER_DB_OPERATIONS_KEY, ops);
return this;
}

public TableOperationSpanBuilder setTableName(final TableName tableName) {
this.tableName = tableName;
TableSpanBuilder.populateTableNameAttributes(attributes, tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntryWithStringValuesOf;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasAttributes;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasKind;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
Expand Down Expand Up @@ -333,15 +335,19 @@ public void testCheckAndMutateList() {
.ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
.build(new Delete(Bytes.toBytes(0))))).toArray(new CompletableFuture[0]))
.join();
assertTrace("BATCH");
assertTrace("BATCH", hasAttributes(
containsEntryWithStringValuesOf(
"db.hbase.container_operations", "CHECK_AND_MUTATE", "DELETE")));
}

@Test
public void testCheckAndMutateAll() {
table.checkAndMutateAll(Arrays.asList(CheckAndMutate.newBuilder(Bytes.toBytes(0))
.ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
.build(new Delete(Bytes.toBytes(0))))).join();
assertTrace("BATCH");
assertTrace("BATCH", hasAttributes(
containsEntryWithStringValuesOf(
"db.hbase.container_operations", "CHECK_AND_MUTATE", "DELETE")));
}

private void testCheckAndMutateBuilder(Row op) {
Expand Down Expand Up @@ -427,8 +433,13 @@ public void testCheckAndMutateWithFilterBuilderThenMutations() throws IOExceptio

@Test
public void testMutateRow() throws IOException {
table.mutateRow(new RowMutations(Bytes.toBytes(0)).add(new Delete(Bytes.toBytes(0))));
assertTrace("BATCH");
final RowMutations mutations = new RowMutations(Bytes.toBytes(0))
.add(new Put(Bytes.toBytes(0))
.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")))
.add(new Delete(Bytes.toBytes(0)));
table.mutateRow(mutations).join();
assertTrace("BATCH", hasAttributes(
containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE", "PUT")));
}

@Test
Expand All @@ -443,27 +454,31 @@ public void testExistsList() {
.allOf(
table.exists(Arrays.asList(new Get(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
.join();
assertTrace("BATCH");
assertTrace("BATCH", hasAttributes(
containsEntryWithStringValuesOf("db.hbase.container_operations", "GET")));
}

@Test
public void testExistsAll() {
table.existsAll(Arrays.asList(new Get(Bytes.toBytes(0)))).join();
assertTrace("BATCH");
assertTrace("BATCH", hasAttributes(
containsEntryWithStringValuesOf("db.hbase.container_operations", "GET")));
}

@Test
public void testGetList() {
CompletableFuture
.allOf(table.get(Arrays.asList(new Get(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
.join();
assertTrace("BATCH");
assertTrace("BATCH", hasAttributes(
containsEntryWithStringValuesOf("db.hbase.container_operations", "GET")));
}

@Test
public void testGetAll() {
table.getAll(Arrays.asList(new Get(Bytes.toBytes(0)))).join();
assertTrace("BATCH");
assertTrace("BATCH", hasAttributes(
containsEntryWithStringValuesOf("db.hbase.container_operations", "GET")));
}

@Test
Expand All @@ -472,14 +487,16 @@ public void testPutList() {
.allOf(table.put(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"),
Bytes.toBytes("cq"), Bytes.toBytes("v")))).toArray(new CompletableFuture[0]))
.join();
assertTrace("BATCH");
assertTrace("BATCH", hasAttributes(
containsEntryWithStringValuesOf("db.hbase.container_operations", "PUT")));
}

@Test
public void testPutAll() {
table.putAll(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"),
Bytes.toBytes("cq"), Bytes.toBytes("v")))).join();
assertTrace("BATCH");
assertTrace("BATCH", hasAttributes(
containsEntryWithStringValuesOf("db.hbase.container_operations", "PUT")));
}

@Test
Expand All @@ -488,13 +505,15 @@ public void testDeleteList() {
.allOf(
table.delete(Arrays.asList(new Delete(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
.join();
assertTrace("BATCH");
assertTrace("BATCH", hasAttributes(
containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE")));
}

@Test
public void testDeleteAll() {
table.deleteAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join();
assertTrace("BATCH");
assertTrace("BATCH", hasAttributes(
containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE")));
}

@Test
Expand All @@ -503,12 +522,14 @@ public void testBatch() {
.allOf(
table.batch(Arrays.asList(new Delete(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
.join();
assertTrace("BATCH");
assertTrace("BATCH", hasAttributes(
containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE")));
}

@Test
public void testBatchAll() {
table.batchAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join();
assertTrace("BATCH");
assertTrace("BATCH", hasAttributes(
containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE")));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ public final class HBaseSemanticAttributes {
public static final AttributeKey<String> DB_NAME = SemanticAttributes.DB_NAME;
public static final AttributeKey<String> DB_OPERATION = SemanticAttributes.DB_OPERATION;
public static final AttributeKey<String> TABLE_KEY = AttributeKey.stringKey("db.hbase.table");
/**
* For operations that themselves ship one or more operations, such as
* {@link Operation#BATCH} and {@link Operation#CHECK_AND_MUTATE}.
*/
public static final AttributeKey<List<String>> CONTAINER_DB_OPERATIONS_KEY =
AttributeKey.stringArrayKey("db.hbase.container_operations");
public static final AttributeKey<List<String>> REGION_NAMES_KEY =
AttributeKey.stringArrayKey("db.hbase.regions");
public static final AttributeKey<String> RPC_SERVICE_KEY =
Expand Down