Skip to content

Commit

Permalink
feat: Add PARTITION_UNAVAILABLE error code when partition pauses proc…
Browse files Browse the repository at this point in the history
…essing (#23654)

## Description
Create a new `PARTITION_UNAVAILABLE` error code corresponding to when a
partition pauses processing requests.

## Checklist

- [x] Add a new `PARTITION_UNAVAILABLE` error code (as in, to the SBE
generated `ErrorCode` enum), which is documented as meaning that the
command cannot be processed because the processor is temporarily
unavailable.
- [x] Update the `CommandApiRequestHandler` to return this error,
instead of the current `INTERNAL_ERROR`.
- [x] Map `PARTITION_UNAVAILABLE` error code in `GrpcErrorMapper` such
that the error is logged as debug, and the mapped gRPC error
is `UNAVAILABLE`.
- [x] Map `PARTITION_UNAVAILABLE` error code in `RestErrorMapper` such
that the error is logged as debug, and the mapped HTTP code is 503
(`SERVICE_UNAVAILABLE`).
- [x]  Write test for `GrpcErrorMapper`
- [x]  Write test for `RestErrorMapper`  in `ErrorMapperTest`
- [x]  Updated integration/QA regression tests

## Related issues

closes #22928
  • Loading branch information
filipecampos authored Oct 22, 2024
2 parents b1c6e99 + 520253c commit 0572f28
Show file tree
Hide file tree
Showing 9 changed files with 88 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ public Stream<? extends Arguments> provideArguments(final ExtensionContext exten
Arguments.of(
new BrokerErrorException(new BrokerError(ErrorCode.RESOURCE_EXHAUSTED, "failure")),
503),
Arguments.of(
new BrokerErrorException(new BrokerError(ErrorCode.PARTITION_UNAVAILABLE, "failure")),
500),
Arguments.of(
new BrokerErrorException(new BrokerError(ErrorCode.INTERNAL_ERROR, "failure")), 500),
Arguments.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public final class ErrorResponseWriter implements BufferWriter {
private static final String PROCESS_NOT_FOUND_FORMAT =
"Expected to get process with %s, but no such process found";
private static final String RESOURCE_EXHAUSTED = "Reached maximum capacity of requests handled";
private static final String PARTITION_UNAVAILABLE = "Cannot accept requests for partition %d.";
private static final String OUT_OF_DISK_SPACE =
"Cannot accept requests for partition %d. Broker is out of disk space";

Expand Down Expand Up @@ -88,6 +89,15 @@ public ErrorResponseWriter resourceExhausted(final String message) {
return errorCode(ErrorCode.RESOURCE_EXHAUSTED).errorMessage(message);
}

public ErrorResponseWriter partitionUnavailable(final int partitionId) {
return errorCode(ErrorCode.PARTITION_UNAVAILABLE)
.errorMessage(String.format(PARTITION_UNAVAILABLE, partitionId));
}

public ErrorResponseWriter partitionUnavailable(final String message) {
return errorCode(ErrorCode.PARTITION_UNAVAILABLE).errorMessage(message);
}

public ErrorResponseWriter outOfDiskSpace(final int partitionId) {
return errorCode(ErrorCode.RESOURCE_EXHAUSTED)
.errorMessage(String.format(OUT_OF_DISK_SPACE, partitionId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ private Either<ErrorResponseWriter, CommandApiResponseWriter> handleExecuteComma

if (processingPaused.getOrDefault(partitionId, false)) {
return Either.left(
errorWriter.internalError("Processing paused for partition '%s'", partitionId));
errorWriter.partitionUnavailable(
String.format("Processing paused for partition '%s'", partitionId)));
}

final var command = reader.getMessageDecoder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,10 @@ private Status mapBrokerErrorToStatus(
builder.setCode(Code.UNAVAILABLE_VALUE);
}
case MALFORMED_REQUEST -> builder.setCode(Code.INVALID_ARGUMENT_VALUE);
case PARTITION_UNAVAILABLE -> {
logger.debug("Partition is currently unavailable: {}", error, rootError);
builder.setCode(Code.UNAVAILABLE_VALUE);
}
default -> {
// all the following are for cases where retrying (with the same gateway) is not expected
// to solve anything
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,4 +180,22 @@ void shouldReturnAbortedOnConnectionClosed() {
// then
assertThat(statusException.getStatus().getCode()).isEqualTo(Code.ABORTED);
}

@Test
void shouldLogPartitionUnavailableErrorOnDebug() {
// given
final var brokerError =
new BrokerError(ErrorCode.PARTITION_UNAVAILABLE, "Partition unavailable");
final BrokerErrorException exception = new BrokerErrorException(brokerError);

// when
log.setLevel(Level.DEBUG);
final StatusRuntimeException statusException = errorMapper.mapError(exception, logger);

// then
assertThat(statusException.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE);
assertThat(recorder.getAppendedEvents()).hasSize(1);
final LogEvent event = recorder.getAppendedEvents().getFirst();
assertThat(event.getLevel()).isEqualTo(Level.DEBUG);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,11 @@ private static ProblemDetail mapBrokerErrorToProblem(
"Target broker was not the leader of the partition: {}", error, rootError);
yield createProblemDetail(HttpStatus.SERVICE_UNAVAILABLE, message, title);
}
case PARTITION_UNAVAILABLE -> {
REST_GATEWAY_LOGGER.debug(
"Partition in target broker is currently unavailable: {}", error, rootError);
yield createProblemDetail(HttpStatus.SERVICE_UNAVAILABLE, message, title);
}
default -> {
// all the following are for cases where retrying (with the same gateway) is not
// expected
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -521,4 +521,33 @@ public void shouldReturnTooManyRequestsOnRequestRetriesExhaustedException() {
.expectBody(ProblemDetail.class)
.isEqualTo(expectedBody);
}

@Test
void shouldYieldUnavailableWhenPartitionPausesProcessing() {
// given
Mockito.when(userTaskServices.completeUserTask(anyLong(), any(), anyString()))
.thenReturn(
CompletableFuture.failedFuture(
new CamundaBrokerException(
new BrokerError(ErrorCode.PARTITION_UNAVAILABLE, "Just an error"))));

final var request = new UserTaskCompletionRequest();
final var expectedBody =
ProblemDetail.forStatusAndDetail(HttpStatus.SERVICE_UNAVAILABLE, "Just an error");
expectedBody.setTitle(ErrorCode.PARTITION_UNAVAILABLE.name());
expectedBody.setInstance(URI.create(USER_TASKS_BASE_URL + "/1/completion"));

// when / then
webClient
.post()
.uri(USER_TASKS_BASE_URL + "/1/completion")
.accept(MediaType.APPLICATION_JSON)
.contentType(MediaType.APPLICATION_JSON)
.body(Mono.just(request), UserTaskCompletionRequest.class)
.exchange()
.expectStatus()
.isEqualTo(HttpStatus.SERVICE_UNAVAILABLE)
.expectBody(ProblemDetail.class)
.isEqualTo(expectedBody);
}
}
1 change: 1 addition & 0 deletions zeebe/protocol/src/main/resources/protocol.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
<validValue name="INVALID_DEPLOYMENT_PARTITION">6</validValue>
<validValue name="PROCESS_NOT_FOUND">7</validValue>
<validValue name="RESOURCE_EXHAUSTED">8</validValue>
<validValue name="PARTITION_UNAVAILABLE">9</validValue>
</enum>

<enum name="ValueType" encodingType="uint8" description="The type of a record value">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,16 @@
import io.camunda.zeebe.test.util.junit.AutoCloseResources.AutoCloseResource;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import io.camunda.zeebe.util.buffer.BufferWriter;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Optional;
import java.util.concurrent.Future;
import org.agrona.DirectBuffer;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -88,6 +92,13 @@ void shouldResumeStreamProcessorWhenRequested() {
final var status = partitions.resumeProcessing();

// then
try (final var client = zeebe.newClientBuilder().build()) {
final Future<?> response =
client.newPublishMessageCommand().messageName("test2").correlationKey("test-key").send();

assertThat(response).isNotNull().succeedsWithin(Duration.ofSeconds(5));
}

assertThat(status.get(1).streamProcessorPhase()).isEqualTo(Phase.PROCESSING.toString());
}

Expand All @@ -107,7 +118,11 @@ void shouldReturnProcessingPausedInsteadOfMessageTimeout() {
.failsWithin(Duration.ofSeconds(5))
.withThrowableThat()
.havingCause()
.withMessageContaining("Processing paused for partition");
.withMessageContaining("UNAVAILABLE: Processing paused for partition")
.asInstanceOf(InstanceOfAssertFactories.throwable(StatusRuntimeException.class))
.extracting(StatusRuntimeException::getStatus)
.extracting(Status::getCode)
.isEqualTo(Code.UNAVAILABLE);
}
}

Expand Down

0 comments on commit 0572f28

Please sign in to comment.