Skip to content

Commit

Permalink
[#9932] Replace DefaultFuture with CompletableFuture
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed May 15, 2023
1 parent d9ef155 commit de1949e
Show file tree
Hide file tree
Showing 48 changed files with 470 additions and 1,057 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,17 @@
package com.navercorp.pinpoint.collector.cluster;

import com.navercorp.pinpoint.common.server.cluster.ClusterKey;
import com.navercorp.pinpoint.rpc.Future;

import com.navercorp.pinpoint.rpc.ResponseMessage;
import org.apache.thrift.TBase;

import java.util.concurrent.CompletableFuture;

/**
* Connection with agent module
*/
public interface ClusterPoint<M> {

Future<ResponseMessage> request(M request);
CompletableFuture<ResponseMessage> request(M request);

ClusterKey getDestClusterKey();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
import com.navercorp.pinpoint.collector.receiver.grpc.PinpointGrpcServer;
import com.navercorp.pinpoint.common.server.cluster.ClusterKey;
import com.navercorp.pinpoint.profiler.context.grpc.CommandThriftToGrpcMessageConverter;
import com.navercorp.pinpoint.rpc.DefaultFuture;
import com.navercorp.pinpoint.rpc.Future;
import com.navercorp.pinpoint.rpc.PinpointSocketException;
import com.navercorp.pinpoint.rpc.ResponseMessage;
import com.navercorp.pinpoint.rpc.packet.stream.StreamCode;
Expand All @@ -34,6 +32,7 @@

import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;

/**
* @author Taejin Koo
Expand All @@ -54,11 +53,11 @@ public GrpcAgentConnection(PinpointGrpcServer pinpointGrpcServer, List<Integer>
}

@Override
public Future<ResponseMessage> request(TBase<?, ?> request) {
public CompletableFuture<ResponseMessage> request(TBase<?, ?> request) {
GeneratedMessageV3 message = messageConverter.toMessage(request);
if (message == null) {
DefaultFuture<ResponseMessage> failedFuture = new DefaultFuture<>();
failedFuture.setFailure(new PinpointSocketException(TRouteResult.NOT_SUPPORTED_REQUEST.name()));
CompletableFuture<ResponseMessage> failedFuture = new CompletableFuture<>();
failedFuture.completeExceptionally(new PinpointSocketException(TRouteResult.NOT_SUPPORTED_REQUEST.name()));
return failedFuture;
}
return pinpointGrpcServer.request(message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.navercorp.pinpoint.collector.cluster;

import com.navercorp.pinpoint.common.server.cluster.ClusterKey;
import com.navercorp.pinpoint.rpc.Future;
import com.navercorp.pinpoint.rpc.ResponseMessage;
import com.navercorp.pinpoint.rpc.server.ChannelProperties;
import com.navercorp.pinpoint.rpc.server.PinpointServer;
Expand All @@ -27,6 +26,7 @@

import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;

/**
* @author koo.taejin
Expand Down Expand Up @@ -61,7 +61,7 @@ private static ClusterKey newClusterKey(ChannelProperties channelProperties) {
}

@Override
public Future<ResponseMessage> request(byte[] payload) {
public CompletableFuture<ResponseMessage> request(byte[] payload) {
return pinpointServer.request(payload);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,17 @@
import com.navercorp.pinpoint.collector.cluster.GrpcAgentConnection;
import com.navercorp.pinpoint.collector.cluster.ThriftAgentConnection;
import com.navercorp.pinpoint.collector.cluster.route.filter.RouteFilter;
import com.navercorp.pinpoint.rpc.Future;
import com.navercorp.pinpoint.rpc.ResponseMessage;
import com.navercorp.pinpoint.thrift.dto.command.TCommandTransferResponse;
import com.navercorp.pinpoint.thrift.dto.command.TRouteResult;

import org.apache.commons.lang3.ArrayUtils;
import org.apache.thrift.TBase;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
* @author koo.taejin
* @author HyunGil Jeong
Expand Down Expand Up @@ -83,7 +86,7 @@ private TCommandTransferResponse onRoute0(RequestEvent event) {
return createResponse(TRouteResult.NOT_SUPPORTED_REQUEST);
}

Future<ResponseMessage> future;
CompletableFuture<ResponseMessage> future;
if (clusterPoint instanceof ThriftAgentConnection) {
ThriftAgentConnection thriftAgentConnection = (ThriftAgentConnection) clusterPoint;
future = thriftAgentConnection.request(event.getDeliveryCommand().getPayload());
Expand All @@ -94,26 +97,26 @@ private TCommandTransferResponse onRoute0(RequestEvent event) {
return createResponse(TRouteResult.NOT_ACCEPTABLE);
}

boolean isCompleted = future.await();
if (!isCompleted) {
try {
ResponseMessage responseMessage = future.get(3000, TimeUnit.MILLISECONDS);
if (responseMessage == null) {
return createResponse(TRouteResult.EMPTY_RESPONSE);
}

final byte[] responsePayload = responseMessage.getMessage();
if (ArrayUtils.isEmpty(responsePayload)) {
return createResponse(TRouteResult.EMPTY_RESPONSE, new byte[0]);
}

return createResponse(TRouteResult.OK, responsePayload);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return createResponse(TRouteResult.UNKNOWN, e.getMessage());
} catch (ExecutionException e) {
return createResponse(TRouteResult.UNKNOWN, e.getCause().getMessage());
} catch (TimeoutException e) {
return createResponse(TRouteResult.TIMEOUT);
}

if (future.getCause() != null) {
return createResponse(TRouteResult.UNKNOWN, future.getCause().getMessage());
}

ResponseMessage responseMessage = future.getResult();
if (responseMessage == null) {
return createResponse(TRouteResult.EMPTY_RESPONSE);
}

final byte[] responsePayload = responseMessage.getMessage();
if (ArrayUtils.isEmpty(responsePayload)) {
return createResponse(TRouteResult.EMPTY_RESPONSE, new byte[0]);
}

return createResponse(TRouteResult.OK, responsePayload);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import com.navercorp.pinpoint.common.util.Assert;
import com.navercorp.pinpoint.common.util.StringUtils;
import com.navercorp.pinpoint.io.request.Message;
import com.navercorp.pinpoint.rpc.Future;
import com.navercorp.pinpoint.rpc.PinpointSocketException;
import com.navercorp.pinpoint.rpc.ResponseMessage;
import com.navercorp.pinpoint.rpc.common.SocketStateCode;
Expand All @@ -35,9 +34,9 @@
import com.navercorp.pinpoint.thrift.io.HeaderTBaseDeserializer;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import org.apache.thrift.TBase;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TBase;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
Expand All @@ -47,6 +46,10 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
* @author Taejin Koo
Expand Down Expand Up @@ -158,7 +161,7 @@ private List<GrpcAgentConnection> getGrpcAgentConnectionList(final String applic
private CheckConnectionStatusResult request(GrpcAgentConnection grpcAgentConnection, int checkCount) {
logger.info("Ping message will be sent. collector => {}.", grpcAgentConnection.getDestClusterKey());

Future<ResponseMessage> response = null;
CompletableFuture<ResponseMessage> response = null;
try {
response = request0(grpcAgentConnection, checkCount);
} catch (StatusRuntimeException e) {
Expand All @@ -173,15 +176,8 @@ private CheckConnectionStatusResult request(GrpcAgentConnection grpcAgentConnect
clearConnection(grpcAgentConnection);
return CheckConnectionStatusResult.FAIL_AND_CLEAR_CONNECTION;
}

if (!response.isSuccess()) {
Throwable cause = response.getCause();
logger.warn("Failed while request message. message:{}", cause.getMessage(), cause);
return CheckConnectionStatusResult.FAIL;
}

try {
ResponseMessage result = response.getResult();
ResponseMessage result = response.get(3000, TimeUnit.MILLISECONDS);
Message<TBase<?, ?>> deserialize = tBaseDeserializer.deserialize(result.getMessage());

TBase<?, ?> data = deserialize.getData();
Expand All @@ -191,9 +187,11 @@ private CheckConnectionStatusResult request(GrpcAgentConnection grpcAgentConnect
}
}
logger.warn("Receive unexpected response data. data:{}", data);
} catch (Exception e) {
logger.warn("Exception occurred while handles response message. message:{}", e.getMessage(), e);
} catch (Exception cause) {
logger.warn("Failed while request message. message:{}", cause.getMessage(), cause);
return CheckConnectionStatusResult.FAIL;
}

return CheckConnectionStatusResult.FAIL;
}

Expand All @@ -204,12 +202,19 @@ private void clearConnection(GrpcAgentConnection grpcAgentConnection) {

// If the occur excption in connection, do not retry
// Multiple attempts only at timeout
private Future<ResponseMessage> request0(GrpcAgentConnection grpcAgentConnection, int maxCount) {
private CompletableFuture<ResponseMessage> request0(GrpcAgentConnection grpcAgentConnection, int maxCount) {
for (int i = 0; i < maxCount; i++) {
Future<ResponseMessage> responseFuture = grpcAgentConnection.request(CONNECTION_CHECK_COMMAND);
boolean await = responseFuture.await();
if (await) {
CompletableFuture<ResponseMessage> responseFuture = grpcAgentConnection.request(CONNECTION_CHECK_COMMAND);
try {
responseFuture.get(3000, TimeUnit.MILLISECONDS);
return responseFuture;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PinpointSocketException(e);
} catch (ExecutionException e) {
throw new PinpointSocketException(e.getCause());
} catch (TimeoutException e) {
throw new PinpointSocketException(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.navercorp.pinpoint.collector.receiver.grpc;

import com.google.protobuf.Empty;
import com.google.protobuf.GeneratedMessageV3;
import com.navercorp.pinpoint.collector.cluster.GrpcAgentConnection;
import com.navercorp.pinpoint.collector.cluster.ProfilerClusterManager;
import com.navercorp.pinpoint.common.server.cluster.ClusterKey;
Expand All @@ -27,8 +29,6 @@
import com.navercorp.pinpoint.grpc.trace.PCmdRequest;
import com.navercorp.pinpoint.grpc.trace.PCmdResponse;
import com.navercorp.pinpoint.profiler.context.thrift.CommandGrpcToThriftMessageConverter;
import com.navercorp.pinpoint.rpc.DefaultFuture;
import com.navercorp.pinpoint.rpc.Future;
import com.navercorp.pinpoint.rpc.PinpointSocketException;
import com.navercorp.pinpoint.rpc.ResponseMessage;
import com.navercorp.pinpoint.rpc.client.RequestManager;
Expand All @@ -46,21 +46,19 @@
import com.navercorp.pinpoint.thrift.dto.command.TRouteResult;
import com.navercorp.pinpoint.thrift.io.CommandHeaderTBaseSerializerFactory;
import com.navercorp.pinpoint.thrift.util.SerializationUtils;

import com.google.protobuf.Empty;
import com.google.protobuf.GeneratedMessageV3;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.stub.StreamObserver;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TBase;
import org.apache.thrift.TException;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;

import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

/**
Expand Down Expand Up @@ -135,7 +133,7 @@ private SocketStateChangeResult toState(SocketStateCode socketStateCode) {
return result;
}

public Future<ResponseMessage> request(GeneratedMessageV3 message) {
public CompletableFuture<ResponseMessage> request(GeneratedMessageV3 message) {
if (!state.checkState(SocketStateCode.RUN_DUPLEX)) {
return createFailedFuture(new IllegalStateException("failed to request message. caused:illegal State"));
}
Expand All @@ -145,7 +143,7 @@ public Future<ResponseMessage> request(GeneratedMessageV3 message) {
return createFailedFuture(new PinpointSocketException(TRouteResult.NOT_SUPPORTED_REQUEST.name()));
}

DefaultFuture<ResponseMessage> future = requestManager.register(request.getRequestId());
CompletableFuture<ResponseMessage> future = requestManager.register(request.getRequestId());
requestObserver.onNext(request);
return future;
}
Expand Down Expand Up @@ -346,9 +344,9 @@ public void close(SocketStateCode toState) {
}

private void setFailMessageToFuture(int responseId, String message) {
DefaultFuture<ResponseMessage> future = requestManager.removeMessageFuture(responseId);
CompletableFuture<ResponseMessage> future = requestManager.removeMessageFuture(responseId);
if (future != null) {
future.setFailure(new PinpointSocketException(message));
future.completeExceptionally(new PinpointSocketException(message));
}
}

Expand All @@ -364,9 +362,9 @@ public ClusterKey getClusterKey() {
return clusterKey;
}

public Future<ResponseMessage> createFailedFuture(Exception failException) {
DefaultFuture<ResponseMessage> failedFuture = new DefaultFuture<>();
failedFuture.setFailure(failException);
public CompletableFuture<ResponseMessage> createFailedFuture(Exception failException) {
CompletableFuture<ResponseMessage> failedFuture = new CompletableFuture<>();
failedFuture.completeExceptionally(failException);
return failedFuture;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,18 @@

import com.navercorp.pinpoint.collector.cluster.GrpcAgentConnection;
import com.navercorp.pinpoint.collector.receiver.grpc.PinpointGrpcServer;
import com.navercorp.pinpoint.rpc.Future;
import com.navercorp.pinpoint.rpc.ResponseMessage;
import com.navercorp.pinpoint.thrift.dto.TResult;
import com.navercorp.pinpoint.thrift.dto.command.TCommandEcho;
import com.navercorp.pinpoint.thrift.io.TCommandType;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.function.Executable;
import org.mockito.Mockito;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
* @author Taejin Koo
Expand All @@ -50,14 +52,18 @@ public void requestTest() {
supportCommand = grpcAgentConnection.isSupportCommand(TCommandType.ECHO.getBodyFactory().getObject());
Assertions.assertTrue(supportCommand);

Future<ResponseMessage> future = grpcAgentConnection.request(new TResult());
Assertions.assertFalse(future.isSuccess());
Assertions.assertNotNull(future.getCause());
final CompletableFuture<ResponseMessage> future = grpcAgentConnection.request(new TResult());

Assertions.assertThrows(Exception.class, new Executable() {
@Override
public void execute() throws Throwable {
future.get(3000, TimeUnit.MILLISECONDS);
}
});
TCommandEcho commandEcho = new TCommandEcho("hello");
// check to pass validation
future = grpcAgentConnection.request(commandEcho);
Assertions.assertNull(future);
final CompletableFuture<ResponseMessage> future2 = grpcAgentConnection.request(commandEcho);
Assertions.assertNull(future2);
}

@Test
Expand All @@ -67,7 +73,6 @@ public void equalsTest() {
List<Integer> supportCommandList = List.of(Short.toUnsignedInt(TCommandType.ECHO.getCode()));
GrpcAgentConnection grpcAgentConnection = new GrpcAgentConnection(mockGrpcServer1, supportCommandList);

Assertions.assertEquals(grpcAgentConnection, grpcAgentConnection);
Assertions.assertEquals(grpcAgentConnection, new GrpcAgentConnection(mockGrpcServer1, supportCommandList));

PinpointGrpcServer mockGrpcServer2 = Mockito.mock(PinpointGrpcServer.class);
Expand Down
Loading

0 comments on commit de1949e

Please sign in to comment.