diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java index 02a7f65057b..507a856a4b7 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java @@ -620,4 +620,54 @@ public long getNetworkDequeues() { return destination.getDestinationStatistics().getNetworkDequeues().getCount(); } + @Override + public boolean isAdvancedMessageStatisticsEnabled() { + return destination.isAdvancedMessageStatisticsEnabled(); + } + + @Override + public void setAdvancedMessageStatisticsEnabled(boolean advancedMessageStatisticsEnabled) { + destination.setAdvancedMessageStatisticsEnabled(advancedMessageStatisticsEnabled); + } + + @Override + public long getEnqueuedMessageBrokerInTime() { + return destination.getDestinationStatistics().getEnqueuedMessageBrokerInTime().getValue(); + } + + @Override + public String getEnqueuedMessageClientId() { + return destination.getDestinationStatistics().getEnqueuedMessageClientID().getValue(); + } + + @Override + public String getEnqueuedMessageId() { + return destination.getDestinationStatistics().getEnqueuedMessageID().getValue(); + } + + @Override + public long getEnqueuedMessageTimestamp() { + return destination.getDestinationStatistics().getEnqueuedMessageTimestamp().getValue(); + } + + @Override + public long getDequeuedMessageBrokerInTime() { + return destination.getDestinationStatistics().getDequeuedMessageBrokerInTime().getValue(); + } + + @Override + public String getDequeuedMessageClientId() { + return destination.getDestinationStatistics().getDequeuedMessageClientID().getValue(); + } + + @Override + public String getDequeuedMessageId() { + return destination.getDestinationStatistics().getDequeuedMessageID().getValue(); + } + + @Override + public long getDequeuedMessageTimestamp() { + return destination.getDestinationStatistics().getDequeuedMessageTimestamp().getValue(); + } + } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java index 328ddb09f09..1e9c74c952f 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java @@ -493,4 +493,34 @@ public String sendTextMessageWithProperties(@MBeanInfo("properties") String prop @MBeanInfo("Number of messages acknowledged from the destination via network connection") long getNetworkDequeues(); + + @MBeanInfo("Query Advanced Message Statistics flag") + boolean isAdvancedMessageStatisticsEnabled(); + + @MBeanInfo("Toggle Advanced Message Statistics flag") + void setAdvancedMessageStatisticsEnabled(boolean advancedMessageStatisticsEnabled); + + @MBeanInfo("Broker in time (ms) of last enqueued message to the destination") + long getEnqueuedMessageBrokerInTime(); + + @MBeanInfo("ClientID of last enqueued message to the destination") + String getEnqueuedMessageClientId(); + + @MBeanInfo("MessageID of last enqueued message to the destination") + String getEnqueuedMessageId(); + + @MBeanInfo("Message timestamp in (ms) of last enqueued message to the destination") + long getEnqueuedMessageTimestamp(); + + @MBeanInfo("Broker in time (ms) of last dequeued message to the destination") + long getDequeuedMessageBrokerInTime(); + + @MBeanInfo("ClientID of last dequeued message to the destination") + String getDequeuedMessageClientId(); + + @MBeanInfo("MessageID of last dequeued message to the destination") + String getDequeuedMessageId(); + + @MBeanInfo("Message timestamp in (ms) of last dequeued message to the destination") + long getDequeuedMessageTimestamp(); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java index e34f23a4dda..36bc07b474f 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java @@ -111,6 +111,7 @@ public abstract class BaseDestination implements Destination { private boolean disposed = false; private boolean doOptimzeMessageStorage = true; private boolean advancedNetworkStatisticsEnabled = false; + private boolean advancedMessageStatisticsEnabled = false; /* * percentage of in-flight messages above which optimize message store is disabled @@ -880,6 +881,16 @@ public void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatistic this.advancedNetworkStatisticsEnabled = advancedNetworkStatisticsEnabled; } + @Override + public boolean isAdvancedMessageStatisticsEnabled() { + return this.advancedMessageStatisticsEnabled; + } + + @Override + public void setAdvancedMessageStatisticsEnabled(boolean advancedMessageStatisticsEnabled) { + this.advancedMessageStatisticsEnabled = advancedMessageStatisticsEnabled; + } + @Override public abstract List getConsumers(); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java index 45e3de7b3cb..48b9ed139be 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java @@ -264,4 +264,9 @@ public interface Destination extends Service, Task, Message.MessageDestination { void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatisticsEnabled); + // [AMQ-8463] + boolean isAdvancedMessageStatisticsEnabled(); + + void setAdvancedMessageStatisticsEnabled(boolean advancedMessageStatisticsEnabled); + } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java index 85ef367a771..1ab96560ac2 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java @@ -419,6 +419,16 @@ public void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatistic next.setAdvancedNetworkStatisticsEnabled(advancedNetworkStatisticsEnabled); } + @Override + public boolean isAdvancedMessageStatisticsEnabled() { + return next.isAdvancedMessageStatisticsEnabled(); + } + + @Override + public void setAdvancedMessageStatisticsEnabled(boolean advancedMessageStatisticsEnabled) { + next.setAdvancedMessageStatisticsEnabled(advancedMessageStatisticsEnabled); + } + public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception { if (next instanceof DestinationFilter) { DestinationFilter filter = (DestinationFilter) next; diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java index dc6b17dfafc..df3445e56d9 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java @@ -50,6 +50,16 @@ public class DestinationStatistics extends StatsImpl { protected CountStatisticImpl networkEnqueues; protected CountStatisticImpl networkDequeues; + // [AMQ-8463] Advanced Message Statistics are optionally enabled + protected LongStatisticImpl enqueuedMessageBrokerInTime; + protected StringStatisticImpl enqueuedMessageClientID; + protected StringStatisticImpl enqueuedMessageID; + protected LongStatisticImpl enqueuedMessageTimestamp; + protected LongStatisticImpl dequeuedMessageBrokerInTime; + protected StringStatisticImpl dequeuedMessageClientID; + protected StringStatisticImpl dequeuedMessageID; + protected LongStatisticImpl dequeuedMessageTimestamp; + public DestinationStatistics() { enqueues = new CountStatisticImpl("enqueues", "The number of messages that have been sent to the destination"); @@ -76,6 +86,16 @@ public DestinationStatistics() { networkEnqueues = new CountStatisticImpl("networkEnqueues", "The number of messages that have been sent to the destination via network connection"); networkDequeues = new CountStatisticImpl("networkDequeues", "The number of messages that have been acknowledged from the destination via network connection"); + enqueuedMessageBrokerInTime = new LongStatisticImpl("enqueuedMessageBrokerInTime", "Broker in time (ms) of last enqueued message to the destination"); + enqueuedMessageClientID = new StringStatisticImpl("enqueuedMessageClientID", "ClientID of last enqueued message to the destination"); + enqueuedMessageID = new StringStatisticImpl("enqueuedMessageID", "MessageID of last enqueued message to the destination"); + enqueuedMessageTimestamp = new LongStatisticImpl("enqueuedMessageTimestamp", "Message timestamp of last enqueued message to the destination"); + + dequeuedMessageBrokerInTime = new LongStatisticImpl("dequeuedMessageBrokerInTime", "Broker in time (ms) of last dequeued message to the destination"); + dequeuedMessageClientID = new StringStatisticImpl("dequeuedMessageClientID", "ClientID of last dequeued message to the destination"); + dequeuedMessageID = new StringStatisticImpl("dequeuedMessageID", "MessageID of last dequeued message to the destination"); + dequeuedMessageTimestamp = new LongStatisticImpl("dequeuedMessageTimestamp", "Message timestamp of last dequeued message to the destination"); + addStatistic("enqueues", enqueues); addStatistic("dispatched", dispatched); addStatistic("dequeues", dequeues); @@ -94,6 +114,15 @@ public DestinationStatistics() { addStatistic("networkEnqueues", networkEnqueues); addStatistic("networkDequeues", networkDequeues); + + addStatistic("enqueuedMessageBrokerInTime", enqueuedMessageBrokerInTime); + addStatistic("enqueuedMessageClientID", enqueuedMessageClientID); + addStatistic("enqueuedMessageID", enqueuedMessageID); + addStatistic("enqueuedMessageTimestamp", enqueuedMessageTimestamp); + addStatistic("dequeuedMessageBrokerInTime", dequeuedMessageBrokerInTime); + addStatistic("dequeuedMessageClientID", dequeuedMessageClientID); + addStatistic("dequeuedMessageID", dequeuedMessageID); + addStatistic("dequeuedMessageTimestamp", dequeuedMessageTimestamp); } public CountStatisticImpl getEnqueues() { @@ -170,6 +199,38 @@ public CountStatisticImpl getNetworkDequeues() { return networkDequeues; } + public LongStatisticImpl getEnqueuedMessageBrokerInTime() { + return enqueuedMessageBrokerInTime; + } + + public StringStatisticImpl getEnqueuedMessageClientID() { + return enqueuedMessageClientID; + } + + public StringStatisticImpl getEnqueuedMessageID() { + return enqueuedMessageID; + } + + public LongStatisticImpl getEnqueuedMessageTimestamp() { + return enqueuedMessageTimestamp; + } + + public LongStatisticImpl getDequeuedMessageBrokerInTime() { + return dequeuedMessageBrokerInTime; + } + + public StringStatisticImpl getDequeuedMessageClientID() { + return dequeuedMessageClientID; + } + + public StringStatisticImpl getDequeuedMessageID() { + return dequeuedMessageID; + } + + public LongStatisticImpl getDequeuedMessageTimestamp() { + return dequeuedMessageTimestamp; + } + public void reset() { if (this.isDoReset()) { super.reset(); @@ -186,6 +247,14 @@ public void reset() { maxUncommittedExceededCount.reset(); networkEnqueues.reset(); networkDequeues.reset(); + enqueuedMessageBrokerInTime.reset(); + enqueuedMessageClientID.reset(); + enqueuedMessageID.reset(); + enqueuedMessageTimestamp.reset(); + dequeuedMessageBrokerInTime.reset(); + dequeuedMessageClientID.reset(); + dequeuedMessageID.reset(); + dequeuedMessageTimestamp.reset(); } } @@ -208,9 +277,20 @@ public void setEnabled(boolean enabled) { messageSize.setEnabled(enabled); maxUncommittedExceededCount.setEnabled(enabled); - // [AMQ-9437] Advanced Statistics + // [AMQ-9437] Advanced Network Statistics networkEnqueues.setEnabled(enabled); networkDequeues.setEnabled(enabled); + + // [AMQ-9437] Advanced Message Statistics + enqueuedMessageBrokerInTime.setEnabled(enabled); + enqueuedMessageClientID.setEnabled(enabled); + enqueuedMessageID.setEnabled(enabled); + enqueuedMessageTimestamp.setEnabled(enabled); + dequeuedMessageBrokerInTime.setEnabled(enabled); + dequeuedMessageClientID.setEnabled(enabled); + dequeuedMessageID.setEnabled(enabled); + dequeuedMessageTimestamp.setEnabled(enabled); + } public void setParent(DestinationStatistics parent) { @@ -233,6 +313,7 @@ public void setParent(DestinationStatistics parent) { maxUncommittedExceededCount.setParent(parent.maxUncommittedExceededCount); networkEnqueues.setParent(parent.networkEnqueues); networkDequeues.setParent(parent.networkDequeues); + // [AMQ-9437] Advanced Message Statistics do not parent. } else { enqueues.setParent(null); dispatched.setParent(null); @@ -252,6 +333,7 @@ public void setParent(DestinationStatistics parent) { maxUncommittedExceededCount.setParent(null); networkEnqueues.setParent(null); networkDequeues.setParent(null); + // [AMQ-9437] Advanced Message Statistics do not parent. } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index 0ed6763f7ba..279db17bf5d 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -1916,6 +1916,13 @@ private void dropMessage(ConnectionContext context, QueueMessageReference refere getDestinationStatistics().getDequeues().increment(); getDestinationStatistics().getMessages().decrement(); + if(isAdvancedMessageStatisticsEnabled()) { + getDestinationStatistics().getDequeuedMessageBrokerInTime().setValue(reference.getMessage().getBrokerInTime()); + getDestinationStatistics().getDequeuedMessageClientID().setValue(context.getClientId()); + getDestinationStatistics().getDequeuedMessageID().setValue(reference.getMessageId().getTextView()); + getDestinationStatistics().getDequeuedMessageTimestamp().setValue(reference.getMessage().getTimestamp()); + } + if(isAdvancedNetworkStatisticsEnabled() && context.getConnection() != null && context.getConnection().isNetworkConnection()) { getDestinationStatistics().getNetworkDequeues().increment(); } @@ -1975,6 +1982,13 @@ final void messageSent(final ConnectionContext context, final Message msg) throw destinationStatistics.getMessages().increment(); destinationStatistics.getMessageSize().addSize(msg.getSize()); + if(isAdvancedMessageStatisticsEnabled()) { + destinationStatistics.getEnqueuedMessageBrokerInTime().setValue(msg.getBrokerInTime()); + destinationStatistics.getEnqueuedMessageClientID().setValue(context.getClientId()); + destinationStatistics.getEnqueuedMessageID().setValue(msg.getMessageId().toString()); + destinationStatistics.getEnqueuedMessageTimestamp().setValue(msg.getTimestamp()); + } + if(isAdvancedNetworkStatisticsEnabled() && context.getConnection() != null && context.getConnection().isNetworkConnection()) { destinationStatistics.getNetworkEnqueues().increment(); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java index a9e07874e05..a7773463dce 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -779,6 +779,13 @@ protected void dispatch(final ConnectionContext context, Message message) throws // destinationStatistics.getMessages().increment(); destinationStatistics.getEnqueues().increment(); + if(isAdvancedMessageStatisticsEnabled()) { + destinationStatistics.getEnqueuedMessageBrokerInTime().setValue(message.getBrokerInTime()); + destinationStatistics.getEnqueuedMessageClientID().setValue(context.getClientId()); + destinationStatistics.getEnqueuedMessageID().setValue(message.getMessageId().toString()); + destinationStatistics.getEnqueuedMessageTimestamp().setValue(message.getTimestamp()); + } + if(isAdvancedNetworkStatisticsEnabled() && context != null && context.isNetworkConnection()) { destinationStatistics.getNetworkEnqueues().increment(); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java index e33f13b48ca..d3ceb34ee01 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java @@ -107,6 +107,8 @@ public class PolicyEntry extends DestinationMapEntry { private int maxDestinations = -1; private boolean useTopicSubscriptionInflightStats = true; private boolean advancedNetworkStatisticsEnabled = false; // [AMQ-9437] + private boolean advancedMessageStatisticsEnabled = false; // [AMQ-8463] + /* * percentage of in-flight messages above which optimize message store is disabled */ @@ -309,6 +311,9 @@ public void baseUpdate(BaseDestination destination, Set includedProperti if (isUpdate("advancedNetworkStatisticsEnabled", includedProperties)) { destination.setAdvancedNetworkStatisticsEnabled(isAdvancedNetworkStatisticsEnabled()); } + if (isUpdate("advancedMessageStatisticsEnabled", includedProperties)) { + destination.setAdvancedMessageStatisticsEnabled(isAdvancedMessageStatisticsEnabled()); + } } public void baseConfiguration(Broker broker, BaseDestination destination) { @@ -1187,4 +1192,12 @@ public boolean isAdvancedNetworkStatisticsEnabled() { public void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatisticsEnabled) { this.advancedNetworkStatisticsEnabled = advancedNetworkStatisticsEnabled; } + + public boolean isAdvancedMessageStatisticsEnabled() { + return this.advancedMessageStatisticsEnabled; + } + + public void setAdvancedMessageStatisticsEnabled(boolean advancedMessageStatisticsEnabled) { + this.advancedMessageStatisticsEnabled = advancedMessageStatisticsEnabled; + } } diff --git a/activemq-client/src/main/java/org/apache/activemq/management/LongStatistic.java b/activemq-client/src/main/java/org/apache/activemq/management/LongStatistic.java new file mode 100644 index 00000000000..8303784d2f2 --- /dev/null +++ b/activemq-client/src/main/java/org/apache/activemq/management/LongStatistic.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.activemq.management; + +/** + * A statistic to store a single long value that is not incremented + * + * Example: Store a timestamp value of a recent message + * + */ +public interface LongStatistic extends UnsampledStatistic { + public Long getValue(); +} diff --git a/activemq-client/src/main/java/org/apache/activemq/management/LongStatisticImpl.java b/activemq-client/src/main/java/org/apache/activemq/management/LongStatisticImpl.java new file mode 100644 index 00000000000..f2017fd2782 --- /dev/null +++ b/activemq-client/src/main/java/org/apache/activemq/management/LongStatisticImpl.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.management; + +/** + * A long statistic implementation + */ +public class LongStatisticImpl extends UnsampledStatisticImpl implements LongStatistic, Resettable { + + // Note: [AMQ-8463] Adding volatile to 'value' would be more accurate, but performance impact + private Long value = null; + + public LongStatisticImpl(String name, String description) { + this(name, "value", description); + } + + public LongStatisticImpl(String name, String unit, String description) { + super(name, unit, description); + } + + @Override + public void reset() { + if (isDoReset()) { + value = null; + } + } + + @Override + public Long getValue() { + return (value != null ? value : 0l); + } + + public void setValue(Long value) { + if (isEnabled()) { + this.value = value; + } + } + + protected void appendFieldDescription(StringBuffer buffer) { + buffer.append(" value: "); + buffer.append(value); + super.appendFieldDescription(buffer); + } +} diff --git a/activemq-client/src/main/java/org/apache/activemq/management/StatisticImpl.java b/activemq-client/src/main/java/org/apache/activemq/management/StatisticImpl.java index 1dbcc80c69f..3984d55e02f 100644 --- a/activemq-client/src/main/java/org/apache/activemq/management/StatisticImpl.java +++ b/activemq-client/src/main/java/org/apache/activemq/management/StatisticImpl.java @@ -25,11 +25,11 @@ public class StatisticImpl implements Statistic, Resettable { protected boolean enabled; - private String name; - private String unit; - private String description; - private long startTime; - private long lastSampleTime; + protected String name; + protected String unit; + protected String description; + protected long startTime; + protected long lastSampleTime; private boolean doReset = true; public StatisticImpl(String name, String unit, String description) { @@ -40,6 +40,14 @@ public StatisticImpl(String name, String unit, String description) { this.lastSampleTime = this.startTime; } + protected StatisticImpl(String name, String unit, String description, long startTime, long lastSampleTime) { + this.name = name; + this.unit = unit; + this.description = description; + this.startTime = startTime; + this.lastSampleTime = lastSampleTime; + } + public synchronized void reset() { if(isDoReset()) { this.startTime = System.currentTimeMillis(); diff --git a/activemq-client/src/main/java/org/apache/activemq/management/StringStatistic.java b/activemq-client/src/main/java/org/apache/activemq/management/StringStatistic.java new file mode 100644 index 00000000000..be829823e46 --- /dev/null +++ b/activemq-client/src/main/java/org/apache/activemq/management/StringStatistic.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.activemq.management; + +public interface StringStatistic extends UnsampledStatistic { + public String getValue(); +} diff --git a/activemq-client/src/main/java/org/apache/activemq/management/StringStatisticImpl.java b/activemq-client/src/main/java/org/apache/activemq/management/StringStatisticImpl.java new file mode 100644 index 00000000000..872f49bc085 --- /dev/null +++ b/activemq-client/src/main/java/org/apache/activemq/management/StringStatisticImpl.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.management; + +/** + * A string statistic implementation + */ +public class StringStatisticImpl extends UnsampledStatisticImpl implements StringStatistic { + + // Note: [AMQ-8463] Adding volatile to 'value' would be more accurate, but performance impact + private String value = null; + + public StringStatisticImpl(String name, String description) { + this(name, "value", description); + } + + public StringStatisticImpl(String name, String unit, String description) { + super(name, unit, description); + } + + @Override + public void reset() { + if (isDoReset()) { + value = null; + } + } + + @Override + public String getValue() { + return value; + } + + public void setValue(String value) { + if (isEnabled()) { + this.value = value; + } + } + + protected void appendFieldDescription(StringBuffer buffer) { + buffer.append(" value: "); + buffer.append(value); + super.appendFieldDescription(buffer); + } +} diff --git a/activemq-client/src/main/java/org/apache/activemq/management/UnsampledStatistic.java b/activemq-client/src/main/java/org/apache/activemq/management/UnsampledStatistic.java new file mode 100644 index 00000000000..42a6fbbbf8d --- /dev/null +++ b/activemq-client/src/main/java/org/apache/activemq/management/UnsampledStatistic.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.activemq.management; + +/** + * A marker interface for Statistics without + * sampleTime or or startTime. + */ +public interface UnsampledStatistic extends Statistic {} + diff --git a/activemq-client/src/main/java/org/apache/activemq/management/UnsampledStatisticImpl.java b/activemq-client/src/main/java/org/apache/activemq/management/UnsampledStatisticImpl.java new file mode 100644 index 00000000000..f9d5fc6fa96 --- /dev/null +++ b/activemq-client/src/main/java/org/apache/activemq/management/UnsampledStatisticImpl.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.management; + +/** + * Base class for a UnsampledStatistic implementation + * + * + */ +public abstract class UnsampledStatisticImpl extends StatisticImpl implements UnsampledStatistic { + + public UnsampledStatisticImpl(String name, String unit, String description) { + super(name, unit, description, 0l, 0l); + } + + @Override + public void reset() {} + + @Override + protected void updateSampleTime() {} + + @Override + public long getStartTime() { + return 0l; + } + + @Override + public long getLastSampleTime() { + return 0l; + } +} diff --git a/activemq-client/src/test/java/org/apache/activemq/management/UnsampledStatisticsTest.java b/activemq-client/src/test/java/org/apache/activemq/management/UnsampledStatisticsTest.java new file mode 100644 index 00000000000..cd2eae30eea --- /dev/null +++ b/activemq-client/src/test/java/org/apache/activemq/management/UnsampledStatisticsTest.java @@ -0,0 +1,36 @@ +package org.apache.activemq.management; + +import static org.junit.Assert.*; + +import org.junit.Test; + +public class UnsampledStatisticsTest { + + @Test + public void testUnsampledStatisticsEnabled() { + LongStatisticImpl longStatisticImpl = new LongStatisticImpl("longStat", "long", "A long statistic"); + longStatisticImpl.setEnabled(true); + longStatisticImpl.setValue(Long.MAX_VALUE); + LongStatistic longStatistic = longStatisticImpl; + + StringStatisticImpl stringStatisticImpl = new StringStatisticImpl("stringStat", "chars", "A string statistic"); + stringStatisticImpl.setEnabled(true); + stringStatisticImpl.setValue("Hello World!"); + StringStatistic stringStatistic = stringStatisticImpl; + + assertEquals("A long statistic", longStatistic.getDescription()); + assertEquals(Long.valueOf(0l), Long.valueOf(longStatistic.getLastSampleTime())); + assertEquals("longStat", longStatistic.getName()); + assertEquals(Long.valueOf(0l), Long.valueOf(longStatistic.getStartTime())); + assertEquals("long", longStatistic.getUnit()); + assertEquals(Long.valueOf(Long.MAX_VALUE), longStatistic.getValue()); + + assertEquals("A string statistic", stringStatistic.getDescription()); + assertEquals(Long.valueOf(0l), Long.valueOf(stringStatistic.getLastSampleTime())); + assertEquals("stringStat", stringStatistic.getName()); + assertEquals(Long.valueOf(0l), Long.valueOf(stringStatistic.getStartTime())); + assertEquals("chars", stringStatistic.getUnit()); + assertEquals("Hello World!", stringStatistic.getValue()); + } + +} diff --git a/activemq-runtime-config/src/test/java/org/apache/activemq/PolicyEntryTest.java b/activemq-runtime-config/src/test/java/org/apache/activemq/PolicyEntryTest.java index 53341c3a53c..5aa9e1df6bd 100644 --- a/activemq-runtime-config/src/test/java/org/apache/activemq/PolicyEntryTest.java +++ b/activemq-runtime-config/src/test/java/org/apache/activemq/PolicyEntryTest.java @@ -57,6 +57,18 @@ public void testModSendDuplicateFromStoreToDLQ() throws Exception { verifyBooleanField("AMQ.8397", "sendDuplicateFromStoreToDLQ", true); } + @Test + public void testModAdvancedMessageStatistics() throws Exception { + final String brokerConfig = configurationSeed + "-policy-ml-broker"; + applyNewConfig(brokerConfig, configurationSeed + "-policy-advancedMessageStatistics"); + startBroker(brokerConfig); + assertTrue("broker alive", brokerService.isStarted()); + + verifyBooleanField("AMQ.8463", "advancedMessageStatisticsEnabled", false); + applyNewConfig(brokerConfig, configurationSeed + "-policy-advancedMessageStatistics-mod", SLEEP); + verifyBooleanField("AMQ.8463", "advancedMessageStatisticsEnabled", true); + } + @Test public void testModAdvancedNetworkStatistics() throws Exception { final String brokerConfig = configurationSeed + "-policy-ml-broker"; @@ -133,6 +145,9 @@ private void verifyBooleanField(String dest, String fieldName, boolean value) th session.createConsumer(session.createQueue(dest)); switch(fieldName) { + case "advancedMessageStatisticsEnabled": + assertEquals(value, brokerService.getRegionBroker().getDestinationMap().get(new ActiveMQQueue(dest)).isAdvancedMessageStatisticsEnabled()); + break; case "advancedNetworkStatisticsEnabled": assertEquals(value, brokerService.getRegionBroker().getDestinationMap().get(new ActiveMQQueue(dest)).isAdvancedNetworkStatisticsEnabled()); break; diff --git a/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedMessageStatistics-mod.xml b/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedMessageStatistics-mod.xml new file mode 100644 index 00000000000..4f56b3185db --- /dev/null +++ b/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedMessageStatistics-mod.xml @@ -0,0 +1,36 @@ + + + + + + + + + + + + + + + + + diff --git a/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedMessageStatistics.xml b/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedMessageStatistics.xml new file mode 100644 index 00000000000..8b9166c778e --- /dev/null +++ b/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedMessageStatistics.xml @@ -0,0 +1,36 @@ + + + + + + + + + + + + + + + + +