Skip to content

Commit

Permalink
[#11158] Refactor GrpcDataSender
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed Jul 2, 2024
1 parent d47129e commit 0cc0f32
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,13 @@ public EnhancedDataSender<MetaDataType, ResponseMessage> get() {
final ChannelFactoryBuilder channelFactoryBuilder = newChannelFactoryBuilder(sslEnable, clientRetryEnable);

final ChannelFactory channelFactory = channelFactoryBuilder.build();

final int senderExecutorQueueSize = grpcTransportConfig.getMetadataSenderExecutorQueueSize();
if (clientRetryEnable) {
return new MetadataGrpcHedgingDataSender<>(collectorIp, collectorPort, messageConverter, channelFactory);
return new MetadataGrpcHedgingDataSender<>(collectorIp, collectorPort, senderExecutorQueueSize, messageConverter, channelFactory);
}

final int senderExecutorQueueSize = grpcTransportConfig.getMetadataSenderExecutorQueueSize();

final int retryMaxCount = grpcTransportConfig.getMetadataRetryMaxCount();
final int retryDelayMillis = grpcTransportConfig.getMetadataRetryDelayMillis();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public MetadataGrpcDataSender(String host, int port, int executorQueueSize,
ChannelFactory channelFactory, int retryMaxCount, int retryDelayMillis) {
super(host, port, executorQueueSize, messageConverter, channelFactory);

this.maxAttempts = getMaxAttempts(retryMaxCount);
this.maxAttempts = Math.max(retryMaxCount, 0);
this.retryDelayMillis = retryDelayMillis;
this.metadataStub = MetadataGrpc.newStub(managedChannel);

Expand All @@ -73,19 +73,12 @@ public boolean isSuccess(PResult response) {
}

@Override
public void scheduleNextRetry(GeneratedMessageV3 request, int remainingRetryCount) {
MetadataGrpcDataSender.this.scheduleNextRetry(request, remainingRetryCount);
public void scheduleNextRetry(GeneratedMessageV3 request, int retryCount) {
MetadataGrpcDataSender.this.scheduleNextRetry(request, retryCount);
}
};
}

private int getMaxAttempts(int retryMaxCount) {
if (retryMaxCount < 0) {
return 0;
}
return retryMaxCount;
}

private Timer newTimer(String name) {
ThreadFactory threadFactory = new PinpointThreadFactory(PinpointThreadFactory.DEFAULT_THREAD_NAME_PREFIX + name, true);
return new HashedWheelTimer(threadFactory, 100, TimeUnit.MILLISECONDS, 512, false, MAX_PENDING_TIMEOUTS);
Expand All @@ -109,6 +102,10 @@ public boolean send(T data) {

@Override
public boolean request(final T data) {
if (data == null) {
return true;
}

final Runnable convertAndRun = new Runnable() {
@Override
public void run() {
Expand All @@ -118,7 +115,7 @@ public void run() {
if (isDebug) {
logger.debug("Request metadata={}", MessageFormatUtils.debugLog(message));
}
request0(message, maxAttempts);
request0(message, 0);
} catch (Exception ex) {
logger.info("Failed to request metadata={}", data, ex);
}
Expand All @@ -127,33 +124,33 @@ public void run() {
try {
executor.execute(convertAndRun);
} catch (RejectedExecutionException reject) {
logger.info("Rejected metadata={}", data);
logger.info("Rejected metadata={}", data.getClass().getSimpleName());
return false;
}
return true;
}

// Request
private void request0(final GeneratedMessageV3 message, final int remainingRetryCount) {
private void request0(final GeneratedMessageV3 message, final int retryCount) {
if (message instanceof PSqlMetaData) {
final PSqlMetaData sqlMetaData = (PSqlMetaData) message;
final StreamObserver<PResult> responseObserver = newResponseStream(message, remainingRetryCount);
final StreamObserver<PResult> responseObserver = newResponseStream(message, retryCount);
this.metadataStub.requestSqlMetaData(sqlMetaData, responseObserver);
} else if (message instanceof PSqlUidMetaData) {
final PSqlUidMetaData sqlUidMetaData = (PSqlUidMetaData) message;
final StreamObserver<PResult> responseObserver = newResponseStream(message, remainingRetryCount);
final StreamObserver<PResult> responseObserver = newResponseStream(message, retryCount);
this.metadataStub.requestSqlUidMetaData(sqlUidMetaData, responseObserver);
} else if (message instanceof PApiMetaData) {
final PApiMetaData apiMetaData = (PApiMetaData) message;
final StreamObserver<PResult> responseObserver = newResponseStream(message, remainingRetryCount);
final StreamObserver<PResult> responseObserver = newResponseStream(message, retryCount);
this.metadataStub.requestApiMetaData(apiMetaData, responseObserver);
} else if (message instanceof PStringMetaData) {
final PStringMetaData stringMetaData = (PStringMetaData) message;
final StreamObserver<PResult> responseObserver = newResponseStream(message, remainingRetryCount);
final StreamObserver<PResult> responseObserver = newResponseStream(message, retryCount);
this.metadataStub.requestStringMetaData(stringMetaData, responseObserver);
} else if (message instanceof PExceptionMetaData) {
final PExceptionMetaData exceptionMetaData = (PExceptionMetaData) message;
final StreamObserver<PResult> responseObserver = newResponseStream(message, remainingRetryCount);
final StreamObserver<PResult> responseObserver = newResponseStream(message, retryCount);
this.metadataStub.requestExceptionMetaData(exceptionMetaData, responseObserver);
} else {
logger.warn("Unsupported message {}", MessageFormatUtils.debugLog(message));
Expand All @@ -165,22 +162,22 @@ private StreamObserver<PResult> newResponseStream(GeneratedMessageV3 message, in
}

// Retry
private void scheduleNextRetry(final GeneratedMessageV3 message, final int remainingRetryCount) {
private void scheduleNextRetry(final GeneratedMessageV3 message, final int retryCount) {
if (shutdown) {
if (isDebug) {
logger.debug("Request drop. Already shutdown request={}", MessageFormatUtils.debugLog(message));
logger.debug("Request drop. Already shutdown metadata={}", MessageFormatUtils.debugLog(message));
}
return;
}
if (remainingRetryCount <= 0) {
if (retryCount > maxAttempts) {
if (isDebug) {
logger.debug("Request drop. request={}, remainingRetryCount={}", MessageFormatUtils.debugLog(message), remainingRetryCount);
logger.debug("Request drop. metadata={}, retryCount={}", MessageFormatUtils.debugLog(message), retryCount);
}
return;
}

if (isDebug) {
logger.debug("Request retry. request={}, remainingRetryCount={}", MessageFormatUtils.debugLog(message), remainingRetryCount);
logger.debug("Request retry. metadata={}, retryCount={}", MessageFormatUtils.debugLog(message), retryCount);
}
final TimerTask timerTask = new TimerTask() {
@Override
Expand All @@ -191,7 +188,7 @@ public void run(Timeout timeout) throws Exception {
if (shutdown) {
return;
}
request0(message, remainingRetryCount);
request0(message, retryCount);
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@
package com.navercorp.pinpoint.profiler.sender.grpc;

import com.google.protobuf.GeneratedMessageV3;
import com.navercorp.pinpoint.common.profiler.concurrent.ExecutorFactory;
import com.navercorp.pinpoint.common.profiler.concurrent.PinpointThreadFactory;
import com.navercorp.pinpoint.common.profiler.message.EnhancedDataSender;
import com.navercorp.pinpoint.common.profiler.message.MessageConverter;
import com.navercorp.pinpoint.grpc.ExecutorUtils;
import com.navercorp.pinpoint.grpc.MessageFormatUtils;
import com.navercorp.pinpoint.grpc.client.ChannelFactory;
import com.navercorp.pinpoint.grpc.trace.MetadataGrpc;
Expand All @@ -31,6 +34,9 @@
import com.navercorp.pinpoint.io.ResponseMessage;
import io.grpc.stub.StreamObserver;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

Expand All @@ -42,14 +48,23 @@ public class MetadataGrpcHedgingDataSender<T> extends AbstractGrpcDataSender<T>

private final AtomicLong requestIdGen = new AtomicLong(0);

public MetadataGrpcHedgingDataSender(String host, int port,
private final ExecutorService executor;

public MetadataGrpcHedgingDataSender(String host, int port, int queueSize,
MessageConverter<T, GeneratedMessageV3> messageConverter,
ChannelFactory channelFactory) {
super(host, port, messageConverter, channelFactory);

this.metadataStub = MetadataGrpc.newStub(managedChannel);
this.executor = newExecutorService(name + "-Executor", queueSize);
}

protected ExecutorService newExecutorService(String name, int senderExecutorQueueSize) {
ThreadFactory threadFactory = new PinpointThreadFactory(PinpointThreadFactory.DEFAULT_THREAD_NAME_PREFIX + name, true);
return ExecutorFactory.newFixedThreadPool(1, senderExecutorQueueSize, threadFactory);
}


// Unsupported Operation
@Override
public boolean request(T data, int retry) {
Expand All @@ -68,6 +83,25 @@ public boolean send(T data) {

@Override
public boolean request(final T data) {
Runnable sendTask = new Runnable() {
@Override
public void run() {
doRequest(data);
}
};

try {
this.executor.execute(sendTask);
} catch (RejectedExecutionException reject) {
if (tLogger.isWarnEnabled()) {
tLogger.warn("Rejected Metadata sendTask {}", tLogger.getCounter());
}
return false;
}
return true;
}

private boolean doRequest(T data) {
try {
final GeneratedMessageV3 message = messageConverter.toMessage(data);

Expand Down Expand Up @@ -111,6 +145,7 @@ public void stop() {
}
this.shutdown = true;

ExecutorUtils.shutdownExecutorService(name, executor);
super.releaseChannel();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ public class RetryResponseStreamObserver<ReqT, ResT> implements StreamObserver<R
private final Logger logger;
private final RetryScheduler<ReqT, ResT> retryScheduler;
private final ReqT message;
private final int remainingRetryCount;
private final int retryCount;

public RetryResponseStreamObserver(Logger logger, RetryScheduler<ReqT, ResT> retryScheduler, ReqT message, int remainingRetryCount) {
public RetryResponseStreamObserver(Logger logger, RetryScheduler<ReqT, ResT> retryScheduler, ReqT message, int retryCount) {
this.logger = Objects.requireNonNull(logger, "logger");
this.retryScheduler = Objects.requireNonNull(retryScheduler, "retryScheduler");
this.message = Objects.requireNonNull(message, "message");
this.remainingRetryCount = remainingRetryCount;
this.retryCount = retryCount;
}

@Override
Expand All @@ -51,7 +51,7 @@ public void onNext(ResT response) {
} else {
// Retry
if (logger.isInfoEnabled()) {
logger.info("Request fail. request={}, result={}", logString(message), logString(response));
logger.info("Request failed. PResult.getSuccess() is false. request={}, result={}", logString(message), logString(response));
}
retryScheduler.scheduleNextRetry(message, nextRetryCount());
}
Expand All @@ -68,16 +68,15 @@ public void onError(Throwable throwable) {
}

// Retry
final int remainingRetryCount = nextRetryCount();
retryScheduler.scheduleNextRetry(message, remainingRetryCount);
retryScheduler.scheduleNextRetry(message, nextRetryCount());
}

@Override
public void onCompleted() {
}

private int nextRetryCount() {
return remainingRetryCount - 1;
return retryCount + 1;
}

private String logString(Object message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,6 @@
*/
public interface RetryScheduler<ReqT, ResT> {
boolean isSuccess(ResT response);
void scheduleNextRetry(ReqT request, int remainingRetryCount);
void scheduleNextRetry(ReqT request, int retryCount);

}
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,6 @@ public void close() {
}
};

return new MetadataGrpcHedgingDataSender<>("localhost", 1234, converter, factory);
return new MetadataGrpcHedgingDataSender<>("localhost", 1234, 1, converter, factory);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,15 @@ public MetadataClientMock(final String host, final int port, final boolean agent
this.channel = channelFactory.build(host, port);

this.metadataStub = MetadataGrpc.newStub(channel);
this.retryScheduler = new RetryScheduler<GeneratedMessageV3, PResult>() {
this.retryScheduler = new RetryScheduler<>() {
@Override
public boolean isSuccess(PResult response) {
return response.getSuccess();
}

@Override
public void scheduleNextRetry(GeneratedMessageV3 request, int remainingRetryCount) {
MetadataClientMock.this.scheduleNextRetry(request, remainingRetryCount);
public void scheduleNextRetry(GeneratedMessageV3 request, int retryCount) {
MetadataClientMock.this.scheduleNextRetry(request, retryCount);
}
};
}
Expand Down

0 comments on commit 0cc0f32

Please sign in to comment.