Skip to content

Commit

Permalink
Notify handler of all errors in RoutedBoltConnection
Browse files Browse the repository at this point in the history
  • Loading branch information
injectives committed Nov 21, 2024
1 parent c85bbf5 commit d57b2af
Showing 1 changed file with 20 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -183,14 +183,12 @@ public CompletionStage<BoltConnection> clear() {
@Override
public CompletionStage<Void> flush(ResponseHandler handler) {
return delegate.flush(new ResponseHandler() {
private Throwable error;
boolean notifyHandler = true;

@Override
public void onError(Throwable throwable) {
if (error == null) {
error = handledError(throwable);
handler.onError(error);
}
handler.onError(handledError(throwable, notifyHandler));
notifyHandler = false;
}

@Override
Expand Down Expand Up @@ -306,34 +304,36 @@ public boolean telemetrySupported() {
return delegate.telemetrySupported();
}

private Throwable handledError(Throwable receivedError) {
private Throwable handledError(Throwable receivedError, boolean notifyHandler) {
var error = FutureUtil.completionExceptionCause(receivedError);

if (error instanceof ServiceUnavailableException) {
return handledServiceUnavailableException(((ServiceUnavailableException) error));
} else if (error instanceof ClientException) {
return handledClientException(((ClientException) error));
} else if (error instanceof TransientException) {
return handledTransientException(((TransientException) error));
if (error instanceof ServiceUnavailableException exception) {
return handledServiceUnavailableException(exception, notifyHandler);
} else if (error instanceof ClientException exception) {
return handledClientException(exception, notifyHandler);
} else if (error instanceof TransientException exception) {
return handledTransientException(exception, notifyHandler);
} else {
return error;
}
}

private Throwable handledServiceUnavailableException(ServiceUnavailableException e) {
routingTableHandler.onConnectionFailure(serverAddress());
private Throwable handledServiceUnavailableException(ServiceUnavailableException e, boolean notifyHandler) {
if (notifyHandler) {
routingTableHandler.onConnectionFailure(serverAddress());
}
return new SessionExpiredException(format("Server at %s is no longer available", serverAddress()), e);
}

private Throwable handledTransientException(TransientException e) {
private Throwable handledTransientException(TransientException e, boolean notifyHandler) {
var errorCode = e.code();
if (Objects.equals(errorCode, "Neo.TransientError.General.DatabaseUnavailable")) {
if (Objects.equals(errorCode, "Neo.TransientError.General.DatabaseUnavailable") && notifyHandler) {
routingTableHandler.onConnectionFailure(serverAddress());
}
return e;
}

private Throwable handledClientException(ClientException e) {
private Throwable handledClientException(ClientException e, boolean notifyHandler) {
if (isFailureToWrite(e)) {
// The server is unaware of the session mode, so we have to implement this logic in the driver.
// In the future, we might be able to move this logic to the server.
Expand All @@ -349,7 +349,9 @@ private Throwable handledClientException(ClientException e) {
null);
}
case WRITE -> {
routingTableHandler.onWriteFailure(serverAddress());
if (notifyHandler) {
routingTableHandler.onWriteFailure(serverAddress());
}
return new SessionExpiredException(
format("Server at %s no longer accepts writes", serverAddress()));
}
Expand Down

0 comments on commit d57b2af

Please sign in to comment.