Skip to content

Commit

Permalink
[#11256] Improved atomicity of completion events
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed Aug 1, 2024
1 parent 2cc3ddb commit 0e3fd4e
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -58,7 +59,9 @@ public SpanService(DispatchHandler<GeneratedMessageV3, GeneratedMessageV3> dispa
}

@Override
public StreamObserver<PSpanMessage> sendSpan(final StreamObserver<Empty> responseObserver) {
public StreamObserver<PSpanMessage> sendSpan(final StreamObserver<Empty> responseStream) {
final ServerCallStreamObserver<Empty> responseObserver = (ServerCallStreamObserver<Empty>) responseStream;

StreamObserver<PSpanMessage> observer = new StreamObserver<>() {
@Override
public void onNext(PSpanMessage spanMessage) {
Expand All @@ -76,6 +79,7 @@ public void onNext(PSpanMessage spanMessage) {
if (isDebug) {
logger.debug("Found empty span message {}", MessageFormatUtils.debugLog(spanMessage));
}
onError(Status.INVALID_ARGUMENT.withDescription("Invalid Request").asException());
}
}

Expand All @@ -86,16 +90,25 @@ public void onError(Throwable throwable) {
Status status = Status.fromThrowable(throwable);
Metadata metadata = Status.trailersFromThrowable(throwable);
if (logger.isInfoEnabled()) {
logger.info("Failed to span stream, {} {} {}", header, status, metadata);
logger.info("onError: Failed to span stream, {} {} {}", header, status, metadata);
}
responseCompleted(responseObserver);
}

@Override
public void onCompleted() {
com.navercorp.pinpoint.grpc.Header header = ServerContext.getAgentInfo();
logger.info("onCompleted {}", header);

Empty empty = Empty.newBuilder().build();
responseCompleted(responseObserver);
}

private void responseCompleted(ServerCallStreamObserver<Empty> responseObserver) {
if (responseObserver.isCancelled()) {
logger.info("responseCompleted: ResponseObserver is cancelled");
return;
}
Empty empty = Empty.getDefaultInstance();
responseObserver.onNext(empty);
responseObserver.onCompleted();
}
Expand All @@ -109,18 +122,27 @@ private <T> Message<T> newMessage(T requestData, short serviceType) {
return new DefaultMessage<>(header, headerEntity, requestData);
}

private void send(final Message<? extends GeneratedMessageV3> message, StreamObserver<Empty> responseObserver) {
private void send(final Message<? extends GeneratedMessageV3> message, ServerCallStreamObserver<Empty> responseObserver) {
try {
ServerRequest<GeneratedMessageV3> request = (ServerRequest<GeneratedMessageV3>) serverRequestFactory.newServerRequest(message);
this.dispatchHandler.dispatchSendMessage(request);
} catch (Exception e) {
} catch (Throwable e) {
logger.warn("Failed to request. message={}", message, e);
if (e instanceof StatusException || e instanceof StatusRuntimeException) {
responseObserver.onError(e);
} else {
// Avoid detailed exception
responseObserver.onError(Status.INTERNAL.withDescription("Bad Request").asException());
}
onError(responseObserver, e);
}
}

private void onError(ServerCallStreamObserver<Empty> responseObserver, Throwable e) {
if (responseObserver.isCancelled()) {
logger.info("onError: ResponseObserver is cancelled");
return;
}
if (e instanceof StatusException || e instanceof StatusRuntimeException) {
responseObserver.onError(e);
} else {
// Avoid detailed exception
StatusException statusException = Status.INTERNAL.withDescription("Bad Request").asException();
responseObserver.onError(statusException);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -60,8 +61,9 @@ public StatService(DispatchHandler<GeneratedMessageV3, GeneratedMessageV3> dispa
}

@Override
public StreamObserver<PStatMessage> sendAgentStat(StreamObserver<Empty> responseObserver) {
StreamObserver<PStatMessage> observer = new StreamObserver<PStatMessage>() {
public StreamObserver<PStatMessage> sendAgentStat(StreamObserver<Empty> responseStream) {
final ServerCallStreamObserver<Empty> responseObserver = (ServerCallStreamObserver<Empty>) responseStream;
StreamObserver<PStatMessage> observer = new StreamObserver<>() {
@Override
public void onNext(PStatMessage statMessage) {
if (isDebug) {
Expand Down Expand Up @@ -89,16 +91,25 @@ public void onError(Throwable throwable) {
Status status = Status.fromThrowable(throwable);
Metadata metadata = Status.trailersFromThrowable(throwable);
if (logger.isInfoEnabled()) {
logger.info("Failed to stat stream, {} {}", status, metadata);
logger.info("onError: Failed to stat stream, {} {}", status, metadata);
}
responseCompleted(responseObserver);
}

@Override
public void onCompleted() {
com.navercorp.pinpoint.grpc.Header header = ServerContext.getAgentInfo();
logger.info("onCompleted {}", header);
responseCompleted(responseObserver);
}

responseObserver.onNext(Empty.newBuilder().build());
private void responseCompleted(ServerCallStreamObserver<Empty> responseObserver) {
if (responseObserver.isCancelled()) {
logger.info("responseCompleted: ResponseObserver is cancelled");
return;
}
Empty empty = Empty.getDefaultInstance();
responseObserver.onNext(empty);
responseObserver.onCompleted();
}
};
Expand All @@ -112,18 +123,27 @@ private <T> Message<T> newMessage(T requestData, short serviceType) {
return new DefaultMessage<>(header, headerEntity, requestData);
}

private void send(final Message<? extends GeneratedMessageV3> message, StreamObserver<Empty> responseObserver) {
private void send(final Message<? extends GeneratedMessageV3> message, ServerCallStreamObserver<Empty> responseObserver) {
try {
ServerRequest<GeneratedMessageV3> request = (ServerRequest<GeneratedMessageV3>) serverRequestFactory.newServerRequest(message);
this.dispatchHandler.dispatchSendMessage(request);
} catch (Exception e) {
logger.warn("Failed to request. message={}", message, e);
if (e instanceof StatusException || e instanceof StatusRuntimeException) {
responseObserver.onError(e);
} else {
// Avoid detailed exception
responseObserver.onError(Status.INTERNAL.withDescription("Bad Request").asException());
}
onError(responseObserver, e);
}
}

private void onError(ServerCallStreamObserver<Empty> responseObserver, Exception e) {
if (responseObserver.isCancelled()) {
logger.info("onError: ResponseObserver is cancelled");
return;
}
if (e instanceof StatusException || e instanceof StatusRuntimeException) {
responseObserver.onError(e);
} else {
// Avoid detailed exception
StatusException statusException = Status.INTERNAL.withDescription("Bad Request").asException();
responseObserver.onError(statusException);
}
}

Expand Down

0 comments on commit 0e3fd4e

Please sign in to comment.