Skip to content

Commit

Permalink
Add more metrics and handle emr exception message
Browse files Browse the repository at this point in the history
Signed-off-by: Vamsi Manohar <reddyvam@amazon.com>
  • Loading branch information
vmmusings committed Nov 7, 2023
1 parent b8aa7ef commit 3ebd124
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -276,8 +276,10 @@ public void onFailure(Exception e) {

private void handleException(Exception e, RestChannel restChannel) {
if (e instanceof DataSourceNotFoundException) {
MetricUtils.incrementNumericalMetric(MetricName.DATASOURCE_FAILED_REQ_COUNT_CUS);
reportError(restChannel, e, NOT_FOUND);
} else if (e instanceof OpenSearchException) {
MetricUtils.incrementNumericalMetric(MetricName.DATASOURCE_FAILED_REQ_COUNT_SYS);
OpenSearchException exception = (OpenSearchException) e;
reportError(restChannel, exception, exception.status());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ public enum MetricName {
DATASOURCE_DELETE_REQ_COUNT("datasource_delete_request_count"),
DATASOURCE_FAILED_REQ_COUNT_SYS("datasource_failed_request_count_syserr"),
DATASOURCE_FAILED_REQ_COUNT_CUS("datasource_failed_request_count_cuserr"),
ASYNC_QUERY_GET_API_FAILED_REQ_COUNT_SYS("async_query_get_api_failed_request_count_syserr"),
ASYNC_QUERY_GET_API_FAILED_REQ_COUNT_CUS("async_query_get_api_failed_request_count_cuserr"),
ASYNC_QUERY_CREATE_API_FAILED_REQ_COUNT_SYS("async_query_create_api_failed_request_count_syserr"),
ASYNC_QUERY_CREATE_API_FAILED_REQ_COUNT_CUS("async_query_create_api_failed_request_count_cuserr"),
ASYNC_QUERY_CANCEL_API_FAILED_REQ_COUNT_SYS("async_query_cancel_api_failed_request_count_syserr"),
ASYNC_QUERY_CANCEL_API_FAILED_REQ_COUNT_CUS("async_query_cancel_api_failed_request_count_cuserr"),
EMR_START_JOB_REQUEST_FAILURE_COUNT("emr_start_job_request_failure_count"),
EMR_GET_JOB_RESULT_FAILURE_COUNT("emr_get_job_request_failure_count"),
EMR_CANCEL_JOB_REQUEST_FAILURE_COUNT("emr_cancel_job_request_failure_count"),
Expand Down Expand Up @@ -73,6 +79,12 @@ public static List<String> getNames() {
.add(EMR_INTERACTIVE_QUERY_JOBS_CREATION_COUNT)
.add(EMR_STREAMING_QUERY_JOBS_CREATION_COUNT)
.add(EMR_BATCH_QUERY_JOBS_CREATION_COUNT)
.add(ASYNC_QUERY_CREATE_API_FAILED_REQ_COUNT_CUS)
.add(ASYNC_QUERY_CREATE_API_FAILED_REQ_COUNT_SYS)
.add(ASYNC_QUERY_CANCEL_API_FAILED_REQ_COUNT_CUS)
.add(ASYNC_QUERY_CANCEL_API_FAILED_REQ_COUNT_SYS)
.add(ASYNC_QUERY_GET_API_FAILED_REQ_COUNT_CUS)
.add(ASYNC_QUERY_GET_API_FAILED_REQ_COUNT_SYS)
.build();

public boolean isNumerical() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import com.amazonaws.services.emrserverless.model.SparkSubmit;
import com.amazonaws.services.emrserverless.model.StartJobRunRequest;
import com.amazonaws.services.emrserverless.model.StartJobRunResult;
import com.amazonaws.services.emrserverless.model.ValidationException;
import java.security.AccessController;
import java.security.PrivilegedAction;
import org.apache.logging.log4j.LogManager;
Expand All @@ -30,6 +29,8 @@ public class EmrServerlessClientImpl implements EMRServerlessClient {
private final AWSEMRServerless emrServerless;
private static final Logger logger = LogManager.getLogger(EmrServerlessClientImpl.class);

private static final String GENERIC_INTERNAL_SERVER_ERROR_MESSAGE = "Internal Server Error.";

public EmrServerlessClientImpl(AWSEMRServerless emrServerless) {
this.emrServerless = emrServerless;
}
Expand Down Expand Up @@ -62,9 +63,10 @@ public String startJobRun(StartJobRequest startJobRequest) {
try {
return emrServerless.startJobRun(request);
} catch (Throwable t) {
logger.error("Error while making start job request to emr:", t);
MetricUtils.incrementNumericalMetric(
MetricName.EMR_START_JOB_REQUEST_FAILURE_COUNT);
throw t;
throw new RuntimeException(GENERIC_INTERNAL_SERVER_ERROR_MESSAGE);
}
});
logger.info("Job Run ID: " + startJobRunResult.getJobRunId());
Expand All @@ -82,9 +84,10 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) {
try {
return emrServerless.getJobRun(request);
} catch (Throwable t) {
logger.error("Error while making get job run request to emr:", t);
MetricUtils.incrementNumericalMetric(
MetricName.EMR_GET_JOB_RESULT_FAILURE_COUNT);
throw t;
throw new RuntimeException(GENERIC_INTERNAL_SERVER_ERROR_MESSAGE);
}
});
logger.info("Job Run state: " + getJobRunResult.getJobRun().getState());
Expand All @@ -95,24 +98,20 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) {
public CancelJobRunResult cancelJobRun(String applicationId, String jobId) {
CancelJobRunRequest cancelJobRunRequest =
new CancelJobRunRequest().withJobRunId(jobId).withApplicationId(applicationId);
try {
CancelJobRunResult cancelJobRunResult =
AccessController.doPrivileged(
(PrivilegedAction<CancelJobRunResult>)
() -> {
try {
return emrServerless.cancelJobRun(cancelJobRunRequest);
} catch (Throwable t) {
MetricUtils.incrementNumericalMetric(
MetricName.EMR_CANCEL_JOB_REQUEST_FAILURE_COUNT);
throw t;
}
});
logger.info(String.format("Job : %s cancelled", cancelJobRunResult.getJobRunId()));
return cancelJobRunResult;
} catch (ValidationException e) {
throw new IllegalArgumentException(
String.format("Couldn't cancel the queryId: %s due to %s", jobId, e.getMessage()));
}
CancelJobRunResult cancelJobRunResult =
AccessController.doPrivileged(
(PrivilegedAction<CancelJobRunResult>)
() -> {
try {
return emrServerless.cancelJobRun(cancelJobRunRequest);
} catch (Throwable t) {
logger.error("Error while making cancel job request to emr:", t);
MetricUtils.incrementNumericalMetric(
MetricName.EMR_CANCEL_JOB_REQUEST_FAILURE_COUNT);
throw new RuntimeException(GENERIC_INTERNAL_SERVER_ERROR_MESSAGE);
}
});
logger.info(String.format("Job : %s cancelled", cancelJobRunResult.getJobRunId()));
return cancelJobRunResult;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.opensearch.rest.RestRequest;
import org.opensearch.sql.datasources.exceptions.ErrorMessage;
import org.opensearch.sql.datasources.utils.Scheduler;
import org.opensearch.sql.legacy.metrics.MetricName;
import org.opensearch.sql.legacy.utils.MetricUtils;
import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest;
import org.opensearch.sql.spark.transport.TransportCancelAsyncQueryRequestAction;
import org.opensearch.sql.spark.transport.TransportCreateAsyncQueryRequestAction;
Expand Down Expand Up @@ -132,7 +134,7 @@ public void onResponse(

@Override
public void onFailure(Exception e) {
handleException(e, restChannel);
handleException(e, restChannel, restRequest.method());
}
}));
}
Expand Down Expand Up @@ -160,21 +162,25 @@ public void onResponse(

@Override
public void onFailure(Exception e) {
handleException(e, restChannel);
handleException(e, restChannel, restRequest.method());
}
}));
}

private void handleException(Exception e, RestChannel restChannel) {
private void handleException(
Exception e, RestChannel restChannel, RestRequest.Method requestMethod) {
if (e instanceof OpenSearchException) {
OpenSearchException exception = (OpenSearchException) e;
reportError(restChannel, exception, exception.status());
addCustomerErrorMetric(requestMethod);
} else {
LOG.error("Error happened during request handling", e);
if (isClientError(e)) {
reportError(restChannel, e, BAD_REQUEST);
addCustomerErrorMetric(requestMethod);
} else {
reportError(restChannel, e, SERVICE_UNAVAILABLE);
addSystemErrorMetric(requestMethod);
}
}
}
Expand All @@ -201,7 +207,7 @@ public void onResponse(

@Override
public void onFailure(Exception e) {
handleException(e, restChannel);
handleException(e, restChannel, restRequest.method());
}
}));
}
Expand All @@ -214,4 +220,36 @@ private void reportError(final RestChannel channel, final Exception e, final Res
private static boolean isClientError(Exception e) {
return e instanceof IllegalArgumentException || e instanceof IllegalStateException;
}

private void addSystemErrorMetric(RestRequest.Method requestMethod) {
switch (requestMethod) {
case POST:
MetricUtils.incrementNumericalMetric(
MetricName.ASYNC_QUERY_CREATE_API_FAILED_REQ_COUNT_SYS);
break;
case GET:
MetricUtils.incrementNumericalMetric(MetricName.ASYNC_QUERY_GET_API_FAILED_REQ_COUNT_SYS);
break;
case DELETE:
MetricUtils.incrementNumericalMetric(
MetricName.ASYNC_QUERY_CANCEL_API_FAILED_REQ_COUNT_SYS);
break;
}
}

private void addCustomerErrorMetric(RestRequest.Method requestMethod) {
switch (requestMethod) {
case POST:
MetricUtils.incrementNumericalMetric(
MetricName.ASYNC_QUERY_CREATE_API_FAILED_REQ_COUNT_CUS);
break;
case GET:
MetricUtils.incrementNumericalMetric(MetricName.ASYNC_QUERY_GET_API_FAILED_REQ_COUNT_CUS);
break;
case DELETE:
MetricUtils.incrementNumericalMetric(
MetricName.ASYNC_QUERY_CANCEL_API_FAILED_REQ_COUNT_CUS);
break;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,21 +88,23 @@ void testStartJobRun() {

@Test
void testStartJobRunWithErrorMetric() {
doThrow(new RuntimeException()).when(emrServerless).startJobRun(any());
doThrow(new ValidationException("Couldn't start job")).when(emrServerless).startJobRun(any());
EmrServerlessClientImpl emrServerlessClient = new EmrServerlessClientImpl(emrServerless);
Assertions.assertThrows(
RuntimeException.class,
() ->
emrServerlessClient.startJobRun(
new StartJobRequest(
QUERY,
EMRS_JOB_NAME,
EMRS_APPLICATION_ID,
EMRS_EXECUTION_ROLE,
SPARK_SUBMIT_PARAMETERS,
new HashMap<>(),
false,
null)));
RuntimeException runtimeException =
Assertions.assertThrows(
RuntimeException.class,
() ->
emrServerlessClient.startJobRun(
new StartJobRequest(
QUERY,
EMRS_JOB_NAME,
EMRS_APPLICATION_ID,
EMRS_EXECUTION_ROLE,
SPARK_SUBMIT_PARAMETERS,
new HashMap<>(),
false,
null)));
Assertions.assertEquals("Internal Server Error.", runtimeException.getMessage());
}

@Test
Expand Down Expand Up @@ -136,11 +138,13 @@ void testGetJobRunState() {

@Test
void testGetJobRunStateWithErrorMetric() {
doThrow(new RuntimeException()).when(emrServerless).getJobRun(any());
doThrow(new ValidationException("Not a good job")).when(emrServerless).getJobRun(any());
EmrServerlessClientImpl emrServerlessClient = new EmrServerlessClientImpl(emrServerless);
Assertions.assertThrows(
RuntimeException.class,
() -> emrServerlessClient.getJobRunResult(EMRS_APPLICATION_ID, "123"));
RuntimeException runtimeException =
Assertions.assertThrows(
RuntimeException.class,
() -> emrServerlessClient.getJobRunResult(EMRS_APPLICATION_ID, "123"));
Assertions.assertEquals("Internal Server Error.", runtimeException.getMessage());
}

@Test
Expand All @@ -165,13 +169,10 @@ void testCancelJobRunWithErrorMetric() {
void testCancelJobRunWithValidationException() {
doThrow(new ValidationException("Error")).when(emrServerless).cancelJobRun(any());
EmrServerlessClientImpl emrServerlessClient = new EmrServerlessClientImpl(emrServerless);
IllegalArgumentException illegalArgumentException =
RuntimeException runtimeException =
Assertions.assertThrows(
IllegalArgumentException.class,
RuntimeException.class,
() -> emrServerlessClient.cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID));
Assertions.assertEquals(
"Couldn't cancel the queryId: job-123xxx due to Error (Service: null; Status Code: 0; Error"
+ " Code: null; Request ID: null; Proxy: null)",
illegalArgumentException.getMessage());
Assertions.assertEquals("Internal Server Error.", runtimeException.getMessage());
}
}

0 comments on commit 3ebd124

Please sign in to comment.