Skip to content

Commit

Permalink
Service metrics (#13033)
Browse files Browse the repository at this point in the history
* ⚡ support service metric level

* ⚡ support service metric level

* ⚡ support service metric level

* ⚡ support service metric level

* ✅ add ServiceMetricsTest

* ✅ add ServiceMetricsTest

* ✅ add ServiceMetricsTest

* ⚡ servicelevel config init opt

* ⚡ servicelevel config init opt

* ⚡ servicelevel config init opt

* ⚡ servicelevel config init opt

* ⚡ servicelevel config init opt

---------

Co-authored-by: Albumen Kevin <jhq0812@gmail.com>
  • Loading branch information
songxiaosheng and AlbumenJ authored Sep 19, 2023
1 parent ade91c9 commit 575c339
Show file tree
Hide file tree
Showing 16 changed files with 134 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.dubbo.metrics.event.MetricsDispatcher;
import org.apache.dubbo.metrics.event.MetricsEventBus;
import org.apache.dubbo.metrics.event.RequestEvent;
import org.apache.dubbo.metrics.model.MethodMetric;
import org.apache.dubbo.rpc.BaseFilter;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
Expand All @@ -42,13 +43,15 @@ public class MetricsClusterFilter implements ClusterFilter, BaseFilter.Listener,
private DefaultMetricsCollector collector;
private String appName;
private MetricsDispatcher metricsDispatcher;
private boolean serviceLevel;

@Override
public void setApplicationModel(ApplicationModel applicationModel) {
this.applicationModel = applicationModel;
this.collector = applicationModel.getBeanFactory().getBean(DefaultMetricsCollector.class);
this.appName = applicationModel.tryGetApplicationName();
this.metricsDispatcher = applicationModel.getBeanFactory().getBean(MetricsDispatcher.class);
this.serviceLevel = MethodMetric.isServiceLevel(applicationModel);
}

@Override
Expand All @@ -73,7 +76,7 @@ private void handleMethodException(Throwable t, Invocation invocation) {
if (t instanceof RpcException) {
RpcException e = (RpcException) t;
if (e.isForbidden()) {
MetricsEventBus.publish(RequestEvent.toRequestErrorEvent(applicationModel, appName, metricsDispatcher, invocation, CONSUMER_SIDE, e.getCode()));
MetricsEventBus.publish(RequestEvent.toRequestErrorEvent(applicationModel, appName, metricsDispatcher, invocation, CONSUMER_SIDE, e.getCode(), serviceLevel));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ public class MetricsConfig extends AbstractConfig {

private Boolean enableRpc;

/**
* The level of the metrics, the value can be "SERVICE", "METHOD", default is method.
*/
private String rpcLevel;


public MetricsConfig() {
}

Expand Down Expand Up @@ -134,6 +140,14 @@ public Boolean getEnableJvm() {
return enableJvm;
}

public String getRpcLevel() {
return rpcLevel;
}

public void setRpcLevel(String rpcLevel) {
this.rpcLevel = rpcLevel;
}

public void setEnableJvm(Boolean enableJvm) {
this.enableJvm = enableJvm;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.dubbo.metadata.ServiceNameMapping;
import org.apache.dubbo.metrics.event.MetricsEventBus;
import org.apache.dubbo.metrics.event.MetricsInitEvent;
import org.apache.dubbo.metrics.model.MethodMetric;
import org.apache.dubbo.registry.client.metadata.MetadataUtils;
import org.apache.dubbo.rpc.Exporter;
import org.apache.dubbo.rpc.Invoker;
Expand Down Expand Up @@ -556,11 +557,12 @@ private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> r
}

private void initServiceMethodMetrics(URL url) {
String [] methods = Optional.ofNullable(url.getParameter(METHODS_KEY)).map(i->i.split(",")).orElse( new String[]{});
Arrays.stream(methods).forEach( method-> {
RpcInvocation invocation = new RpcInvocation(url.getServiceKey(),url.getServiceModel(),method,interfaceName, url.getProtocolServiceKey(), null, null,null,null,null,null);
MetricsEventBus.publish(MetricsInitEvent.toMetricsInitEvent(application.getApplicationModel(),invocation));
});
String[] methods = Optional.ofNullable(url.getParameter(METHODS_KEY)).map(i -> i.split(",")).orElse(new String[]{});
boolean serviceLevel = MethodMetric.isServiceLevel(application.getApplicationModel());
Arrays.stream(methods).forEach(method -> {
RpcInvocation invocation = new RpcInvocation(url.getServiceKey(), url.getServiceModel(), method, interfaceName, url.getProtocolServiceKey(), null, null, null, null, null, null);
MetricsEventBus.publish(MetricsInitEvent.toMetricsInitEvent(application.getApplicationModel(), invocation, serviceLevel));
});
}

private void processServiceExecutor(URL url) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1114,6 +1114,11 @@
<xsd:documentation><![CDATA[ Enable record rpc metrics. ]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="rpc-level" type="xsd:string">
<xsd:annotation>
<xsd:documentation><![CDATA[ Is the metric for rpc requests at the METHOD level or SERVICE level ]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>

<xsd:complexType name="tracingType">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,11 @@
* the key will not be displayed when exporting (to be optimized)
*/
public class MethodStatComposite extends AbstractMetricsExport {
private boolean serviceLevel;

public MethodStatComposite(ApplicationModel applicationModel) {
super(applicationModel);
this.serviceLevel = MethodMetric.isServiceLevel(getApplicationModel());
}

private final Map<MetricsKeyWrapper, Map<MethodMetric, AtomicLong>> methodNumStats = new ConcurrentHashMap<>();
Expand All @@ -59,7 +61,8 @@ public void initMethodKey(MetricsKeyWrapper wrapper, Invocation invocation) {
if (!methodNumStats.containsKey(wrapper)) {
return;
}
methodNumStats.get(wrapper).computeIfAbsent(new MethodMetric(getApplicationModel(), invocation), k -> new AtomicLong(0L));

methodNumStats.get(wrapper).computeIfAbsent(new MethodMetric(getApplicationModel(), invocation, serviceLevel), k -> new AtomicLong(0L));
}

public void incrementMethodKey(MetricsKeyWrapper wrapper, MethodMetric methodMetric, int size) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,11 @@
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public class RtStatComposite extends AbstractMetricsExport {
private boolean serviceLevel;

public RtStatComposite(ApplicationModel applicationModel) {
super(applicationModel);
this.serviceLevel = MethodMetric.isServiceLevel(getApplicationModel());
}

private final Map<String, List<LongContainer<? extends Number>>> rtStats = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -168,7 +170,7 @@ private List<Action> calMethodRtActions(Invocation invocation, String registryOp
List<Action> actions;
actions = new LinkedList<>();
for (LongContainer container : rtStats.get(registryOpType)) {
MethodMetric key = new MethodMetric(getApplicationModel(), invocation);
MethodMetric key = new MethodMetric(getApplicationModel(), invocation, serviceLevel);
Number current = (Number) container.get(key);
if (current == null) {
container.putIfAbsent(key, container.getInitFunc().apply(key));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ public MetricsInitEvent(ApplicationModel source,TypeWrapper typeWrapper) {
super(source,typeWrapper);
}

public static MetricsInitEvent toMetricsInitEvent(ApplicationModel applicationModel, Invocation invocation) {
MethodMetric methodMetric = new MethodMetric(applicationModel, invocation);
public static MetricsInitEvent toMetricsInitEvent(ApplicationModel applicationModel, Invocation invocation, boolean serviceLevel) {
MethodMetric methodMetric = new MethodMetric(applicationModel, invocation, serviceLevel);
MetricsInitEvent initEvent = new MetricsInitEvent(applicationModel, METRIC_EVENT);
initEvent.putAttachment(MetricsConstants.INVOCATION, invocation);
initEvent.putAttachment(MetricsConstants.METHOD_METRICS, methodMetric);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,17 @@

package org.apache.dubbo.metrics.model;

import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.config.MetricsConfig;
import org.apache.dubbo.config.context.ConfigManager;
import org.apache.dubbo.metrics.model.key.MetricsLevel;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.support.RpcUtils;

import java.util.Map;
import java.util.Objects;
import java.util.Optional;

import static org.apache.dubbo.common.constants.MetricsConstants.TAG_GROUP_KEY;
import static org.apache.dubbo.common.constants.MetricsConstants.TAG_VERSION_KEY;
Expand All @@ -36,12 +41,31 @@ public class MethodMetric extends ServiceKeyMetric {
private String group;
private String version;

public MethodMetric(ApplicationModel applicationModel, Invocation invocation) {

public MethodMetric(ApplicationModel applicationModel, Invocation invocation, boolean serviceLevel) {
super(applicationModel, MetricsSupport.getInterfaceName(invocation));
this.methodName = RpcUtils.getMethodName(invocation);
this.side = MetricsSupport.getSide(invocation);
this.group = MetricsSupport.getGroup(invocation);
this.version = MetricsSupport.getVersion(invocation);
this.methodName = serviceLevel ? null : RpcUtils.getMethodName(invocation);
}


public static boolean isServiceLevel(ApplicationModel applicationModel) {
if(applicationModel == null){
return false;
}
ConfigManager applicationConfigManager = applicationModel.getApplicationConfigManager();
if (applicationConfigManager == null) {
return false;
}
Optional<MetricsConfig> metrics = applicationConfigManager.getMetrics();
if (!metrics.isPresent()) {
return false;
}
String rpcLevel = metrics.map(MetricsConfig::getRpcLevel).orElse(MetricsLevel.METHOD.name());
rpcLevel = StringUtils.isBlank(rpcLevel) ? MetricsLevel.METHOD.name() : rpcLevel;
return MetricsLevel.SERVICE.name().equalsIgnoreCase(rpcLevel);
}

public String getGroup() {
Expand Down Expand Up @@ -82,13 +106,13 @@ public void setSide(String side) {
@Override
public String toString() {
return "MethodMetric{" +
"applicationName='" + getApplicationName() + '\'' +
", side='" + side + '\'' +
", interfaceName='" + getServiceKey() + '\'' +
", methodName='" + methodName + '\'' +
", group='" + group + '\'' +
", version='" + version + '\'' +
'}';
"applicationName='" + getApplicationName() + '\'' +
", side='" + side + '\'' +
", interfaceName='" + getServiceKey() + '\'' +
", methodName='" + methodName + '\'' +
", group='" + group + '\'' +
", version='" + version + '\'' +
'}';
}

@Override
Expand All @@ -100,6 +124,7 @@ public boolean equals(Object o) {
}

private volatile int hashCode = 0;

@Override
public int hashCode() {
if (hashCode == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
* Aggregation metrics collector implementation of {@link MetricsCollector}.
* This collector only enabled when metrics aggregation config is enabled.
*/
public class AggregateMetricsCollector implements MetricsCollector<RequestEvent>{
public class AggregateMetricsCollector implements MetricsCollector<RequestEvent> {
private int bucketNum = DEFAULT_BUCKET_NUM;
private int timeWindowSeconds = DEFAULT_TIME_WINDOW_SECONDS;
private int qpsTimeWindowMillSeconds = DEFAULT_QPS_TIME_WINDOW_MILL_SECONDS;
Expand All @@ -76,6 +76,7 @@ public class AggregateMetricsCollector implements MetricsCollector<RequestEvent>

private final ConcurrentMap<MethodMetric, TimeWindowAggregator> rtAgr = new ConcurrentHashMap<>();

private boolean serviceLevel;

public AggregateMetricsCollector(ApplicationModel applicationModel) {
this.applicationModel = applicationModel;
Expand All @@ -96,6 +97,7 @@ public AggregateMetricsCollector(ApplicationModel applicationModel) {
this.enableRt = Optional.ofNullable(aggregation.getEnableRt()).orElse(true);
this.enableRequest = Optional.ofNullable(aggregation.getEnableRequest()).orElse(true);
}
this.serviceLevel = MethodMetric.isServiceLevel(applicationModel);
}
}

Expand Down Expand Up @@ -157,7 +159,7 @@ public void onEventError(RequestEvent event) {
}

private void onRTEvent(RequestEvent event) {
MethodMetric metric = new MethodMetric(applicationModel, event.getAttachmentValue(MetricsConstants.INVOCATION));
MethodMetric metric = new MethodMetric(applicationModel, event.getAttachmentValue(MetricsConstants.INVOCATION), serviceLevel);
long responseTime = event.getTimePair().calc();
if (enableRt) {
TimeWindowQuantile quantile = ConcurrentHashMapUtils.computeIfAbsent(rt, metric,
Expand All @@ -176,7 +178,7 @@ private void onRTEvent(RequestEvent event) {
private MethodMetric calcWindowCounter(RequestEvent event, MetricsKey targetKey) {
MetricsPlaceValue placeType = MetricsPlaceValue.of(event.getAttachmentValue(MetricsConstants.INVOCATION_SIDE), MetricsLevel.SERVICE);
MetricsKeyWrapper metricsKeyWrapper = new MetricsKeyWrapper(targetKey, placeType);
MethodMetric metric = new MethodMetric(applicationModel, event.getAttachmentValue(MetricsConstants.INVOCATION));
MethodMetric metric = new MethodMetric(applicationModel, event.getAttachmentValue(MetricsConstants.INVOCATION), serviceLevel);

ConcurrentMap<MethodMetric, TimeWindowCounter> counter = methodTypeCounter.computeIfAbsent(metricsKeyWrapper, k -> new ConcurrentHashMap<>());

Expand All @@ -189,7 +191,7 @@ private MethodMetric calcWindowCounter(RequestEvent event, MetricsKey targetKey)
@Override
public List<MetricSample> collect() {
List<MetricSample> list = new ArrayList<>();
if (!isCollectEnabled()){
if (!isCollectEnabled()) {
return list;
}
collectRequests(list);
Expand Down Expand Up @@ -266,7 +268,7 @@ private void registerListener() {

@Override
public void initMetrics(MetricsEvent event) {
MethodMetric metric = new MethodMetric(applicationModel, event.getAttachmentValue(MetricsConstants.INVOCATION));
MethodMetric metric = new MethodMetric(applicationModel, event.getAttachmentValue(MetricsConstants.INVOCATION), serviceLevel);
if (enableQps) {
initMethodMetric(event);
initQpsMetric(metric);
Expand All @@ -279,27 +281,27 @@ public void initMetrics(MetricsEvent event) {
}
}

public void initMethodMetric(MetricsEvent event){
INIT_AGG_METHOD_KEYS.stream().forEach(key->initWindowCounter(event,key));
public void initMethodMetric(MetricsEvent event) {
INIT_AGG_METHOD_KEYS.stream().forEach(key -> initWindowCounter(event, key));
}

public void initQpsMetric(MethodMetric metric){
public void initQpsMetric(MethodMetric metric) {
ConcurrentHashMapUtils.computeIfAbsent(qps, metric, methodMetric -> new TimeWindowCounter(bucketNum, timeWindowSeconds));
}

public void initRtMetric(MethodMetric metric){
public void initRtMetric(MethodMetric metric) {
ConcurrentHashMapUtils.computeIfAbsent(rt, metric, k -> new TimeWindowQuantile(DEFAULT_COMPRESSION, bucketNum, timeWindowSeconds));
}

public void initRtAgrMetric(MethodMetric metric){
public void initRtAgrMetric(MethodMetric metric) {
ConcurrentHashMapUtils.computeIfAbsent(rtAgr, metric, k -> new TimeWindowAggregator(bucketNum, timeWindowSeconds));
}

public void initWindowCounter(MetricsEvent event, MetricsKey targetKey){
public void initWindowCounter(MetricsEvent event, MetricsKey targetKey) {

MetricsKeyWrapper metricsKeyWrapper = new MetricsKeyWrapper(targetKey, MetricsPlaceValue.of(event.getAttachmentValue(MetricsConstants.INVOCATION_SIDE), MetricsLevel.SERVICE));

MethodMetric metric = new MethodMetric(applicationModel, event.getAttachmentValue(MetricsConstants.INVOCATION));
MethodMetric metric = new MethodMetric(applicationModel, event.getAttachmentValue(MetricsConstants.INVOCATION), serviceLevel);

ConcurrentMap<MethodMetric, TimeWindowCounter> counter = methodTypeCounter.computeIfAbsent(metricsKeyWrapper, k -> new ConcurrentHashMap<>());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ public class HistogramMetricsCollector extends AbstractMetricsListener<RequestEv

private static final Integer[] DEFAULT_BUCKETS_MS = new Integer[]{100, 300, 500, 1000, 3000, 5000, 10000};

private boolean serviceLevel;

public HistogramMetricsCollector(ApplicationModel applicationModel) {
this.applicationModel = applicationModel;

Expand All @@ -68,6 +70,7 @@ public HistogramMetricsCollector(ApplicationModel applicationModel) {
}

metricRegister = new HistogramMetricRegister(MetricsGlobalRegistry.getCompositeRegistry(applicationModel), histogram);
this.serviceLevel = MethodMetric.isServiceLevel(applicationModel);
}
}

Expand All @@ -92,7 +95,7 @@ public void onEventError(RequestEvent event) {

private void onRTEvent(RequestEvent event) {
if (metricRegister != null) {
MethodMetric metric = new MethodMetric(applicationModel, event.getAttachmentValue(MetricsConstants.INVOCATION));
MethodMetric metric = new MethodMetric(applicationModel, event.getAttachmentValue(MetricsConstants.INVOCATION), serviceLevel);
long responseTime = event.getTimePair().calc();

HistogramMetricSample sample = new HistogramMetricSample(MetricsKey.METRIC_RT_HISTOGRAM.getNameByType(metric.getSide()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ public RequestEvent(ApplicationModel applicationModel, String appName, MetricsDi

public static RequestEvent toRequestEvent(ApplicationModel applicationModel, String appName,
MetricsDispatcher metricsDispatcher, DefaultMetricsCollector collector,
Invocation invocation, String side) {
MethodMetric methodMetric = new MethodMetric(applicationModel, invocation);
Invocation invocation, String side, boolean serviceLevel) {
MethodMetric methodMetric = new MethodMetric(applicationModel, invocation, serviceLevel);
RequestEvent requestEvent = new RequestEvent(applicationModel, appName, metricsDispatcher, collector, REQUEST_EVENT);
requestEvent.putAttachment(MetricsConstants.INVOCATION, invocation);
requestEvent.putAttachment(MetricsConstants.METHOD_METRICS, methodMetric);
Expand All @@ -80,13 +80,13 @@ public void customAfterPost(Object postResult) {
/**
* Acts on MetricsClusterFilter to monitor exceptions that occur before request execution
*/
public static RequestEvent toRequestErrorEvent(ApplicationModel applicationModel, String appName, MetricsDispatcher metricsDispatcher, Invocation invocation, String side, int code) {
public static RequestEvent toRequestErrorEvent(ApplicationModel applicationModel, String appName, MetricsDispatcher metricsDispatcher, Invocation invocation, String side, int code, boolean serviceLevel) {
RequestEvent event = new RequestEvent(applicationModel, appName, metricsDispatcher, null, REQUEST_ERROR_EVENT);
event.putAttachment(ATTACHMENT_KEY_SERVICE, MetricsSupport.getInterfaceName(invocation));
event.putAttachment(MetricsConstants.INVOCATION_SIDE, side);
event.putAttachment(MetricsConstants.INVOCATION, invocation);
event.putAttachment(MetricsConstants.INVOCATION_REQUEST_ERROR, code);
event.putAttachment(MetricsConstants.METHOD_METRICS, new MethodMetric(applicationModel, invocation));
event.putAttachment(MetricsConstants.METHOD_METRICS, new MethodMetric(applicationModel, invocation, serviceLevel));
return event;
}

Expand Down
Loading

0 comments on commit 575c339

Please sign in to comment.