-
Notifications
You must be signed in to change notification settings - Fork 714
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
jms: cleans up artemis code in attempts to deflake test #1394
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,5 @@ | ||
/* | ||
* Copyright 2013-2023 The OpenZipkin Authors | ||
* Copyright 2013-2024 The OpenZipkin Authors | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except | ||
* in compliance with the License. You may obtain a copy of the License at | ||
|
@@ -40,7 +40,7 @@ | |
* {@link brave.jakarta.jms.ITTracingMessageConsumer} | ||
*/ | ||
class ITTracingJMSConsumer extends ITJms { | ||
@RegisterExtension ArtemisJmsExtension jms = new ArtemisJmsExtension(); | ||
@RegisterExtension JmsExtension jms = new JmsExtension(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the non-artemis one was dead code in this project |
||
|
||
JMSContext tracedContext; | ||
JMSProducer producer; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,5 @@ | ||
/* | ||
* Copyright 2013-2023 The OpenZipkin Authors | ||
* Copyright 2013-2024 The OpenZipkin Authors | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except | ||
* in compliance with the License. You may obtain a copy of the License at | ||
|
@@ -16,6 +16,7 @@ | |
import jakarta.jms.BytesMessage; | ||
import jakarta.jms.Connection; | ||
import jakarta.jms.Destination; | ||
import jakarta.jms.JMSContext; | ||
import jakarta.jms.JMSException; | ||
import jakarta.jms.Message; | ||
import jakarta.jms.Queue; | ||
|
@@ -29,16 +30,28 @@ | |
import java.lang.reflect.Field; | ||
import java.lang.reflect.Method; | ||
import java.util.Optional; | ||
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
import org.apache.activemq.artemis.api.core.SimpleString; | ||
import org.apache.activemq.artemis.api.core.TransportConfiguration; | ||
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl; | ||
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMAcceptorFactory; | ||
import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ; | ||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; | ||
import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory; | ||
import org.apache.activemq.artemis.jms.client.ActiveMQMessage; | ||
import org.junit.jupiter.api.extension.AfterEachCallback; | ||
import org.junit.jupiter.api.extension.BeforeEachCallback; | ||
import org.junit.jupiter.api.extension.ExtensionContext; | ||
|
||
public abstract class JmsExtension implements BeforeEachCallback, AfterEachCallback { | ||
/** | ||
* ActiveMQ 6 supports JMS 2.0, but requires JDK 17. We have to build on JDK | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 was also aware of that but JDK-17 is indeed a hard requirement |
||
* 11, so use Artemis instead, despite it requiring Netty. | ||
* | ||
* <p>See https://issues.apache.org/jira/browse/AMQ-5736?focusedCommentId=16593091&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16593091 | ||
*/ | ||
public class JmsExtension implements BeforeEachCallback, AfterEachCallback { | ||
String testName; | ||
String destinationName, queueName, topicName; | ||
|
||
Connection connection; | ||
Session session; | ||
Destination destination; | ||
|
@@ -51,7 +64,56 @@ public abstract class JmsExtension implements BeforeEachCallback, AfterEachCallb | |
TopicSession topicSession; | ||
Topic topic; | ||
|
||
EmbeddedActiveMQ server = new EmbeddedActiveMQ(); | ||
ActiveMQJMSConnectionFactory factory; | ||
AtomicBoolean started = new AtomicBoolean(); | ||
|
||
JmsExtension() { | ||
factory = new ActiveMQJMSConnectionFactory("vm://0"); | ||
factory.setProducerMaxRate(1); // to allow tests to use production order | ||
} | ||
|
||
void maybeStartServer() throws Exception { | ||
if (started.getAndSet(true)) return; | ||
// Configuration values from EmbeddedActiveMQResource | ||
server.setConfiguration(new ConfigurationImpl().setName(testName) | ||
.setPersistenceEnabled(false) | ||
.setSecurityEnabled(false) | ||
.setJMXManagementEnabled(false) | ||
.addAcceptorConfiguration(new TransportConfiguration(InVMAcceptorFactory.class.getName())) | ||
.addAddressSetting("#", | ||
new AddressSettings().setDeadLetterAddress(SimpleString.toSimpleString("dla")) | ||
.setExpiryAddress(SimpleString.toSimpleString("expiry")))); | ||
server.start(); | ||
} | ||
|
||
JMSContext newContext() { | ||
return factory.createContext(JMSContext.AUTO_ACKNOWLEDGE); | ||
} | ||
|
||
Connection newConnection() throws Exception { | ||
maybeStartServer(); | ||
return factory.createConnection(); | ||
} | ||
|
||
QueueConnection newQueueConnection() throws Exception { | ||
maybeStartServer(); | ||
return factory.createQueueConnection(); | ||
} | ||
|
||
TopicConnection newTopicConnection() throws Exception { | ||
maybeStartServer(); | ||
return factory.createTopicConnection(); | ||
} | ||
|
||
void setReadOnlyProperties(Message message, boolean readOnlyProperties) { | ||
try { | ||
Field propertiesReadOnly = ActiveMQMessage.class.getDeclaredField("propertiesReadOnly"); | ||
propertiesReadOnly.setAccessible(true); | ||
propertiesReadOnly.set(message, readOnlyProperties); | ||
} catch (Exception e) { | ||
throw new AssertionError(e); | ||
} | ||
} | ||
|
||
TextMessage newMessage(String text) throws JMSException { | ||
|
@@ -64,15 +126,6 @@ BytesMessage newBytesMessage(String text) throws JMSException { | |
return message; | ||
} | ||
|
||
abstract void setReadOnlyProperties(Message message, boolean readOnlyProperties) | ||
throws JMSException; | ||
|
||
abstract Connection newConnection() throws JMSException, Exception; | ||
|
||
abstract QueueConnection newQueueConnection() throws JMSException, Exception; | ||
|
||
abstract TopicConnection newTopicConnection() throws JMSException, Exception; | ||
|
||
@Override public void beforeEach(ExtensionContext context) throws Exception { | ||
Optional<Method> testMethod = context.getTestMethod(); | ||
if (testMethod.isPresent()) { | ||
|
@@ -104,41 +157,16 @@ abstract void setReadOnlyProperties(Message message, boolean readOnlyProperties) | |
|
||
@Override public void afterEach(ExtensionContext context) throws Exception { | ||
try { | ||
session.close(); | ||
connection.close(); | ||
topicSession.close(); | ||
topicConnection.close(); | ||
queueSession.close(); | ||
queueConnection.close(); | ||
if (session != null) session.close(); | ||
if (connection != null) connection.close(); | ||
if (topicSession != null) topicSession.close(); | ||
if (topicConnection != null) topicConnection.close(); | ||
if (queueSession != null) queueSession.close(); | ||
if (queueConnection != null) queueConnection.close(); | ||
} catch (JMSException e) { | ||
throw new AssertionError(e); | ||
} | ||
} | ||
|
||
static class ActiveMQ extends JmsExtension { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this was dead code so I flattened the type. |
||
@Override Connection newConnection() throws JMSException { | ||
return new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false") | ||
.createConnection(); | ||
} | ||
|
||
@Override QueueConnection newQueueConnection() throws JMSException { | ||
return new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false") | ||
.createQueueConnection(); | ||
} | ||
|
||
@Override TopicConnection newTopicConnection() throws JMSException { | ||
return new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false") | ||
.createTopicConnection(); | ||
} | ||
|
||
@Override void setReadOnlyProperties(Message message, boolean readOnlyProperties) { | ||
try { | ||
Field propertiesReadOnly = ActiveMQMessage.class.getDeclaredField("propertiesReadOnly"); | ||
propertiesReadOnly.setAccessible(true); | ||
propertiesReadOnly.set(message, readOnlyProperties); | ||
} catch (Exception e) { | ||
throw new AssertionError(e); | ||
} | ||
} | ||
factory.close(); | ||
server.stop(); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry about that, it was definitely my mistake