Skip to content

Commit

Permalink
review
Browse files Browse the repository at this point in the history
  • Loading branch information
lmolkova committed Dec 14, 2021
1 parent c619db3 commit f16278d
Show file tree
Hide file tree
Showing 9 changed files with 46 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.concurrent.atomic.AtomicReference;

import static com.azure.core.amqp.implementation.AmqpLoggingUtils.addSignalTypeAndResult;
import static com.azure.core.amqp.implementation.ClientConstants.INTERVAL_KEY;

/**
* Manages the re-authorization of the client to the token audience against the CBS node.
Expand Down Expand Up @@ -143,7 +144,8 @@ private Disposable scheduleRefreshTokenTask(Duration initialRefresh) {

logger.atError()
.addKeyValue("scopes", scopes)
.log("Error is transient. Rescheduling authorization task at interval {} ms", lastRefresh.toMillis(), amqpException);
.addKeyValue(INTERVAL_KEY, interval)
.log("Error is transient. Rescheduling authorization task.", amqpException);

durationSource.emitNext(lastRefresh, (signalType, emitResult) -> {
addSignalTypeAndResult(logger.atVerbose(), signalType, emitResult)
Expand All @@ -156,7 +158,8 @@ private Disposable scheduleRefreshTokenTask(Duration initialRefresh) {
.subscribe(interval -> {
logger.atVerbose()
.addKeyValue("scopes", scopes)
.log("Authorization successful. Refreshing token in {} ms.", interval);
.addKeyValue(INTERVAL_KEY, interval)
.log("Authorization successful. Refreshing token.");

authorizationResults.emitNext(AmqpResponseCode.ACCEPTED, (signalType, emitResult) -> {
addSignalTypeAndResult(logger.atVerbose(), signalType, emitResult)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,16 @@
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;

import static com.azure.core.amqp.implementation.ClientConstants.INTERVAL_KEY;

public class AmqpChannelProcessor<T> extends Mono<T> implements Processor<T, T>, CoreSubscriber<T>, Disposable {
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<AmqpChannelProcessor, Subscription> UPSTREAM =
AtomicReferenceFieldUpdater.newUpdater(AmqpChannelProcessor.class, Subscription.class,
"upstream");

private static final String RETRY_NUMBER_KEY = "retry";

private final ClientLogger logger;
private final AtomicBoolean isDisposed = new AtomicBoolean();
private final AtomicBoolean isRequested = new AtomicBoolean();
Expand All @@ -55,11 +59,7 @@ public AmqpChannelProcessor(String fullyQualifiedNamespace, String entityPath,
this.endpointStatesFunction = Objects.requireNonNull(endpointStatesFunction,
"'endpointStates' cannot be null.");
this.retryPolicy = Objects.requireNonNull(retryPolicy, "'retryPolicy' cannot be null.");

/*Map<String, Object> loggingContext = new HashMap<>();
loggingContext.put(FULLY_QUALIFIED_NAMESPACE_KEY, Objects.requireNonNull(fullyQualifiedNamespace, "'fullyQualifiedNamespace' cannot be null."));
loggingContext.put(ENTITY_PATH_KEY, Objects.requireNonNull(entityPath, "'entityPath' cannot be null."));*/
this.logger = Objects.requireNonNull(logger, "'retryPolicy' cannot be null.");
this.logger = Objects.requireNonNull(logger, "'logger' cannot be null.");

this.errorContext = new AmqpErrorContext(fullyQualifiedNamespace);
}
Expand Down Expand Up @@ -178,20 +178,29 @@ public void onError(Throwable throwable) {
return;
}

logger.info("Retry #{}. Transient error occurred. Retrying after {} ms.", attempts, retryInterval.toMillis(), throwable);
logger.atInfo()
.addKeyValue(RETRY_NUMBER_KEY, attempts)
.addKeyValue(INTERVAL_KEY, retryInterval.toMillis())
.log("Transient error occurred. Retrying.", throwable);

retrySubscription = Mono.delay(retryInterval).subscribe(i -> {
if (isDisposed()) {
logger.info("Retry #{}. Not requesting from upstream. Processor is disposed.", attempts);
logger.atInfo()
.addKeyValue(RETRY_NUMBER_KEY, attempts)
.log("Not requesting from upstream. Processor is disposed.");
} else {
logger.info("Retry #{}. Requesting from upstream.", attempts);
logger.atInfo()
.addKeyValue(RETRY_NUMBER_KEY, attempts)
.log("Requesting from upstream.");

requestUpstream();
isRetryPending.set(false);
}
});
} else {
logger.warning("Retry #{}. Retry attempts exhausted or exception was not retriable.", attempts, throwable);
logger.atWarning()
.addKeyValue(RETRY_NUMBER_KEY, attempts)
.log("Retry attempts exhausted or exception was not retriable.", throwable);

lastError = throwable;
isDisposed.set(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,18 @@
/**
* Utils for contextual logging.
*/
public class AmqpLoggingUtils {
public final class AmqpLoggingUtils {

private AmqpLoggingUtils() {
}

/**
* Creates logging context with connectionId.
*/
public static Map<String, Object> createContextWithConnectionId(String connectionId) {
Objects.requireNonNull(connectionId, "'connectionId' cannot be null.");
Map<String, Object> globalLoggingContext = new HashMap<>();
// caller should be able to add more context, please keep the map mutable.
Map<String, Object> globalLoggingContext = new HashMap<>(1);
globalLoggingContext.put(CONNECTION_ID_KEY, connectionId);

return globalLoggingContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public final class ClientConstants {
public static final String EMIT_RESULT_KEY = "emitResult";
public static final String SIGNAL_TYPE_KEY = "signalType";
public static final String HOSTNAME_KEY = "hostName";
public static final String INTERVAL_KEY = "interval_ms";

/**
* The default maximum allowable size, in bytes, for a batch to be sent.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ public ReactorConnection(String connectionId, ConnectionOptions connectionOption
if (isDisposed.getAndSet(true)) {
logger.verbose("Connection was already disposed: Error occurred while connection was starting.", error);
} else {
closeAsync(new AmqpShutdownSignal(false, false, String.format(
"Error occurred while connection was starting. Error: %s", error))).subscribe();
closeAsync(new AmqpShutdownSignal(false, false,
"Error occurred while connection was starting. Error: " + error)).subscribe();
}
});

Expand Down Expand Up @@ -393,6 +393,8 @@ protected Mono<Connection> getReactorConnection() {
protected AmqpChannelProcessor<RequestResponseChannel> createRequestResponseChannel(String sessionName,
String linkName, String entityPath) {

Objects.requireNonNull(entityPath, "'entityPath' cannot be null.");

final Flux<RequestResponseChannel> createChannel = createSession(sessionName)
.cast(ReactorSession.class)
.map(reactorSession -> new RequestResponseChannel(this, getId(), getFullyQualifiedNamespace(), linkName,
Expand All @@ -407,7 +409,7 @@ protected AmqpChannelProcessor<RequestResponseChannel> createRequestResponseChan
.repeat();

Map<String, Object> loggingContext = createContextWithConnectionId(connectionId);
loggingContext.put(ENTITY_PATH_KEY, Objects.requireNonNull(entityPath, "'entityPath' cannot be null."));
loggingContext.put(ENTITY_PATH_KEY, entityPath);

return createChannel
.subscribeWith(new AmqpChannelProcessor<>(connectionId, entityPath,
Expand Down Expand Up @@ -453,7 +455,7 @@ Mono<Void> closeAsync(AmqpShutdownSignal shutdownSignal) {
try {
dispatcher.invoke(() -> closeConnectionWork());
} catch (IOException e) {
logger.warning("IOException while scheduling closeConnection work. Manually disposing", e);
logger.warning("IOException while scheduling closeConnection work. Manually disposing.", e);

closeConnectionWork();
} catch (RejectedExecutionException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ class ReactorExecutor implements AsyncCloseable {
this.timeout = Objects.requireNonNull(timeout, "'timeout' cannot be null.");
this.exceptionHandler = Objects.requireNonNull(exceptionHandler, "'exceptionHandler' cannot be null.");
this.hostname = Objects.requireNonNull(hostname, "'hostname' cannot be null.");
Objects.requireNonNull(connectionId, "'connectionId' cannot be null.");
this.logger = new ClientLogger(ReactorExecutor.class, createContextWithConnectionId(connectionId));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
*/
class ReactorSender implements AmqpSendLink, AsyncCloseable, AutoCloseable {
private static final String DELIVERY_TAG_KEY = "deliveryTag";
private static final String PENDING_SENDS_SIZE_KEY = "pending_sends_size";
private final String entityPath;
private final Sender sender;
private final SendLinkHandler handler;
Expand Down Expand Up @@ -677,7 +678,9 @@ private void handleError(Throwable error) {
if (isDisposed.getAndSet(true)) {
logger.verbose("This was already disposed. Dropping error.");
} else {
logger.verbose("Disposing of '{}' pending sends with error.", pendingSendsMap.size());
logger.atVerbose()
.addKeyValue(PENDING_SENDS_SIZE_KEY, () -> String.valueOf(pendingSendsMap.size()))
.log("Disposing pending sends with error.");
}

pendingSendsMap.forEach((key, value) -> value.error(error));
Expand All @@ -697,7 +700,9 @@ private void handleClose() {
if (isDisposed.getAndSet(true)) {
logger.verbose("This was already disposed.");
} else {
logger.verbose("Disposing of '{}' pending sends.", pendingSendsMap.size());
logger.atVerbose()
.addKeyValue(PENDING_SENDS_SIZE_KEY, () -> String.valueOf(pendingSendsMap.size()))
.log("Disposing pending sends.");
}

pendingSendsMap.forEach((key, value) -> value.error(new AmqpException(true, message, context)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,7 @@ public Mono<Void> closeAsync() {
.timeout(retryOptions.getTryTimeout())
.onErrorResume(TimeoutException.class, error -> {
return Mono.fromRunnable(() -> {
logger.atInfo()
.log("Timed out waiting for RequestResponseChannel to complete closing. Manually closing.");
logger.info("Timed out waiting for RequestResponseChannel to complete closing. Manually closing.");

onTerminalState("SendLinkHandler");
onTerminalState("ReceiveLinkHandler");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
class ServiceBusReactorReceiverTest {
private static final String ENTITY_PATH = "queue-name";
private static final String LINK_NAME = "a-link-name";
private static final String CONNECTION_ID = "a-connection-id";

private final ClientLogger logger = new ClientLogger(ServiceBusReactorReceiver.class);
private final EmitterProcessor<EndpointState> endpointStates = EmitterProcessor.create();
Expand Down Expand Up @@ -110,6 +111,7 @@ void setup(TestInfo testInfo) throws IOException {
when(receiveLinkHandler.getEndpointStates()).thenReturn(endpointStates);

when(tokenManager.getAuthorizationResults()).thenReturn(Flux.create(sink -> sink.next(AmqpResponseCode.OK)));
when(receiveLinkHandler.getConnectionId()).thenReturn(CONNECTION_ID);

when(connection.getShutdownSignals()).thenReturn(Flux.never());

Expand Down

0 comments on commit f16278d

Please sign in to comment.