Skip to content

Commit

Permalink
[#noissue] Cleanup ActiveMQClientIT
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed Sep 24, 2024
1 parent 05006bb commit a9f953b
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import com.navercorp.test.pinpoint.plugin.activemq.MessageReceiver;
import com.navercorp.test.pinpoint.plugin.activemq.PollingMessageReceiver;
import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.ActiveMQMessageProducer;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.MessageDispatchChannel;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
Expand All @@ -43,9 +45,8 @@
import javax.jms.MessageProducer;
import javax.jms.TextMessage;
import java.lang.reflect.Method;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import static com.navercorp.pinpoint.bootstrap.plugin.test.Expectations.annotation;
import static com.navercorp.pinpoint.bootstrap.plugin.test.Expectations.event;
Expand Down Expand Up @@ -77,8 +78,7 @@ public void testQueuePull() throws Exception {
final String testQueueName = "TestPullQueue";
final ActiveMQQueue testQueue = new ActiveMQQueue(testQueueName);
final String testMessage = "Hello World for Queue!";
final CountDownLatch consumerLatch = new CountDownLatch(1);
final AtomicReference<Exception> exception = new AtomicReference<>();
final CompletableFuture<Message> receivedMessage = new CompletableFuture<>();
// create producer
final ActiveMQSession producerSession = ActiveMQClientITHelper.createSession(getProducerBrokerName(), getProducerBrokerUrl());
final MessageProducer producer = new MessageProducerBuilder(producerSession, testQueue).waitTillStarted().build();
Expand All @@ -91,22 +91,20 @@ public void testQueuePull() throws Exception {
@Override
public void run() {
try {
messageReceiver.receiveMessage(1000L);
Message message = messageReceiver.receiveMessage(1000L);
receivedMessage.complete(message);
} catch (Exception e) {
exception.set(e);
} finally {
consumerLatch.countDown();
receivedMessage.completeExceptionally(e);
}
}
});

// When
producer.send(expectedTextMessage);
consumerThread.start();
consumerLatch.await(1L, TimeUnit.SECONDS);

// Then
assertNoConsumerError(exception.get());
assertNoConsumerError(receivedMessage);

PluginTestVerifier verifier = PluginTestVerifierHolder.getInstance();
verifier.printCache();
Expand Down Expand Up @@ -137,8 +135,9 @@ public void testQueuePush() throws Exception {
final ActiveMQQueue testQueue = new ActiveMQQueue(testQueueName);
final String testMessage = "Hello World for Queue!";
final MessagePrinter messagePrinter = new MessagePrinter();
final CountDownLatch consumerLatch = new CountDownLatch(1);
final AtomicReference<Exception> exception = new AtomicReference<>();

final CompletableFuture<Message> receivedMessage = new CompletableFuture<>();

// create producer
final ActiveMQSession producerSession = ActiveMQClientITHelper.createSession(getProducerBrokerName(), getProducerBrokerUrl());
final MessageProducer producer = new MessageProducerBuilder(producerSession, testQueue).waitTillStarted().build();
Expand All @@ -152,21 +151,19 @@ public void testQueuePush() throws Exception {
public void onMessage(Message message) {
try {
messagePrinter.printMessage(message);
receivedMessage.complete(message);
} catch (Exception e) {
exception.set(e);
} finally {
consumerLatch.countDown();
receivedMessage.completeExceptionally(e);
}
}
})
.build();

// When
producer.send(expectedTextMessage);
consumerLatch.await(1L, TimeUnit.SECONDS);

// Then
assertNoConsumerError(exception.get());
assertNoConsumerError(receivedMessage);

PluginTestVerifier verifier = PluginTestVerifierHolder.getInstance();
verifier.printCache();
Expand Down Expand Up @@ -234,8 +231,9 @@ public void testTopicPull() throws Exception {
final String testTopicName = "TestPullTopic";
final ActiveMQTopic testTopic = new ActiveMQTopic(testTopicName);
final String testMessage = "Hello World for Topic!";
final CountDownLatch consumerLatch = new CountDownLatch(2);
final AtomicReference<Exception> exception = new AtomicReference<>();
final CompletableFuture<Message> receivedMessage1 = new CompletableFuture<>();
final CompletableFuture<Message> receivedMessage2 = new CompletableFuture<>();

// create producer
final ActiveMQSession producerSession = ActiveMQClientITHelper.createSession(getProducerBrokerName(), getProducerBrokerUrl());
final MessageProducer producer = new MessageProducerBuilder(producerSession, testTopic).waitTillStarted().build();
Expand All @@ -248,11 +246,10 @@ public void testTopicPull() throws Exception {
@Override
public void run() {
try {
messageReceiver1.receiveMessage(1000L);
Message message = messageReceiver1.receiveMessage(1000L);
receivedMessage1.complete(message);
} catch (Exception e) {
exception.set(e);
} finally {
consumerLatch.countDown();
receivedMessage1.completeExceptionally(e);
}
}
});
Expand All @@ -263,11 +260,10 @@ public void run() {
@Override
public void run() {
try {
messageReceiver2.receiveMessage(1000L);
Message message = messageReceiver2.receiveMessage(1000L);
receivedMessage2.complete(message);
} catch (Exception e) {
exception.set(e);
} finally {
consumerLatch.countDown();
receivedMessage2.completeExceptionally(e);
}
}
});
Expand All @@ -276,10 +272,10 @@ public void run() {
producer.send(expectedTextMessage);
consumer1Thread.start();
consumer2Thread.start();
consumerLatch.await(1L, TimeUnit.SECONDS);

// Then
assertNoConsumerError(exception.get());
assertNoConsumerError(receivedMessage1);
assertNoConsumerError(receivedMessage2);

PluginTestVerifier verifier = PluginTestVerifierHolder.getInstance();
verifier.printCache();
Expand Down Expand Up @@ -313,8 +309,8 @@ public void testTopicPush() throws Exception {
final ActiveMQTopic testTopic = new ActiveMQTopic(testTopicName);
final String testMessage = "Hello World for Topic!";
final MessagePrinter messagePrinter = new MessagePrinter();
final CountDownLatch consumerLatch = new CountDownLatch(2);
final AtomicReference<Exception> exception = new AtomicReference<>();

final CompletableFuture<Message> receivedMessage = new CompletableFuture<>();
// create producer
final ActiveMQSession producerSession = ActiveMQClientITHelper.createSession(getProducerBrokerName(), getProducerBrokerUrl());
final MessageProducer producer = new MessageProducerBuilder(producerSession, testTopic).waitTillStarted().build();
Expand All @@ -325,10 +321,9 @@ public void testTopicPush() throws Exception {
public void onMessage(Message message) {
try {
messagePrinter.printMessage(message);
receivedMessage.complete(message);
} catch (Exception e) {
exception.set(e);
} finally {
consumerLatch.countDown();
receivedMessage.completeExceptionally(e);
}
}
};
Expand All @@ -345,10 +340,9 @@ public void onMessage(Message message) {

// When
producer.send(expectedTextMessage);
consumerLatch.await(1L, TimeUnit.SECONDS);

// Then
assertNoConsumerError(exception.get());
assertNoConsumerError(receivedMessage);

PluginTestVerifier verifier = PluginTestVerifierHolder.getInstance();
verifier.printCache();
Expand Down Expand Up @@ -428,7 +422,7 @@ public void testTopicPoll() throws Exception {
}

/**
* Verifies traces for when {@link org.apache.activemq.ActiveMQMessageProducer ActiveMQMessageProducer}
* Verifies traces for when {@link ActiveMQMessageProducer ActiveMQMessageProducer}
* sends the message.
*
* @param verifier verifier used to verify traces
Expand All @@ -454,7 +448,7 @@ private void verifyProducerSendEvent(PluginTestVerifier verifier, ActiveMQDestin
/**
* Verifies traces for when {@link ActiveMQMessageConsumer} consumes the message and dispatches for
* further processing (for example, calling {@link MessageListener} attached to the consumer directly, or adding
* the message to {@link org.apache.activemq.MessageDispatchChannel MessageDispatchChannel}.
* the message to {@link MessageDispatchChannel MessageDispatchChannel}.
*
* @param verifier verifier used to verify traces
* @param destination the destination from which the consumer is receiving the message
Expand Down Expand Up @@ -484,4 +478,17 @@ private String getMessageAsString(Message message) throws JMSException {
protected final void assertNoConsumerError(Exception consumerException) {
Assertions.assertNull(consumerException, "Failed with exception : " + consumerException);
}

protected final void assertNoConsumerError(CompletableFuture<Message> consumerException) {
Throwable exception = null;
try {
consumerException.get(1000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
exception = e;
} catch (Throwable th) {
exception = th;
}
Assertions.assertNull(exception, "Failed with exception : " + exception);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,15 @@ public MessageReceiver(MessageConsumer messageConsumer) {
this.messageConsumer = Objects.requireNonNull(messageConsumer, "messageConsumer");
}

public void receiveMessage() throws JMSException {
public Message receiveMessage() throws JMSException {
Message message = messageConsumer.receive();
messageLogger.printMessage(message);
return message;
}

public void receiveMessage(long timeout) throws JMSException {
public Message receiveMessage(long timeout) throws JMSException {
Message message = messageConsumer.receive(timeout);
messageLogger.printMessage(message);
return message;
}
}
Loading

0 comments on commit a9f953b

Please sign in to comment.