diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpChannelProcessor.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpChannelProcessor.java index c009b3651bd0b..65dcff548b111 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpChannelProcessor.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpChannelProcessor.java @@ -18,6 +18,7 @@ import reactor.core.publisher.Operators; import java.time.Duration; +import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.RejectedExecutionException; @@ -26,6 +27,8 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.Function; +import static com.azure.core.amqp.implementation.AmqpLoggingUtils.createContextWithConnectionId; +import static com.azure.core.amqp.implementation.ClientConstants.ENTITY_PATH_KEY; import static com.azure.core.amqp.implementation.ClientConstants.INTERVAL_KEY; public class AmqpChannelProcessor extends Mono implements Processor, CoreSubscriber, Disposable { @@ -54,6 +57,10 @@ public class AmqpChannelProcessor extends Mono implements Processor, private volatile Disposable connectionSubscription; private volatile Disposable retrySubscription; + /** + * @deprecated Use constructor overload that does not take {@link ClientLogger} + */ + @Deprecated public AmqpChannelProcessor(String fullyQualifiedNamespace, String entityPath, Function> endpointStatesFunction, AmqpRetryPolicy retryPolicy, ClientLogger logger) { this.endpointStatesFunction = Objects.requireNonNull(endpointStatesFunction, @@ -64,6 +71,20 @@ public AmqpChannelProcessor(String fullyQualifiedNamespace, String entityPath, this.errorContext = new AmqpErrorContext(fullyQualifiedNamespace); } + + public AmqpChannelProcessor(String fullyQualifiedNamespace, String entityPath, String connectionId, + Function> endpointStatesFunction, AmqpRetryPolicy retryPolicy) { + this.endpointStatesFunction = Objects.requireNonNull(endpointStatesFunction, + "'endpointStates' cannot be null."); + this.retryPolicy = Objects.requireNonNull(retryPolicy, "'retryPolicy' cannot be null."); + + Map loggingContext = createContextWithConnectionId(connectionId); + loggingContext.put(ENTITY_PATH_KEY, Objects.requireNonNull(entityPath, "'entityPath' cannot be null.")); + this.logger = new ClientLogger(AmqpChannelProcessor.class, loggingContext); + + this.errorContext = new AmqpErrorContext(fullyQualifiedNamespace); + } + @Override public void onSubscribe(Subscription subscription) { if (Operators.setOnce(UPSTREAM, this, subscription)) { diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java index c5902a03b22c8..25bf8f5e93098 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java @@ -412,8 +412,8 @@ protected AmqpChannelProcessor createRequestResponseChan loggingContext.put(ENTITY_PATH_KEY, entityPath); return createChannel - .subscribeWith(new AmqpChannelProcessor<>(connectionId, entityPath, - channel -> channel.getEndpointStates(), retryPolicy, new ClientLogger(RequestResponseChannel.class, loggingContext))); + .subscribeWith(new AmqpChannelProcessor<>(getFullyQualifiedNamespace(), entityPath, connectionId, + channel -> channel.getEndpointStates(), retryPolicy)); } @Override diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/AmqpChannelProcessorTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/AmqpChannelProcessorTest.java index bc151714c88bc..bc42248afd7b4 100644 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/AmqpChannelProcessorTest.java +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/AmqpChannelProcessorTest.java @@ -8,8 +8,6 @@ import com.azure.core.amqp.exception.AmqpErrorCondition; import com.azure.core.amqp.exception.AmqpErrorContext; import com.azure.core.amqp.exception.AmqpException; -import com.azure.core.amqp.implementation.handler.LinkHandlerTest; -import com.azure.core.util.logging.ClientLogger; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -58,8 +56,8 @@ class AmqpChannelProcessorTest { void setup() { mocksCloseable = MockitoAnnotations.openMocks(this); - channelProcessor = new AmqpChannelProcessor<>("connection-test", "test-path", - TestObject::getStates, retryPolicy, new ClientLogger(LinkHandlerTest.class)); + channelProcessor = new AmqpChannelProcessor<>("namespace-test", "test-path", "connection-test", + TestObject::getStates, retryPolicy); } @AfterEach diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ManagementChannelTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ManagementChannelTest.java index 314a33eef0ad5..7f873f719ef6d 100644 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ManagementChannelTest.java +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ManagementChannelTest.java @@ -95,7 +95,7 @@ public void setup(TestInfo testInfo) { final AmqpChannelProcessor requestResponseMono = Mono.defer(() -> Mono.just(requestResponseChannel)).subscribeWith(new AmqpChannelProcessor<>( - "foo", "bar", RequestResponseChannel::getEndpointStates, retryPolicy, new ClientLogger(ManagementChannelTest.class))); + "foo", "bar", "baz", RequestResponseChannel::getEndpointStates, retryPolicy)); when(tokenManager.authorize()).thenReturn(Mono.just(1000L)); when(tokenManager.getAuthorizationResults()).thenReturn(tokenProviderResults.flux());