Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix metrics unable to retrieve lazy inited executor status #14348

Merged
merged 3 commits into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,6 @@ public interface DataStore {
void put(String componentName, String key, Object value);

void remove(String componentName, String key);

default void addListener(DataStoreUpdateListener dataStoreUpdateListener) {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.common.store;

public interface DataStoreUpdateListener {
void onUpdate(String componentName, String key, Object value);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,25 @@
*/
package org.apache.dubbo.common.store.support;

import org.apache.dubbo.common.constants.LoggerCodeConstants;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.store.DataStore;
import org.apache.dubbo.common.store.DataStoreUpdateListener;
import org.apache.dubbo.common.utils.ConcurrentHashMapUtils;
import org.apache.dubbo.common.utils.ConcurrentHashSet;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class SimpleDataStore implements DataStore {
private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(SimpleDataStore.class);

// <component name or id, <data-name, data-value>>
private final ConcurrentMap<String, ConcurrentMap<String, Object>> data = new ConcurrentHashMap<>();
private final ConcurrentHashSet<DataStoreUpdateListener> listeners = new ConcurrentHashSet<>();

@Override
public Map<String, Object> get(String componentName) {
Expand All @@ -52,6 +59,7 @@ public void put(String componentName, String key, Object value) {
Map<String, Object> componentData =
ConcurrentHashMapUtils.computeIfAbsent(data, componentName, k -> new ConcurrentHashMap<>());
componentData.put(key, value);
notifyListeners(componentName, key, value);
}

@Override
Expand All @@ -60,5 +68,27 @@ public void remove(String componentName, String key) {
return;
}
data.get(componentName).remove(key);
notifyListeners(componentName, key, null);
}

@Override
public void addListener(DataStoreUpdateListener dataStoreUpdateListener) {
listeners.add(dataStoreUpdateListener);
}

private void notifyListeners(String componentName, String key, Object value) {
for (DataStoreUpdateListener listener : listeners) {
try {
listener.onUpdate(componentName, key, value);
} catch (Throwable t) {
logger.warn(
AlbumenJ marked this conversation as resolved.
Show resolved Hide resolved
LoggerCodeConstants.INTERNAL_ERROR,
"",
"",
"Failed to notify data store update listener. " + "ComponentName: " + componentName + " Key: "
+ key,
t);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,11 @@ public interface Constants {

String SERVER_THREAD_POOL_NAME = "DubboServerHandler";

String SERVER_THREAD_POOL_PREFIX = SERVER_THREAD_POOL_NAME + "-";

String CLIENT_THREAD_POOL_NAME = "DubboClientHandler";

String CLIENT_THREAD_POOL_PREFIX = CLIENT_THREAD_POOL_NAME + "-";

String REST_PROTOCOL = "rest";
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@
*/
package org.apache.dubbo.common.store.support;

import org.apache.dubbo.common.store.DataStoreUpdateListener;

import java.util.Map;

import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
Expand Down Expand Up @@ -57,4 +61,30 @@ void testGetComponent() throws Exception {
dataStore.remove("component", "key");
assertNotEquals(map, dataStore.get("component"));
}

@Test
void testNotify() {
DataStoreUpdateListener listener = Mockito.mock(DataStoreUpdateListener.class);
dataStore.addListener(listener);

ArgumentCaptor<String> componentNameCaptor = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<String> keyCaptor = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<Object> valueCaptor = ArgumentCaptor.forClass(Object.class);

dataStore.put("name", "key", "1");
Mockito.verify(listener).onUpdate(componentNameCaptor.capture(), keyCaptor.capture(), valueCaptor.capture());
assertEquals("name", componentNameCaptor.getValue());
assertEquals("key", keyCaptor.getValue());
assertEquals("1", valueCaptor.getValue());

dataStore.remove("name", "key");
Mockito.verify(listener, Mockito.times(2))
.onUpdate(componentNameCaptor.capture(), keyCaptor.capture(), valueCaptor.capture());
assertEquals("name", componentNameCaptor.getValue());
assertEquals("key", keyCaptor.getValue());
assertNull(valueCaptor.getValue());

dataStore.remove("name2", "key");
Mockito.verify(listener, Mockito.times(0)).onUpdate("name2", "key", null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.store.DataStore;
import org.apache.dubbo.common.store.DataStoreUpdateListener;
import org.apache.dubbo.common.threadpool.manager.FrameworkExecutorRepository;
import org.apache.dubbo.common.threadpool.support.AbortPolicyWithReport;
import org.apache.dubbo.common.utils.ConcurrentHashMapUtils;
Expand All @@ -42,11 +43,12 @@
import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER_SHARED_EXECUTOR_SERVICE_COMPONENT_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.EXECUTOR_SERVICE_COMPONENT_KEY;
import static org.apache.dubbo.common.constants.LoggerCodeConstants.COMMON_METRICS_COLLECTOR_EXCEPTION;
import static org.apache.dubbo.config.Constants.CLIENT_THREAD_POOL_NAME;
import static org.apache.dubbo.config.Constants.CLIENT_THREAD_POOL_PREFIX;
import static org.apache.dubbo.config.Constants.SERVER_THREAD_POOL_NAME;
import static org.apache.dubbo.config.Constants.SERVER_THREAD_POOL_PREFIX;
import static org.apache.dubbo.metrics.model.MetricsCategory.THREAD_POOL;

public class ThreadPoolMetricsSampler implements MetricsSampler {
public class ThreadPoolMetricsSampler implements MetricsSampler, DataStoreUpdateListener {

private final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(ThreadPoolMetricsSampler.class);

Expand All @@ -61,14 +63,28 @@ public ThreadPoolMetricsSampler(DefaultMetricsCollector collector) {
this.collector = collector;
}

@Override
public void onUpdate(String componentName, String key, Object value) {
if (EXECUTOR_SERVICE_COMPONENT_KEY.equals(componentName)) {
if (value instanceof ThreadPoolExecutor) {
addExecutors(SERVER_THREAD_POOL_PREFIX + key, (ThreadPoolExecutor) value);
}
} else if (CONSUMER_SHARED_EXECUTOR_SERVICE_COMPONENT_KEY.equals(componentName)) {
if (value instanceof ThreadPoolExecutor) {
addExecutors(CLIENT_THREAD_POOL_PREFIX + key, (ThreadPoolExecutor) value);
}
}
}

public void addExecutors(String name, ExecutorService executorService) {
Optional.ofNullable(executorService)
.filter(Objects::nonNull)
.filter(e -> e instanceof ThreadPoolExecutor)
.map(e -> (ThreadPoolExecutor) e)
.ifPresent(threadPoolExecutor -> {
sampleThreadPoolExecutor.put(name, threadPoolExecutor);
samplesChanged.set(true);
if (sampleThreadPoolExecutor.put(name, threadPoolExecutor) == null) {
samplesChanged.set(true);
}
});
}

Expand Down Expand Up @@ -152,18 +168,20 @@ public void registryDefaultSampleThreadPoolExecutor() {
}

if (dataStore != null) {
dataStore.addListener(this);

Map<String, Object> executors = dataStore.get(EXECUTOR_SERVICE_COMPONENT_KEY);
for (Map.Entry<String, Object> entry : executors.entrySet()) {
ExecutorService executor = (ExecutorService) entry.getValue();
if (executor instanceof ThreadPoolExecutor) {
this.addExecutors(SERVER_THREAD_POOL_NAME + "-" + entry.getKey(), executor);
this.addExecutors(SERVER_THREAD_POOL_PREFIX + entry.getKey(), executor);
}
}
executors = dataStore.get(CONSUMER_SHARED_EXECUTOR_SERVICE_COMPONENT_KEY);
for (Map.Entry<String, Object> entry : executors.entrySet()) {
ExecutorService executor = (ExecutorService) entry.getValue();
if (executor instanceof ThreadPoolExecutor) {
this.addExecutors(CLIENT_THREAD_POOL_NAME + "-" + entry.getKey(), executor);
this.addExecutors(CLIENT_THREAD_POOL_PREFIX + entry.getKey(), executor);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.dubbo.common.beans.factory.ScopeBeanFactory;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.store.DataStore;
import org.apache.dubbo.common.store.DataStoreUpdateListener;
import org.apache.dubbo.common.threadpool.manager.FrameworkExecutorRepository;
import org.apache.dubbo.metrics.collector.DefaultMetricsCollector;
import org.apache.dubbo.metrics.model.ThreadPoolMetric;
Expand All @@ -37,11 +38,13 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER_SHARED_EXECUTOR_SERVICE_COMPONENT_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.EXECUTOR_SERVICE_COMPONENT_KEY;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@SuppressWarnings("all")
Expand Down Expand Up @@ -178,4 +181,32 @@ public void testRegistryDefaultSampleThreadPoolExecutor() throws NoSuchFieldExce
serverExecutor.shutdown();
clientExecutor.shutdown();
}

@Test
void testDataSourceNotify() throws Exception {
ArgumentCaptor<DataStoreUpdateListener> captor = ArgumentCaptor.forClass(DataStoreUpdateListener.class);
when(scopeBeanFactory.getBean(FrameworkExecutorRepository.class)).thenReturn(frameworkExecutorRepository);
when(frameworkExecutorRepository.getSharedExecutor()).thenReturn(null);
sampler2.registryDefaultSampleThreadPoolExecutor();

Field f = ThreadPoolMetricsSampler.class.getDeclaredField("sampleThreadPoolExecutor");
f.setAccessible(true);
Map<String, ThreadPoolExecutor> executors = (Map<String, ThreadPoolExecutor>) f.get(sampler2);

Assertions.assertEquals(0, executors.size());

verify(dataStore).addListener(captor.capture());
Assertions.assertEquals(sampler2, captor.getValue());

ExecutorService executorService = Executors.newFixedThreadPool(5);
sampler2.onUpdate(EXECUTOR_SERVICE_COMPONENT_KEY, "20880", executorService);

executors = (Map<String, ThreadPoolExecutor>) f.get(sampler2);
Assertions.assertEquals(1, executors.size());
Assertions.assertTrue(executors.containsKey("DubboServerHandler-20880"));

sampler2.onUpdate(CONSUMER_SHARED_EXECUTOR_SERVICE_COMPONENT_KEY, "client", executorService);
Assertions.assertEquals(2, executors.size());
Assertions.assertTrue(executors.containsKey("DubboClientHandler-client"));
}
}
Loading