diff --git a/agent-module/profiler/pom.xml b/agent-module/profiler/pom.xml index 157fb71663162..04fa24e89f76d 100644 --- a/agent-module/profiler/pom.xml +++ b/agent-module/profiler/pom.xml @@ -167,10 +167,16 @@ --> + org.mapstruct mapstruct + + io.grpc + grpc-inprocess + test + io.micrometer micrometer-registry-otlp diff --git a/agent-module/profiler/src/test/java/com/navercorp/pinpoint/profiler/sender/grpc/MetadataGrpcDataSenderTest.java b/agent-module/profiler/src/test/java/com/navercorp/pinpoint/profiler/sender/grpc/MetadataGrpcDataSenderTest.java new file mode 100644 index 0000000000000..a22d18a9cfad8 --- /dev/null +++ b/agent-module/profiler/src/test/java/com/navercorp/pinpoint/profiler/sender/grpc/MetadataGrpcDataSenderTest.java @@ -0,0 +1,334 @@ +package com.navercorp.pinpoint.profiler.sender.grpc; + +import com.navercorp.pinpoint.grpc.client.ChannelFactory; +import com.navercorp.pinpoint.grpc.client.retry.HedgingServiceConfigBuilder; +import com.navercorp.pinpoint.grpc.trace.MetadataGrpc; +import com.navercorp.pinpoint.grpc.trace.PApiMetaData; +import com.navercorp.pinpoint.grpc.trace.PResult; +import com.navercorp.pinpoint.profiler.context.grpc.GrpcMetadataMessageConverter; +import com.navercorp.pinpoint.profiler.context.grpc.mapper.MetaDataMapper; +import com.navercorp.pinpoint.profiler.metadata.ApiMetaData; +import com.navercorp.pinpoint.profiler.metadata.MetaDataType; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.Context; +import io.grpc.Contexts; +import io.grpc.ForwardingClientCall; +import io.grpc.ManagedChannel; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.Server; +import io.grpc.ServerCall; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; +import io.grpc.ServerInterceptors; +import io.grpc.Status; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.stub.StreamObserver; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mapstruct.factory.Mappers; + +import java.sql.Timestamp; +import java.util.Arrays; +import java.util.Collections; +import java.util.concurrent.CompletableFuture; + +class MetadataGrpcDataSenderTest { + + private final Logger logger = LogManager.getLogger(this.getClass()); + + private static final long DEFAULT_TEST_HEDGING_DELAY_MILLIS = 500L; + private static final String DELAY_METADATA = "delay few seconds"; + private static final String RUNTIME_EXCEPTION_METADATA = "runtime exception test"; + private static final String UNAVAILABLE_METADATA = "status code UNAVAILABLE"; + private static final String UNKNOWN_METADATA = "status code UNKNOWN"; + private static final String FAIL_METADATA = "success=false"; + + private static final Metadata.Key TEST_ID_KEY = Metadata.Key.of("test-id", Metadata.ASCII_STRING_MARSHALLER); + private static final Metadata.Key GRPC_PREVIOUS_RPC_ATTEMPTS_KEY = Metadata.Key.of("grpc-previous-rpc-attempts", Metadata.ASCII_STRING_MARSHALLER); + + private static Server server; + private static String serverName; + private static int testId; + private static int requestCounter; + + @BeforeAll + public static void setUp() { + serverName = InProcessServerBuilder.generateName(); + + server = InProcessServerBuilder + .forName(serverName) + //.directExecutor() + .addService(ServerInterceptors.intercept(new MetadataGrpcService(), new TestServerInterceptor())) + .build(); + + CompletableFuture.supplyAsync(() -> { + try { + server.start(); + } catch (Exception e) { + e.printStackTrace(); + } + return null; + }); + + testId = 0; + } + + @AfterAll + public static void tearDown() { + server.shutdown(); + } + + @BeforeEach + public void resetCounter() { + testId++; + requestCounter = 0; + } + + public static class MetadataGrpcService extends MetadataGrpc.MetadataImplBase { + @Override + public void requestApiMetaData(PApiMetaData request, StreamObserver responseObserver) { + System.out.println(request); + switch (request.getApiInfo()) { + case DELAY_METADATA: + try { + Thread.sleep(1000); + System.out.println("server delayed response time: " + new Timestamp(System.currentTimeMillis())); + responseObserver.onNext(PResult.newBuilder().setSuccess(true).setMessage("test 1s delay, status code: OK").build()); + } catch (InterruptedException ignore) { + } + responseObserver.onCompleted(); + break; + case UNAVAILABLE_METADATA: + responseObserver.onError(Status.UNAVAILABLE.withDescription("test status code: UNAVAILABLE").asException()); + break; + case UNKNOWN_METADATA: + responseObserver.onError(Status.UNKNOWN.withDescription("test status code: UNKNOWN").asException()); + break; + case RUNTIME_EXCEPTION_METADATA: + responseObserver.onError(new RuntimeException("test with runtime exception, status code: UNKNOWN ")); + break; + case FAIL_METADATA: + responseObserver.onNext(PResult.newBuilder().setSuccess(false).setMessage("test success=false, status code: OK").build()); + responseObserver.onCompleted(); + break; + default: + responseObserver.onNext(PResult.newBuilder().setSuccess(true).setMessage("test success=true, status code: OK").build()); + responseObserver.onCompleted(); + break; + } + } + } + + public static class TestServerInterceptor implements ServerInterceptor { + @Override + public ServerCall.Listener interceptCall(ServerCall serverCall, Metadata metadata, ServerCallHandler serverCallHandler) { + int totalAttempts = -1; + String callTestId = metadata.get(TEST_ID_KEY); + if (callTestId != null && callTestId.equals(Integer.toString(testId))) { + requestCounter++; + + String previousAttempts = metadata.get(GRPC_PREVIOUS_RPC_ATTEMPTS_KEY); + if (previousAttempts == null) { + totalAttempts = 1; + } else { + totalAttempts = Integer.parseInt(previousAttempts) + 1; + } + //Assertions.assertThat(requestCounter).isEqualTo(totalAttempts); + } + + System.out.println("---- server time: " + new Timestamp(System.currentTimeMillis())); + System.out.println("testId: " + callTestId); + System.out.println("total attempts: " + totalAttempts); + return Contexts.interceptCall(Context.current(), serverCall, metadata, serverCallHandler); + } + } + + @Test + void sendTest() throws InterruptedException { + HedgingServiceConfigBuilder serviceConfigBuilder = getTestServiceConfigBuilder(); + InProcessChannelBuilder channelBuilder = getInProcessChannelBuilder() + .defaultServiceConfig(serviceConfigBuilder.buildMetadataConfig()); + + MetadataGrpcHedgingDataSender metadataGrpcDataSender = getMetadataGrpcHedgingDataSender(channelBuilder); + + ApiMetaData apiMetaData = new ApiMetaData(1, "call", 10, 2); + boolean send = metadataGrpcDataSender.request(apiMetaData); + + Assertions.assertThat(send).isTrue(); + Thread.sleep(DEFAULT_TEST_HEDGING_DELAY_MILLIS * 2); + Assertions.assertThat(requestCounter).isGreaterThan(0); + } + + @Test + void sendFatalStatusCodeTest() throws InterruptedException { + HedgingServiceConfigBuilder serviceConfigBuilder = getTestServiceConfigBuilder(); + serviceConfigBuilder.setNonFatalStatusCodes(Collections.emptyList()); + InProcessChannelBuilder channelBuilder = getInProcessChannelBuilder() + .defaultServiceConfig(serviceConfigBuilder.buildMetadataConfig()); + MetadataGrpcHedgingDataSender metadataGrpcDataSender = getMetadataGrpcHedgingDataSender(channelBuilder); + + ApiMetaData apiMetaData = new ApiMetaData(1, UNAVAILABLE_METADATA, 10, 2); + boolean send = metadataGrpcDataSender.request(apiMetaData); + + Assertions.assertThat(send).isTrue(); + Thread.sleep(DEFAULT_TEST_HEDGING_DELAY_MILLIS * 4); + Assertions.assertThat(requestCounter).isEqualTo(3); + } + + @Test + void sendFailRetryTest() throws InterruptedException { + HedgingServiceConfigBuilder serviceConfigBuilder = getTestServiceConfigBuilder(); + InProcessChannelBuilder channelBuilder = getInProcessChannelBuilder() + .defaultServiceConfig(serviceConfigBuilder.buildMetadataConfig()); + MetadataGrpcHedgingDataSender metadataGrpcDataSender = getMetadataGrpcHedgingDataSender(channelBuilder); + + ApiMetaData apiMetaData = new ApiMetaData(2, UNAVAILABLE_METADATA, 10, 2); + boolean send = metadataGrpcDataSender.request(apiMetaData); + + Assertions.assertThat(send).isTrue(); + Thread.sleep(DEFAULT_TEST_HEDGING_DELAY_MILLIS * 4); + Assertions.assertThat(requestCounter).isEqualTo(3); + } + + @Test + void sendDelayRetryTest() throws InterruptedException { + HedgingServiceConfigBuilder serviceConfigBuilder = getTestServiceConfigBuilder(); + serviceConfigBuilder.setHedgingDelayMillis(100); + InProcessChannelBuilder channelBuilder = getInProcessChannelBuilder() + .defaultServiceConfig(serviceConfigBuilder.buildMetadataConfig()); + MetadataGrpcHedgingDataSender metadataGrpcDataSender = getMetadataGrpcHedgingDataSender(channelBuilder); + + ApiMetaData apiMetaData = new ApiMetaData(3, DELAY_METADATA, 10, 2); + boolean send = metadataGrpcDataSender.request(apiMetaData); + + Assertions.assertThat(send).isTrue(); + Thread.sleep(DEFAULT_TEST_HEDGING_DELAY_MILLIS * 4); + Assertions.assertThat(requestCounter).isGreaterThan(1); + } + + @Test + void sendFailRetryRuntimeExceptionTest() throws InterruptedException { + HedgingServiceConfigBuilder serviceConfigBuilder = getTestServiceConfigBuilder(); + InProcessChannelBuilder channelBuilder = getInProcessChannelBuilder() + .defaultServiceConfig(serviceConfigBuilder.buildMetadataConfig()); + MetadataGrpcHedgingDataSender metadataGrpcDataSender = getMetadataGrpcHedgingDataSender(channelBuilder); + + ApiMetaData apiMetaData = new ApiMetaData(3, RUNTIME_EXCEPTION_METADATA, 10, 2); + boolean send = metadataGrpcDataSender.request(apiMetaData); + + Assertions.assertThat(send).isTrue(); + Thread.sleep(DEFAULT_TEST_HEDGING_DELAY_MILLIS * 4); + Assertions.assertThat(requestCounter).isGreaterThan(1); + } + + @Test + void sendMaxAttempts() throws InterruptedException { + HedgingServiceConfigBuilder serviceConfigBuilder = getTestServiceConfigBuilder(); + serviceConfigBuilder.setMaxAttempts(5); + InProcessChannelBuilder channelBuilder = getInProcessChannelBuilder() + .defaultServiceConfig(serviceConfigBuilder.buildMetadataConfig()); + MetadataGrpcHedgingDataSender metadataGrpcDataSender = getMetadataGrpcHedgingDataSender(channelBuilder); + + ApiMetaData apiMetaData = new ApiMetaData(3, UNAVAILABLE_METADATA, 10, 2); + boolean send = metadataGrpcDataSender.request(apiMetaData); + + Assertions.assertThat(send).isTrue(); + Thread.sleep(DEFAULT_TEST_HEDGING_DELAY_MILLIS * 7); + Assertions.assertThat(requestCounter).isEqualTo(5); + } + + @Test + void sendMaxAttemptsLimit() throws InterruptedException { + HedgingServiceConfigBuilder serviceConfigBuilder = getTestServiceConfigBuilder(); + serviceConfigBuilder.setMaxAttempts(5); + InProcessChannelBuilder channelBuilder = getInProcessChannelBuilder() + .maxHedgedAttempts(2) + .defaultServiceConfig(serviceConfigBuilder.buildMetadataConfig()); + MetadataGrpcHedgingDataSender metadataGrpcDataSender = getMetadataGrpcHedgingDataSender(channelBuilder); + + ApiMetaData apiMetaData = new ApiMetaData(3, UNAVAILABLE_METADATA, 10, 2); + boolean send = metadataGrpcDataSender.request(apiMetaData); + + Assertions.assertThat(send).isTrue(); + Thread.sleep(DEFAULT_TEST_HEDGING_DELAY_MILLIS * 6); + Assertions.assertThat(requestCounter).isEqualTo(2); + } + + + private InProcessChannelBuilder getInProcessChannelBuilder() { + return InProcessChannelBuilder.forName(serverName) + .directExecutor() + .intercept(new TestClientInterceptor()) + .enableRetry() + //.retryBufferSize() + //.perRpcBufferLimit() + ; + } + + public class TestClientInterceptor implements ClientInterceptor { + @Override + public ClientCall interceptCall(MethodDescriptor methodDescriptor, CallOptions callOptions, Channel channel) { + final ClientCall clientCall = channel.newCall(methodDescriptor, callOptions); + return new ForwardingClientCall.SimpleForwardingClientCall(clientCall) { + @Override + public void start(Listener responseListener, Metadata headers) { + logger.info("request, testId: {}, client time: {}", testId, new Timestamp(System.currentTimeMillis()).toString()); + + headers.put(TEST_ID_KEY, Integer.toString(testId)); + super.start(responseListener, headers); + } + }; + } + } + + private HedgingServiceConfigBuilder getTestServiceConfigBuilder() { + HedgingServiceConfigBuilder serviceConfigBuilder = new HedgingServiceConfigBuilder(); + serviceConfigBuilder.setHedgingDelayMillis(DEFAULT_TEST_HEDGING_DELAY_MILLIS); + serviceConfigBuilder.setNonFatalStatusCodes(Arrays.asList( + Status.Code.UNKNOWN.name(), + Status.Code.INTERNAL.name(), + Status.Code.UNAVAILABLE.name() + )); + return serviceConfigBuilder; + } + + + private MetadataGrpcHedgingDataSender getMetadataGrpcHedgingDataSender(InProcessChannelBuilder channelBuilder) { + MetaDataMapper mapper = Mappers.getMapper(MetaDataMapper.class); + GrpcMetadataMessageConverter converter = new GrpcMetadataMessageConverter(mapper); + + ChannelFactory factory = new ChannelFactory() { + @Override + public String getFactoryName() { + return "inprocess-builder"; + } + + @Override + public ManagedChannel build(String channelName, String host, int port) { + return channelBuilder.build(); + } + + @Override + public ManagedChannel build(String host, int port) { + return channelBuilder.build(); + } + + @Override + public void close() { + } + }; + + return new MetadataGrpcHedgingDataSender<>("localhost", 1234, 1, + converter, factory); + } +} \ No newline at end of file diff --git a/grpc/src/main/java/com/navercorp/pinpoint/grpc/client/retry/HedgingServiceConfigBuilder.java b/grpc/src/main/java/com/navercorp/pinpoint/grpc/client/retry/HedgingServiceConfigBuilder.java index 4766e7212dd10..61bb0c5988ec4 100644 --- a/grpc/src/main/java/com/navercorp/pinpoint/grpc/client/retry/HedgingServiceConfigBuilder.java +++ b/grpc/src/main/java/com/navercorp/pinpoint/grpc/client/retry/HedgingServiceConfigBuilder.java @@ -16,6 +16,9 @@ package com.navercorp.pinpoint.grpc.client.retry; +import io.grpc.Status; + +import java.util.Arrays; import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; @@ -25,6 +28,11 @@ public class HedgingServiceConfigBuilder implements ServiceConfigBuilder { public static final int DEFAULT_MAX_ATTEMPTS = 3; public static final long DEFAULT_HEDGING_DELAY_MILLIS = 1000L; + public static final List DEFAULT_STATUS_CODES = Arrays.asList( + Status.Code.UNKNOWN.name(), + Status.Code.INTERNAL.name(), + Status.Code.UNAVAILABLE.name() + ); private double maxAttempts = DEFAULT_MAX_ATTEMPTS; //Required. Must be two or greater private String hedgingDelay = millisToString(DEFAULT_HEDGING_DELAY_MILLIS); //Required. Long decimal with "s" appended @@ -49,6 +57,8 @@ private void addHedgingPolicy(Map methodConfig) { retryPolicy.put("hedgingDelay", hedgingDelay); if (nonFatalStatusCodes != null && !nonFatalStatusCodes.isEmpty()) { retryPolicy.put("nonFatalStatusCodes", nonFatalStatusCodes); + } else { + retryPolicy.put("nonFatalStatusCodes", DEFAULT_STATUS_CODES); } methodConfig.put("hedgingPolicy", retryPolicy); diff --git a/grpc/src/main/java/com/navercorp/pinpoint/grpc/client/retry/ServiceConfigBuilder.java b/grpc/src/main/java/com/navercorp/pinpoint/grpc/client/retry/ServiceConfigBuilder.java index 3c88dc7c8496a..e4c78e7a33ae0 100644 --- a/grpc/src/main/java/com/navercorp/pinpoint/grpc/client/retry/ServiceConfigBuilder.java +++ b/grpc/src/main/java/com/navercorp/pinpoint/grpc/client/retry/ServiceConfigBuilder.java @@ -20,7 +20,7 @@ public interface ServiceConfigBuilder { - String METADATA_SERVICE = "v1.metadata"; + String METADATA_SERVICE = com.navercorp.pinpoint.grpc.trace.MetadataGrpc.SERVICE_NAME; Map buildMetadataConfig();