Skip to content

Commit

Permalink
[ISSUE #8486]Add more test coverage for BrokerMetricsManager (#8487)
Browse files Browse the repository at this point in the history
* Add more test case for BrokerMetricsManager. Includes:
- check the topic is retry or dlq topic
- check the group is system group or not
- check the topic and the group belongs to system or not

* Add more test case for BrokerMetricsManager. Check topic message type by request header.

* Add more test case for BrokerMetricsManager.

* Add more test case for BrokerMetricsManager.
  • Loading branch information
ziiyee authored Aug 7, 2024
1 parent f63a3ba commit cba8a64
Showing 1 changed file with 275 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,41 @@
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.attribute.TopicMessageType;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.metrics.MetricsExporterType;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.Test;

import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;

public class BrokerMetricsManagerTest {

@Test
public void testNewAttributesBuilder() {
Attributes attributes = BrokerMetricsManager.newAttributesBuilder().put("a", "b")
.build();
.build();
assertThat(attributes.get(AttributeKey.stringKey("a"))).isEqualTo("b");
}

@Test
public void testCustomizedAttributesBuilder() {
BrokerMetricsManager.attributesBuilderSupplier = () -> new AttributesBuilder() {
private AttributesBuilder attributesBuilder = Attributes.builder();

@Override
public Attributes build() {
return attributesBuilder.put("customized", "value").build();
Expand All @@ -61,8 +79,263 @@ public AttributesBuilder putAll(Attributes attributes) {
}
};
Attributes attributes = BrokerMetricsManager.newAttributesBuilder().put("a", "b")
.build();
.build();
assertThat(attributes.get(AttributeKey.stringKey("a"))).isEqualTo("b");
assertThat(attributes.get(AttributeKey.stringKey("customized"))).isEqualTo("value");
}


@Test
public void testIsRetryOrDlqTopicWithRetryTopic() {
String topic = MixAll.RETRY_GROUP_TOPIC_PREFIX + "TestTopic";
boolean result = BrokerMetricsManager.isRetryOrDlqTopic(topic);
assertThat(result).isTrue();
}

@Test
public void testIsRetryOrDlqTopicWithDlqTopic() {
String topic = MixAll.DLQ_GROUP_TOPIC_PREFIX + "TestTopic";
boolean result = BrokerMetricsManager.isRetryOrDlqTopic(topic);
assertThat(result).isTrue();
}

@Test
public void testIsRetryOrDlqTopicWithNonRetryOrDlqTopic() {
String topic = "NormalTopic";
boolean result = BrokerMetricsManager.isRetryOrDlqTopic(topic);
assertThat(result).isFalse();
}

@Test
public void testIsRetryOrDlqTopicWithEmptyTopic() {
String topic = "";
boolean result = BrokerMetricsManager.isRetryOrDlqTopic(topic);
assertThat(result).isFalse();
}

@Test
public void testIsRetryOrDlqTopicWithNullTopic() {
String topic = null;
boolean result = BrokerMetricsManager.isRetryOrDlqTopic(topic);
assertThat(result).isFalse();
}

@Test
public void testIsSystemGroup_SystemGroup_ReturnsTrue() {
String group = "FooGroup";
String systemGroup = MixAll.CID_RMQ_SYS_PREFIX + group;
boolean result = BrokerMetricsManager.isSystemGroup(systemGroup);
assertThat(result).isTrue();
}

@Test
public void testIsSystemGroup_NonSystemGroup_ReturnsFalse() {
String group = "FooGroup";
boolean result = BrokerMetricsManager.isSystemGroup(group);
assertThat(result).isFalse();
}

@Test
public void testIsSystemGroup_EmptyGroup_ReturnsFalse() {
String group = "";
boolean result = BrokerMetricsManager.isSystemGroup(group);
assertThat(result).isFalse();
}

@Test
public void testIsSystemGroup_NullGroup_ReturnsFalse() {
String group = null;
boolean result = BrokerMetricsManager.isSystemGroup(group);
assertThat(result).isFalse();
}

@Test
public void testIsSystem_SystemTopicOrSystemGroup_ReturnsTrue() {
String topic = "FooTopic";
String group = "FooGroup";
String systemTopic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
String systemGroup = MixAll.CID_RMQ_SYS_PREFIX + group;

boolean resultTopic = BrokerMetricsManager.isSystem(systemTopic, group);
assertThat(resultTopic).isTrue();

boolean resultGroup = BrokerMetricsManager.isSystem(topic, systemGroup);
assertThat(resultGroup).isTrue();
}

@Test
public void testIsSystem_NonSystemTopicAndGroup_ReturnsFalse() {
String topic = "FooTopic";
String group = "FooGroup";
boolean result = BrokerMetricsManager.isSystem(topic, group);
assertThat(result).isFalse();
}

@Test
public void testIsSystem_EmptyTopicAndGroup_ReturnsFalse() {
String topic = "";
String group = "";
boolean result = BrokerMetricsManager.isSystem(topic, group);
assertThat(result).isFalse();
}

@Test
public void testGetMessageTypeAsNormal() {
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProperties("");

TopicMessageType result = BrokerMetricsManager.getMessageType(requestHeader);
assertThat(TopicMessageType.NORMAL).isEqualTo(result);
}

@Test
public void testGetMessageTypeAsTransaction() {
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();

Map<String, String> map = new HashMap<>();
map.put(MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
requestHeader.setProperties(MessageDecoder.messageProperties2String(map));

TopicMessageType result = BrokerMetricsManager.getMessageType(requestHeader);
assertThat(TopicMessageType.TRANSACTION).isEqualTo(result);
}

@Test
public void testGetMessageTypeAsFifo() {
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
Map<String, String> map = new HashMap<>();
map.put(MessageConst.PROPERTY_SHARDING_KEY, "shardingKey");
requestHeader.setProperties(MessageDecoder.messageProperties2String(map));

TopicMessageType result = BrokerMetricsManager.getMessageType(requestHeader);
assertThat(TopicMessageType.FIFO).isEqualTo(result);
}

@Test
public void testGetMessageTypeAsDelayLevel() {
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
Map<String, String> map = new HashMap<>();
map.put(MessageConst.PROPERTY_DELAY_TIME_LEVEL, "1");
requestHeader.setProperties(MessageDecoder.messageProperties2String(map));

TopicMessageType result = BrokerMetricsManager.getMessageType(requestHeader);
assertThat(TopicMessageType.DELAY).isEqualTo(result);
}

@Test
public void testGetMessageTypeAsDeliverMS() {
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
Map<String, String> map = new HashMap<>();
map.put(MessageConst.PROPERTY_TIMER_DELIVER_MS, "10");
requestHeader.setProperties(MessageDecoder.messageProperties2String(map));

TopicMessageType result = BrokerMetricsManager.getMessageType(requestHeader);
assertThat(TopicMessageType.DELAY).isEqualTo(result);
}

@Test
public void testGetMessageTypeAsDelaySEC() {
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
Map<String, String> map = new HashMap<>();
map.put(MessageConst.PROPERTY_TIMER_DELAY_SEC, "1");
requestHeader.setProperties(MessageDecoder.messageProperties2String(map));

TopicMessageType result = BrokerMetricsManager.getMessageType(requestHeader);
assertThat(TopicMessageType.DELAY).isEqualTo(result);
}

@Test
public void testGetMessageTypeAsDelayMS() {
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
Map<String, String> map = new HashMap<>();
map.put(MessageConst.PROPERTY_TIMER_DELAY_MS, "10");
requestHeader.setProperties(MessageDecoder.messageProperties2String(map));

TopicMessageType result = BrokerMetricsManager.getMessageType(requestHeader);
assertThat(TopicMessageType.DELAY).isEqualTo(result);
}

@Test
public void testGetMessageTypeWithUnknownProperty() {
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
Map<String, String> map = new HashMap<>();
map.put("unknownProperty", "unknownValue");
requestHeader.setProperties(MessageDecoder.messageProperties2String(map));

TopicMessageType result = BrokerMetricsManager.getMessageType(requestHeader);
assertThat(TopicMessageType.NORMAL).isEqualTo(result);
}

@Test
public void testGetMessageTypeWithMultipleProperties() {
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
Map<String, String> map = new HashMap<>();
map.put(MessageConst.PROPERTY_DELAY_TIME_LEVEL, "1");
map.put(MessageConst.PROPERTY_SHARDING_KEY, "shardingKey");
requestHeader.setProperties(MessageDecoder.messageProperties2String(map));

TopicMessageType result = BrokerMetricsManager.getMessageType(requestHeader);
assertThat(TopicMessageType.FIFO).isEqualTo(result);
}

@Test
public void testGetMessageTypeWithTransactionFlagButOtherPropertiesPresent() {
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
Map<String, String> map = new HashMap<>();
map.put(MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
map.put(MessageConst.PROPERTY_SHARDING_KEY, "shardingKey");
requestHeader.setProperties(MessageDecoder.messageProperties2String(map));

TopicMessageType result = BrokerMetricsManager.getMessageType(requestHeader);
assertThat(TopicMessageType.TRANSACTION).isEqualTo(result);
}

@Test
public void testGetMessageTypeWithEmptyProperties() {
TopicMessageType result = BrokerMetricsManager.getMessageType(new SendMessageRequestHeader());
assertThat(TopicMessageType.NORMAL).isEqualTo(result);
}

@Test
public void testCreateMetricsManager() {
MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
String storePathRootDir = System.getProperty("java.io.tmpdir") + File.separator + "store-"
+ UUID.randomUUID();
messageStoreConfig.setStorePathRootDir(storePathRootDir);
BrokerConfig brokerConfig = new BrokerConfig();

NettyServerConfig nettyServerConfig = new NettyServerConfig();
nettyServerConfig.setListenPort(0);

BrokerController brokerController = new BrokerController(brokerConfig, nettyServerConfig,
new NettyClientConfig(), messageStoreConfig);

BrokerMetricsManager metricsManager = new BrokerMetricsManager(brokerController);

assertThat(metricsManager.getBrokerMeter()).isNull();
}

@Test
public void testCreateMetricsManagerLogType() throws CloneNotSupportedException {
BrokerConfig brokerConfig = new BrokerConfig();
brokerConfig.setMetricsExporterType(MetricsExporterType.LOG);
brokerConfig.setMetricsLabel("label1:value1;label2:value2");
brokerConfig.setMetricsOtelCardinalityLimit(1);

MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
String storePathRootDir = System.getProperty("java.io.tmpdir") + File.separator + "store-"
+ UUID.randomUUID();
messageStoreConfig.setStorePathRootDir(storePathRootDir);

NettyServerConfig nettyServerConfig = new NettyServerConfig();
nettyServerConfig.setListenPort(0);

BrokerController brokerController = new BrokerController(brokerConfig, nettyServerConfig,
new NettyClientConfig(), messageStoreConfig);
brokerController.initialize();

BrokerMetricsManager metricsManager = new BrokerMetricsManager(brokerController);

assertThat(metricsManager.getBrokerMeter()).isNotNull();
}
}

0 comments on commit cba8a64

Please sign in to comment.