Skip to content

Commit

Permalink
[pinpoint-apm#11497] Improved atomicity
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed Oct 11, 2024
1 parent c968eca commit 08d85d7
Showing 1 changed file with 51 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,14 @@

package com.navercorp.pinpoint.profiler.sender.grpc;

import com.navercorp.pinpoint.grpc.MessageFormatUtils;
import com.navercorp.pinpoint.grpc.stream.ClientCallStateStreamObserver;
import com.navercorp.pinpoint.grpc.stream.StreamUtils;
import com.navercorp.pinpoint.grpc.trace.AgentGrpc;
import com.navercorp.pinpoint.grpc.trace.PPing;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.StreamObserver;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -43,11 +42,12 @@ public class PingStreamContext {
// for debug
private final StreamId streamId;

private final StreamObserver<PPing> requestObserver;
private ClientCallStateStreamObserver<PPing> requestStream;
private final PingClientResponseObserver responseObserver;
private final Reconnector reconnector;

private final ScheduledExecutorService retransmissionExecutor;
private volatile boolean closed = false;

public PingStreamContext(AgentGrpc.AgentStub agentStub,
Reconnector reconnector,
Expand All @@ -60,12 +60,16 @@ public PingStreamContext(AgentGrpc.AgentStub agentStub,
this.retransmissionExecutor = Objects.requireNonNull(retransmissionExecutor, "retransmissionExecutor");
// WARNING
this.responseObserver = new PingClientResponseObserver();
this.requestObserver = agentStub.pingSession(responseObserver);

agentStub.pingSession(responseObserver);
}

private PPing newPing() {
PPing.Builder builder = PPing.newBuilder();
return builder.build();
return PPing.getDefaultInstance();
}

public boolean isClosed() {
return closed;
}


Expand All @@ -74,8 +78,7 @@ private class PingClientResponseObserver implements ClientResponseObserver<PPing

@Override
public void onNext(PPing ping) {
logger.info("{} success:{}", streamId, MessageFormatUtils.debugLog(ping));

logger.info("Ping Response {}", streamId);
}


Expand All @@ -84,16 +87,29 @@ public void onError(Throwable t) {
final Status status = Status.fromThrowable(t);
Metadata metadata = Status.trailersFromThrowable(t);

logger.info("Failed to ping stream, streamId={}, {} {}", streamId, status, metadata);
logger.info("onError PingResponse {}, {} {}", streamId, status, metadata);

cancelPingScheduler();
PingStreamContext.this.reconnector.reconnect();
dispose();

if (requestStream.isRun()) {
StreamUtils.onCompleted(requestStream, (th) -> logger.info("PingStreamContext.onError", th));
}
}


@Override
public void onCompleted() {
logger.info("{} completed", streamId);
logger.info("onCompleted {}", streamId);

dispose();

if (requestStream.isRun()) {
StreamUtils.onCompleted(requestStream, (th) -> logger.info("PingStreamContext.onCompleted", th));
}
}

private void dispose() {
closed = true;
cancelPingScheduler();
PingStreamContext.this.reconnector.reconnect();
}
Expand All @@ -107,44 +123,60 @@ private void cancelPingScheduler() {
}
}

private void registerSchedulerFuture(ScheduledFuture<?> pingScheduler) {
synchronized (this) {
final ScheduledFuture<?> copy = this.pingScheduler;
if (copy != null) {
logger.info("registerSchedulerFuture : Cancel pingScheduler {}", streamId);
copy.cancel(false);
}
this.pingScheduler = pingScheduler;
}
}

@Override
public void beforeStart(final ClientCallStreamObserver<PPing> requestStream) {
public void beforeStart(final ClientCallStreamObserver<PPing> steram) {
requestStream = ClientCallStateStreamObserver.clientCall(steram);

requestStream.setOnReadyHandler(new Runnable() {
@Override
public void run() {
logger.info("{} onReady", streamId);
logger.info("onReadyHandler {}", streamId);
PingStreamContext.this.reconnector.reset();

final Runnable pingRunnable = new Runnable() {
@Override
public void run() {
PPing pPing = newPing();
if (requestStream.isReady()) {
if (logger.isTraceEnabled()) {
logger.trace("Send Ping {}", streamId);
}
requestStream.onNext(pPing);
} else {
logger.debug("{} ping fail. client is not ready", streamId);
logger.debug("Send Ping failed. isReady=false {}", streamId);
}
}
};

PingClientResponseObserver.this.pingScheduler = schedule(pingRunnable);
registerSchedulerFuture(schedule(pingRunnable));
}
});
}
}

private ScheduledFuture<?> schedule(Runnable command) {
try {
return retransmissionExecutor.scheduleAtFixedRate(command, 0, 1,TimeUnit.MINUTES);
return retransmissionExecutor.scheduleAtFixedRate(command, 0, 3,TimeUnit.SECONDS);
} catch (RejectedExecutionException e) {
logger.info("Ping scheduling failed");
return null;
}
}

public void close() {
logger.info("{} close()", streamId);
StreamUtils.onCompleted(this.requestObserver, (th) -> this.logger.info("PingStreamContext.close", th));
logger.info("close() {}", streamId);
StreamUtils.onCompleted(this.requestStream, (th) -> this.logger.info("PingStreamContext.close", th));
}

@Override
Expand Down

0 comments on commit 08d85d7

Please sign in to comment.