Skip to content

Commit

Permalink
feat: count row merging errors as internal errors (#2045)
Browse files Browse the repository at this point in the history
* feat: count row merging errors as internal errors

Currently they dont have a status associated and thus get counted as UNKOWN

Change-Id: Ida3470a0609f2e2ad51534eb3141db394af1dcdc

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* format

Change-Id: Iccb2a38b78e5f6c420cb1656887beebaecfa02d2

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
igorbernstein2 and gcf-owl-bot[bot] authored Jan 9, 2024
1 parent 6b48606 commit fc7845b
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,16 @@
*/
package com.google.cloud.bigtable.data.v2.stub.readrows;

import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.rpc.InternalException;
import com.google.bigtable.v2.ReadRowsResponse.CellChunk;
import com.google.cloud.bigtable.data.v2.internal.ByteStringComparator;
import com.google.cloud.bigtable.data.v2.models.RowAdapter.RowBuilder;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.EvictingQueue;
import com.google.protobuf.ByteString;
import io.grpc.Status;
import java.util.List;

/**
Expand Down Expand Up @@ -252,6 +255,21 @@ State handleChunk(CellChunk chunk) {
new State() {
@Override
State handleLastScannedRow(ByteString rowKey) {
if (lastCompleteRowKey != null) {
int cmp = ByteStringComparator.INSTANCE.compare(lastCompleteRowKey, rowKey);
String direction = "increasing";
if (reversed) {
cmp *= -1;
direction = "decreasing";
}

validate(
cmp < 0,
"AWAITING_NEW_ROW: last scanned key must be strictly "
+ direction
+ ". New last scanned key="
+ rowKey);
}
completeRow = adapter.createScanMarkerRow(rowKey);
lastCompleteRowKey = rowKey;
return AWAITING_ROW_CONSUME;
Expand Down Expand Up @@ -468,9 +486,9 @@ private void validate(boolean condition, String message) {
}
}

static class InvalidInputException extends RuntimeException {
static class InvalidInputException extends InternalException {
InvalidInputException(String message) {
super(message);
super(message, null, GrpcStatusCode.of(Status.Code.INTERNAL), false);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.functional;

import com.google.api.gax.rpc.InternalException;
import com.google.bigtable.v2.BigtableGrpc;
import com.google.bigtable.v2.ReadRowsRequest;
import com.google.bigtable.v2.ReadRowsResponse;
import com.google.cloud.bigtable.data.v2.BigtableDataClient;
import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
import com.google.cloud.bigtable.data.v2.FakeServiceBuilder;
import com.google.cloud.bigtable.data.v2.models.Query;
import com.google.cloud.bigtable.data.v2.models.Row;
import com.google.protobuf.ByteString;
import com.google.protobuf.BytesValue;
import com.google.protobuf.StringValue;
import io.grpc.Server;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class ReadRowsTest {
private FakeService service;
private Server server;

@Before
public void setUp() throws Exception {
service = new FakeService();
server = FakeServiceBuilder.create(service).start();
}

@After
public void tearDown() throws Exception {
server.shutdown();
}

@Test
public void rowMergingErrorsUseInternalStatus() throws Exception {
BigtableDataSettings settings =
BigtableDataSettings.newBuilderForEmulator(server.getPort())
.setProjectId("fake-project")
.setInstanceId("fake-instance")
.build();

service.readRowsResponses.add(
ReadRowsResponse.newBuilder()
.addChunks(
ReadRowsResponse.CellChunk.newBuilder()
.setRowKey(ByteString.copyFromUtf8("z"))
.setFamilyName(StringValue.newBuilder().setValue("f"))
.setQualifier(
BytesValue.newBuilder().setValue(ByteString.copyFromUtf8("q")).build())
.setTimestampMicros(1000)
.setValue(ByteString.copyFromUtf8("v"))
.setCommitRow(true))
.addChunks(
ReadRowsResponse.CellChunk.newBuilder()
.setRowKey(ByteString.copyFromUtf8("a"))
.setFamilyName(StringValue.newBuilder().setValue("f"))
.setQualifier(
BytesValue.newBuilder().setValue(ByteString.copyFromUtf8("q")).build())
.setTimestampMicros(1000)
.setValue(ByteString.copyFromUtf8("v"))
.setCommitRow(true))
.build());

try (BigtableDataClient client = BigtableDataClient.create(settings)) {
Assert.assertThrows(
InternalException.class,
() -> {
for (Row ignored : client.readRows(Query.create("fake-table"))) {}
});
}
}

static class FakeService extends BigtableGrpc.BigtableImplBase {
private List<ReadRowsResponse> readRowsResponses =
Collections.synchronizedList(new ArrayList<>());

@Override
public void readRows(
ReadRowsRequest request, StreamObserver<ReadRowsResponse> responseObserver) {
for (ReadRowsResponse r : readRowsResponses) {
responseObserver.onNext(r);
}
responseObserver.onCompleted();
}
}
}

0 comments on commit fc7845b

Please sign in to comment.