Skip to content

Commit

Permalink
[pinpoint-apm#11267] Add Hbase client side metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
intr3p1d committed Sep 13, 2024
1 parent 4699b99 commit 4bdf3c1
Show file tree
Hide file tree
Showing 7 changed files with 357 additions and 0 deletions.
2 changes: 2 additions & 0 deletions collector-monitor/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,6 @@
</dependency>
</dependencies>

<profiles>
</profiles>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright 2024 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.navercorp.pinpoint.collector.monitor.config;

import org.apache.hadoop.hbase.client.AsyncConnectionImpl;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.shaded.com.codahale.metrics.MetricRegistry;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;

/**
* @author intr3p1d
*/
public class HbaseConnectionReflects {

private static final Logger logger = LogManager.getLogger(HbaseConnectionReflects.class);


static List<MetricRegistry> getRegistriesFromConnections(ClusterConnection conn, AsyncConnectionImpl asyncConn) {
List<MetricRegistry> registries = new ArrayList<>();

addMetricRegistryFromConnection(registries, conn);
addMetricRegistryFromAsyncConnection(registries, asyncConn);
return registries;
}

static void addMetricRegistryFromConnection(List<MetricRegistry> registries, ClusterConnection conn) {
MetricRegistry metricRegistry = getMetricRegistry(getMetricsConnection(conn));
if (metricRegistry != null) {
registries.add(metricRegistry);
}
}

static void addMetricRegistryFromAsyncConnection(List<MetricRegistry> registries, AsyncConnectionImpl asyncConn) {
MetricsConnection metricsConnection = getMetricsConnection(asyncConn)
.orElseThrow(() -> new NoSuchElementException("MetricsConnection not present"));
MetricRegistry metricRegistry = getMetricRegistry(metricsConnection);
if (metricRegistry != null) {
registries.add(metricRegistry);
}
}

@SuppressWarnings("unchecked")
static Optional<MetricsConnection> getMetricsConnection(AsyncConnectionImpl asyncConnection) {
try {
Method method = asyncConnection.getClass().getDeclaredMethod("getConnectionMetrics");
method.setAccessible(true);
return (Optional<MetricsConnection>) method.invoke(asyncConnection);
} catch (Exception e) {
logger.warn(e);
return Optional.empty();
}
}

static MetricsConnection getMetricsConnection(ClusterConnection connectionImplementation) {
try {
Method method = connectionImplementation.getClass().getDeclaredMethod("getConnectionMetrics");
method.setAccessible(true);
return (MetricsConnection) method.invoke(connectionImplementation);
} catch (Exception e) {
logger.warn(e);
return null;
}
}

static MetricRegistry getMetricRegistry(MetricsConnection metricsConnection) {
try {
Method method = metricsConnection.getClass().getDeclaredMethod("getMetricRegistry");
method.setAccessible(true);
return (MetricRegistry) method.invoke(metricsConnection);
} catch (Exception e) {
logger.warn(e);
return null;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright 2024 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.navercorp.pinpoint.collector.monitor.config;

import com.navercorp.pinpoint.collector.monitor.dao.hbase.HBaseMetricsAdapter;
import io.micrometer.core.instrument.MeterRegistry;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.AsyncConnectionImpl;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.shaded.com.codahale.metrics.MetricRegistry;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.List;

import static com.navercorp.pinpoint.collector.monitor.config.HbaseConnectionReflects.getRegistriesFromConnections;

/**
* @author intr3p1d
*/
@Configuration
@ConditionalOnProperty(value = "pinpoint.modules.collector.hbase-client-metric", havingValue = "true")
public class HbaseMetricsConfiguration {

private final Logger logger = LogManager.getLogger(HbaseMetricsConfiguration.class);

public HbaseMetricsConfiguration() {
logger.info("Install {}", HbaseMetricsConfiguration.class.getSimpleName());
}

@Bean
public HBaseMetricsAdapter collectHBaseMetrics(
MeterRegistry meterRegistry,
@Qualifier("hbaseConnection")
FactoryBean<Connection> connectionFactoryBean,
@Qualifier("hbaseAsyncConnection")
FactoryBean<AsyncConnection> asyncConnectionFactoryBean
) {
try {
ClusterConnection conn = (ClusterConnection) connectionFactoryBean.getObject();
AsyncConnectionImpl asyncConn = (AsyncConnectionImpl) asyncConnectionFactoryBean.getObject();
List<MetricRegistry> registries = getRegistriesFromConnections(conn, asyncConn);

return new HBaseMetricsAdapter(
meterRegistry, registries
);
} catch (Exception e) {
logger.error("HbaseMetrics Error: ", e);
}
return null;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package com.navercorp.pinpoint.collector.monitor.dao.hbase;

import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.MeterRegistry;
import org.apache.hadoop.hbase.shaded.com.codahale.metrics.Counter;
import org.apache.hadoop.hbase.shaded.com.codahale.metrics.Gauge;
import org.apache.hadoop.hbase.shaded.com.codahale.metrics.Histogram;
import org.apache.hadoop.hbase.shaded.com.codahale.metrics.MetricRegistry;
import org.apache.hadoop.hbase.shaded.com.codahale.metrics.Timer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Collection;

import static com.navercorp.pinpoint.collector.monitor.dao.hbase.MetricNameExtractor.extractName;
import static com.navercorp.pinpoint.collector.monitor.dao.hbase.MetricNameExtractor.extractTags;


public class HBaseMetricsAdapter {
private final Logger logger = LogManager.getLogger(HBaseMetricsAdapter.class);
private final MeterRegistry meterRegistry;
private final Collection<MetricRegistry> metricRegistries;

public HBaseMetricsAdapter(MeterRegistry meterRegistry, Collection<MetricRegistry> metricRegistries) {
this.meterRegistry = meterRegistry;
this.metricRegistries = metricRegistries;
initialize();
}

private void initialize() {
logger.info("initialize metricRegistries: {}", metricRegistries);

for (MetricRegistry metricRegistry : metricRegistries) {
if (metricRegistry != null) {
logger.info(metricRegistry);
metricRegistry.getMetrics().forEach((name, metric) -> {
if (metric instanceof Counter counter) {
registerCounterMetric(name, counter);
} else if (metric instanceof Timer timer) {
registerTimerMetric(name, timer);
} else if (metric instanceof Gauge<?> gauge) {
registerGaugeMetric(name, gauge);
} else if (metric instanceof Histogram histogram) {
registerHistogramMetric(name, histogram);
}
});
}
}
}

private void registerCounterMetric(String name, Counter counter) {
io.micrometer.core.instrument.Gauge.builder(extractName(name), counter, Counter::getCount)
.tags(extractTags(name))
.register(meterRegistry);
}

private void registerTimerMetric(String name, Timer timer) {
io.micrometer.core.instrument.Gauge.builder(extractName(name), timer, Timer::getCount)
.tags(extractTags(name))
.register(meterRegistry);
}

private void registerGaugeMetric(String name, Gauge<?> gauge) {
io.micrometer.core.instrument.Gauge.builder(extractName(name), gauge, HBaseMetricsAdapter::doubleValue)
.tags(extractTags(name))
.register(meterRegistry);
}

private void registerHistogramMetric(String name, Histogram histogram) {
DistributionSummary.builder(extractName(name))
.tags(extractTags(name))
.register(meterRegistry);
}

public static double doubleValue(Gauge<?> gauge) {
if (gauge == null || gauge.getValue() == null) {
return Double.NaN;
}
Object value = gauge.getValue();
return Double.parseDouble(value.toString());
}


@Override
public String toString() {
return "HBaseMetricsAdapter{" +
"logger=" + logger +
", meterRegistry=" + meterRegistry +
", metricRegistries=" + metricRegistries +
'}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2024 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.navercorp.pinpoint.collector.monitor.dao.hbase;

import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
* @author intr3p1d
*/
public class MetricNameExtractor {

static String extractName(String name) {
int atIndex = name.lastIndexOf('.');
if (atIndex != -1) {
return name.substring(0, atIndex);
} else {
return name;
}
}

static Tags extractTags(String name) {
String regex = ".*\\.([0-9a-fA-F\\-]{36})@([0-9a-fA-F]+)$";
Pattern pattern = Pattern.compile(regex);
Matcher matcher = pattern.matcher(name);

if (matcher.matches()) {
String uuid = matcher.group(1);
String hash = matcher.group(2);

return Tags.of(
Tag.of("clusterId", uuid),
Tag.of("connectionHash", hash)
);
} else {
return Tags.empty();
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.navercorp.pinpoint.collector.monitor.dao.hbase;

import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/**
* @author intr3p1d
*/
class MetricNameExtractorTest {

static final String METRIC_NAME = "org.apache.hadoop.hbase.client.MetricsConnection.executorPoolActiveThreads";
static final String DUMMY_CLUSTER_ID = "f72a0b6a-8141-4df9-96a3-754aac08e173";
static final String DUMMY_HASH = "10579683cf";

@Test
public void testCustomName() {
String example = METRIC_NAME + "." + DUMMY_CLUSTER_ID + "@" + DUMMY_HASH;
String actual = MetricNameExtractor.extractName(example);

Assertions.assertEquals(METRIC_NAME, actual);
}

@Test
public void testExtractTags() {
String example = METRIC_NAME + "." + DUMMY_CLUSTER_ID + "@" + DUMMY_HASH;
Tags expected = Tags.of(
Tag.of("clusterId", DUMMY_CLUSTER_ID),
Tag.of("connectionHash", DUMMY_HASH)
);
Tags actual = MetricNameExtractor.extractTags(example);

Assertions.assertEquals(expected, actual);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.navercorp.pinpoint.collector.grpc.config.GrpcStatConfiguration;
import com.navercorp.pinpoint.collector.grpc.config.GrpcStatReceiverConfiguration;
import com.navercorp.pinpoint.collector.monitor.MonitoredThreadPoolExecutorFactoryProvider;
import com.navercorp.pinpoint.collector.monitor.config.HbaseMetricsConfiguration;
import com.navercorp.pinpoint.collector.monitor.config.MicrometerConfiguration;
import com.navercorp.pinpoint.collector.monitor.MonitoringExecutors;
import com.navercorp.pinpoint.common.server.executor.ExecutorCustomizer;
Expand Down Expand Up @@ -55,6 +56,7 @@
GrpcKeepAliveScheduler.class,

MicrometerConfiguration.class,
HbaseMetricsConfiguration.class,

ChannelzConfiguration.class
})
Expand Down

0 comments on commit 4bdf3c1

Please sign in to comment.