Skip to content

Commit

Permalink
AmqpChannelProcessor add constructor overload that creates logger and…
Browse files Browse the repository at this point in the history
… deprecate old one
  • Loading branch information
lmolkova committed Dec 14, 2021
1 parent 5a18454 commit 6425523
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import reactor.core.publisher.Operators;

import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.RejectedExecutionException;
Expand All @@ -26,6 +27,8 @@
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;

import static com.azure.core.amqp.implementation.AmqpLoggingUtils.createContextWithConnectionId;
import static com.azure.core.amqp.implementation.ClientConstants.ENTITY_PATH_KEY;
import static com.azure.core.amqp.implementation.ClientConstants.INTERVAL_KEY;

public class AmqpChannelProcessor<T> extends Mono<T> implements Processor<T, T>, CoreSubscriber<T>, Disposable {
Expand Down Expand Up @@ -54,6 +57,10 @@ public class AmqpChannelProcessor<T> extends Mono<T> implements Processor<T, T>,
private volatile Disposable connectionSubscription;
private volatile Disposable retrySubscription;

/**
* @deprecated Use constructor overload that does not take {@link ClientLogger}
*/
@Deprecated
public AmqpChannelProcessor(String fullyQualifiedNamespace, String entityPath,
Function<T, Flux<AmqpEndpointState>> endpointStatesFunction, AmqpRetryPolicy retryPolicy, ClientLogger logger) {
this.endpointStatesFunction = Objects.requireNonNull(endpointStatesFunction,
Expand All @@ -64,6 +71,20 @@ public AmqpChannelProcessor(String fullyQualifiedNamespace, String entityPath,
this.errorContext = new AmqpErrorContext(fullyQualifiedNamespace);
}


public AmqpChannelProcessor(String fullyQualifiedNamespace, String entityPath, String connectionId,
Function<T, Flux<AmqpEndpointState>> endpointStatesFunction, AmqpRetryPolicy retryPolicy) {
this.endpointStatesFunction = Objects.requireNonNull(endpointStatesFunction,
"'endpointStates' cannot be null.");
this.retryPolicy = Objects.requireNonNull(retryPolicy, "'retryPolicy' cannot be null.");

Map<String, Object> loggingContext = createContextWithConnectionId(connectionId);
loggingContext.put(ENTITY_PATH_KEY, Objects.requireNonNull(entityPath, "'entityPath' cannot be null."));
this.logger = new ClientLogger(AmqpChannelProcessor.class, loggingContext);

this.errorContext = new AmqpErrorContext(fullyQualifiedNamespace);
}

@Override
public void onSubscribe(Subscription subscription) {
if (Operators.setOnce(UPSTREAM, this, subscription)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -412,8 +412,8 @@ protected AmqpChannelProcessor<RequestResponseChannel> createRequestResponseChan
loggingContext.put(ENTITY_PATH_KEY, entityPath);

return createChannel
.subscribeWith(new AmqpChannelProcessor<>(connectionId, entityPath,
channel -> channel.getEndpointStates(), retryPolicy, new ClientLogger(RequestResponseChannel.class, loggingContext)));
.subscribeWith(new AmqpChannelProcessor<>(getFullyQualifiedNamespace(), entityPath, connectionId,
channel -> channel.getEndpointStates(), retryPolicy));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
import com.azure.core.amqp.exception.AmqpErrorCondition;
import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.implementation.handler.LinkHandlerTest;
import com.azure.core.util.logging.ClientLogger;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -58,8 +56,8 @@ class AmqpChannelProcessorTest {
void setup() {
mocksCloseable = MockitoAnnotations.openMocks(this);

channelProcessor = new AmqpChannelProcessor<>("connection-test", "test-path",
TestObject::getStates, retryPolicy, new ClientLogger(LinkHandlerTest.class));
channelProcessor = new AmqpChannelProcessor<>("namespace-test", "test-path", "connection-test",
TestObject::getStates, retryPolicy);
}

@AfterEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public void setup(TestInfo testInfo) {

final AmqpChannelProcessor<RequestResponseChannel> requestResponseMono =
Mono.defer(() -> Mono.just(requestResponseChannel)).subscribeWith(new AmqpChannelProcessor<>(
"foo", "bar", RequestResponseChannel::getEndpointStates, retryPolicy, new ClientLogger(ManagementChannelTest.class)));
"foo", "bar", "baz", RequestResponseChannel::getEndpointStates, retryPolicy));

when(tokenManager.authorize()).thenReturn(Mono.just(1000L));
when(tokenManager.getAuthorizationResults()).thenReturn(tokenProviderResults.flux());
Expand Down

0 comments on commit 6425523

Please sign in to comment.