diff --git a/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/config/MapHbaseConfiguration.java b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/config/MapHbaseConfiguration.java index ad19cb896ae8..6766cff86048 100644 --- a/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/config/MapHbaseConfiguration.java +++ b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/config/MapHbaseConfiguration.java @@ -19,7 +19,9 @@ import com.navercorp.pinpoint.common.hbase.ConnectionFactoryBean; import com.navercorp.pinpoint.common.hbase.HbaseTemplate; +import com.navercorp.pinpoint.common.hbase.RowMapper; import com.navercorp.pinpoint.common.hbase.TableFactory; +import com.navercorp.pinpoint.common.hbase.TableNameProvider; import com.navercorp.pinpoint.common.hbase.async.AsyncConnectionFactoryBean; import com.navercorp.pinpoint.common.hbase.async.AsyncTableCustomizer; import com.navercorp.pinpoint.common.hbase.async.AsyncTableFactory; @@ -28,6 +30,18 @@ import com.navercorp.pinpoint.common.hbase.util.ScanMetricReporter; import com.navercorp.pinpoint.common.server.executor.ExecutorCustomizer; import com.navercorp.pinpoint.common.server.executor.ExecutorProperties; +import com.navercorp.pinpoint.web.applicationmap.dao.MapResponseDao; +import com.navercorp.pinpoint.web.applicationmap.dao.MapStatisticsCalleeDao; +import com.navercorp.pinpoint.web.applicationmap.dao.MapStatisticsCallerDao; +import com.navercorp.pinpoint.web.applicationmap.dao.hbase.HbaseMapResponseTimeDao; +import com.navercorp.pinpoint.web.applicationmap.dao.hbase.HbaseMapStatisticsCalleeDao; +import com.navercorp.pinpoint.web.applicationmap.dao.hbase.HbaseMapStatisticsCallerDao; +import com.navercorp.pinpoint.web.applicationmap.dao.hbase.MapScanFactory; +import com.navercorp.pinpoint.web.applicationmap.dao.mapper.RowMapperFactory; +import com.navercorp.pinpoint.web.applicationmap.rawdata.LinkDataMap; +import com.navercorp.pinpoint.web.vo.RangeFactory; +import com.navercorp.pinpoint.web.vo.ResponseTime; +import com.sematext.hbase.wd.RowKeyDistributorByHashPrefix; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.AsyncConnection; import org.apache.hadoop.hbase.client.Connection; @@ -38,7 +52,6 @@ import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Import; import org.springframework.scheduling.concurrent.ThreadPoolExecutorFactoryBean; @@ -46,14 +59,12 @@ import java.util.concurrent.ExecutorService; @org.springframework.context.annotation.Configuration -@ComponentScan({ - "com.navercorp.pinpoint.web.applicationmap.dao.hbase", -}) @Import({ MapMapperConfiguration.class }) public class MapHbaseConfiguration { private final Logger logger = LogManager.getLogger(MapHbaseConfiguration.class); + private final HbaseTemplateConfiguration config = new HbaseTemplateConfiguration(); public MapHbaseConfiguration() { @@ -62,7 +73,7 @@ public MapHbaseConfiguration() { @Bean public FactoryBean mapHbaseThreadPool(@Qualifier("hbaseExecutorCustomizer") ExecutorCustomizer executorCustomizer, - @Qualifier("hbaseClientExecutorProperties") ExecutorProperties properties) { + @Qualifier("hbaseClientExecutorProperties") ExecutorProperties properties) { ThreadPoolExecutorFactoryBean factory = new ThreadPoolExecutorFactoryBean(); executorCustomizer.customize(factory, properties); factory.setThreadNamePrefix("Map-" + factory.getThreadNamePrefix()); @@ -71,8 +82,8 @@ public FactoryBean mapHbaseThreadPool(@Qualifier("hbaseExecutor @Bean public FactoryBean mapHbaseConnection(Configuration configuration, - User user, - @Qualifier("mapHbaseThreadPool") ExecutorService executorService) { + User user, + @Qualifier("mapHbaseThreadPool") ExecutorService executorService) { return new ConnectionFactoryBean(configuration, user, executorService); } @@ -103,5 +114,45 @@ public HbaseTemplate mapHbaseTemplate(@Qualifier("hbaseConfiguration") Configura return config.hbaseTemplate(configurable, tableFactory, asyncTableFactory, parallelScan, nativeAsync, reporter); } + @Bean + public MapScanFactory mapScanFactory(RangeFactory rangeFactory) { + return new MapScanFactory(rangeFactory); + } + + @Bean + public MapResponseDao hbaseMapResponseTimeDao(@Qualifier("mapHbaseTemplate") + HbaseTemplate hbaseTemplate, + TableNameProvider tableNameProvider, + @Qualifier("responseTimeMapper") + RowMapper responseTimeMapper, + MapScanFactory mapScanFactory, + @Qualifier("statisticsSelfRowKeyDistributor") + RowKeyDistributorByHashPrefix rowKeyDistributor) { + return new HbaseMapResponseTimeDao(hbaseTemplate, tableNameProvider, responseTimeMapper, mapScanFactory, rowKeyDistributor); + } + + @Bean + public MapStatisticsCalleeDao hbaseMapStatisticsCalleeDao(@Qualifier("mapHbaseTemplate") + HbaseTemplate hbaseTemplate, + TableNameProvider tableNameProvider, + @Qualifier("mapCalleeMapper") + RowMapperFactory calleeMapper, + MapScanFactory mapScanFactory, + @Qualifier("statisticsCalleeRowKeyDistributor") + RowKeyDistributorByHashPrefix rowKeyDistributor) { + return new HbaseMapStatisticsCalleeDao(hbaseTemplate, tableNameProvider, calleeMapper, mapScanFactory, rowKeyDistributor); + } + + @Bean + public MapStatisticsCallerDao hbaseMapStatisticsCallerDao(@Qualifier("mapHbaseTemplate") + HbaseTemplate hbaseTemplate, + TableNameProvider tableNameProvider, + @Qualifier("mapCallerMapper") + RowMapperFactory callerMapper, + MapScanFactory mapScanFactory, + @Qualifier("statisticsCallerRowKeyDistributor") + RowKeyDistributorByHashPrefix rowKeyDistributor) { + return new HbaseMapStatisticsCallerDao(hbaseTemplate, tableNameProvider, callerMapper, mapScanFactory, rowKeyDistributor); + } } diff --git a/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/config/MapMapperConfiguration.java b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/config/MapMapperConfiguration.java index c1f55179e7e8..79959805c9ac 100644 --- a/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/config/MapMapperConfiguration.java +++ b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/config/MapMapperConfiguration.java @@ -7,9 +7,9 @@ import com.navercorp.pinpoint.web.applicationmap.dao.mapper.MapStatisticsCalleeMapper; import com.navercorp.pinpoint.web.applicationmap.dao.mapper.MapStatisticsCallerMapper; import com.navercorp.pinpoint.web.applicationmap.dao.mapper.ResponseTimeMapper; +import com.navercorp.pinpoint.web.applicationmap.dao.mapper.RowMapperFactory; import com.navercorp.pinpoint.web.applicationmap.rawdata.LinkDataMap; import com.navercorp.pinpoint.web.component.ApplicationFactory; -import com.navercorp.pinpoint.web.util.TimeWindowFunction; import com.navercorp.pinpoint.web.vo.ResponseTime; import com.sematext.hbase.wd.RowKeyDistributorByHashPrefix; import org.springframework.beans.factory.annotation.Qualifier; @@ -20,33 +20,18 @@ public class MapMapperConfiguration { @Bean - public RowMapper mapStatisticsCallerMapper(ApplicationFactory applicationFactory, - @Qualifier("statisticsCallerRowKeyDistributor") - RowKeyDistributorByHashPrefix rowKeyDistributor) { - return new MapStatisticsCallerMapper(applicationFactory, rowKeyDistributor, LinkFilter::skip, TimeWindowFunction.identity()); + public RowMapperFactory mapCallerMapper(ApplicationFactory applicationFactory, + @Qualifier("statisticsCallerRowKeyDistributor") + RowKeyDistributorByHashPrefix rowKeyDistributor) { + return (windowFunction) -> new MapStatisticsCallerMapper(applicationFactory, rowKeyDistributor, LinkFilter::skip, windowFunction); } @Bean - public RowMapper mapStatisticsCallerTimeAggregatedMapper(ApplicationFactory applicationFactory, - @Qualifier("statisticsCallerRowKeyDistributor") - RowKeyDistributorByHashPrefix rowKeyDistributor) { - return new MapStatisticsCallerMapper(applicationFactory, rowKeyDistributor, LinkFilter::skip, TimeWindowFunction.ALL_IN_ONE); - } - - @Bean - public RowMapper mapStatisticsCalleeMapper(ServiceTypeRegistryService registry, - ApplicationFactory applicationFactory, - @Qualifier("statisticsCalleeRowKeyDistributor") - RowKeyDistributorByHashPrefix rowKeyDistributor) { - return new MapStatisticsCalleeMapper(registry, applicationFactory, rowKeyDistributor, LinkFilter::skip, TimeWindowFunction.identity()); - } - - @Bean - public RowMapper mapStatisticsCalleeTimeAggregatedMapper(ServiceTypeRegistryService registry, - ApplicationFactory applicationFactory, - @Qualifier("statisticsCalleeRowKeyDistributor") - RowKeyDistributorByHashPrefix rowKeyDistributor) { - return new MapStatisticsCalleeMapper(registry, applicationFactory, rowKeyDistributor, LinkFilter::skip, TimeWindowFunction.ALL_IN_ONE); + public RowMapperFactory mapCalleeMapper(ServiceTypeRegistryService registry, + ApplicationFactory applicationFactory, + @Qualifier("statisticsCalleeRowKeyDistributor") + RowKeyDistributorByHashPrefix rowKeyDistributor) { + return (windowFunction) -> new MapStatisticsCalleeMapper(registry, applicationFactory, rowKeyDistributor, LinkFilter::skip, windowFunction); } @Bean diff --git a/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/hbase/HbaseMapResponseTimeDao.java b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/hbase/HbaseMapResponseTimeDao.java index 34c049821191..5746fa4710be 100644 --- a/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/hbase/HbaseMapResponseTimeDao.java +++ b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/hbase/HbaseMapResponseTimeDao.java @@ -20,18 +20,15 @@ import com.navercorp.pinpoint.common.hbase.HbaseOperations; import com.navercorp.pinpoint.common.hbase.RowMapper; import com.navercorp.pinpoint.common.hbase.TableNameProvider; -import com.navercorp.pinpoint.common.server.util.ApplicationMapStatisticsUtils; import com.navercorp.pinpoint.common.server.util.time.Range; import com.navercorp.pinpoint.web.applicationmap.dao.MapResponseDao; import com.navercorp.pinpoint.web.vo.Application; -import com.navercorp.pinpoint.web.vo.RangeFactory; import com.navercorp.pinpoint.web.vo.ResponseTime; import com.sematext.hbase.wd.RowKeyDistributorByHashPrefix; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Scan; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Repository; import java.util.ArrayList; @@ -51,27 +48,25 @@ public class HbaseMapResponseTimeDao implements MapResponseDao { private static final HbaseColumnFamily.SelfStatMap DESCRIPTOR = HbaseColumnFamily.MAP_STATISTICS_SELF_VER2_COUNTER; - private int scanCacheSize = 40; - private final RowMapper responseTimeMapper; private final HbaseOperations hbaseOperations; private final TableNameProvider tableNameProvider; - private final RangeFactory rangeFactory; + private final MapScanFactory scanFactory; - private final RowKeyDistributorByHashPrefix rowKeyDistributorByHashPrefix; + private final RowKeyDistributorByHashPrefix rowKeyDistributor; - public HbaseMapResponseTimeDao(@Qualifier("mapHbaseTemplate") HbaseOperations hbaseOperations, + public HbaseMapResponseTimeDao(HbaseOperations hbaseOperations, TableNameProvider tableNameProvider, - @Qualifier("responseTimeMapper") RowMapper responseTimeMapper, - RangeFactory rangeFactory, - @Qualifier("statisticsSelfRowKeyDistributor") RowKeyDistributorByHashPrefix rowKeyDistributorByHashPrefix) { + RowMapper responseTimeMapper, + MapScanFactory scanFactory, + RowKeyDistributorByHashPrefix rowKeyDistributor) { this.hbaseOperations = Objects.requireNonNull(hbaseOperations, "hbaseOperations"); this.tableNameProvider = Objects.requireNonNull(tableNameProvider, "tableNameProvider"); this.responseTimeMapper = Objects.requireNonNull(responseTimeMapper, "responseTimeMapper"); - this.rangeFactory = Objects.requireNonNull(rangeFactory, "rangeFactory"); - this.rowKeyDistributorByHashPrefix = Objects.requireNonNull(rowKeyDistributorByHashPrefix, "rowKeyDistributorByHashPrefix"); + this.scanFactory = Objects.requireNonNull(scanFactory, "scanFactory"); + this.rowKeyDistributor = Objects.requireNonNull(rowKeyDistributor, "rowKeyDistributor"); } @@ -83,10 +78,10 @@ public List selectResponseTime(Application application, Range rang logger.debug("selectResponseTime applicationName:{}, {}", application, range); } - Scan scan = createScan(application, range, DESCRIPTOR.getName()); + Scan scan = scanFactory.createScan("MapSelfScan", application, range, DESCRIPTOR.getName()); TableName mapStatisticsSelfTableName = tableNameProvider.getTableName(DESCRIPTOR.getTable()); - List responseTimeList = hbaseOperations.findParallel(mapStatisticsSelfTableName, scan, rowKeyDistributorByHashPrefix, responseTimeMapper, MAP_STATISTICS_SELF_VER2_NUM_PARTITIONS); + List responseTimeList = hbaseOperations.findParallel(mapStatisticsSelfTableName, scan, rowKeyDistributor, responseTimeMapper, MAP_STATISTICS_SELF_VER2_NUM_PARTITIONS); if (responseTimeList.isEmpty()) { return new ArrayList<>(); @@ -95,24 +90,4 @@ public List selectResponseTime(Application application, Range rang return responseTimeList; } - private Scan createScan(Application application, Range range, byte[] family) { - range = rangeFactory.createStatisticsRange(range); - if (logger.isDebugEnabled()) { - logger.debug("scan time:{} ", range.prettyToString()); - } - - // start key is replaced by end key because timestamp has been reversed - byte[] startKey = ApplicationMapStatisticsUtils.makeRowKey(application.getName(), application.getServiceTypeCode(), range.getTo()); - byte[] endKey = ApplicationMapStatisticsUtils.makeRowKey(application.getName(), application.getServiceTypeCode(), range.getFrom()); - - final Scan scan = new Scan(); - scan.setCaching(this.scanCacheSize); - scan.withStartRow(startKey); - scan.withStopRow(endKey); - scan.addFamily(family); - scan.setId("ApplicationSelfScan"); - - return scan; - } - } \ No newline at end of file diff --git a/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/hbase/HbaseMapStatisticsCalleeDao.java b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/hbase/HbaseMapStatisticsCalleeDao.java index df227b3a1f6a..3373c4ba3028 100644 --- a/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/hbase/HbaseMapStatisticsCalleeDao.java +++ b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/hbase/HbaseMapStatisticsCalleeDao.java @@ -18,27 +18,27 @@ import com.navercorp.pinpoint.common.hbase.HbaseColumnFamily; import com.navercorp.pinpoint.common.hbase.HbaseOperations; +import com.navercorp.pinpoint.common.hbase.HbaseTable; import com.navercorp.pinpoint.common.hbase.ResultsExtractor; import com.navercorp.pinpoint.common.hbase.RowMapper; import com.navercorp.pinpoint.common.hbase.TableNameProvider; -import com.navercorp.pinpoint.common.server.util.ApplicationMapStatisticsUtils; import com.navercorp.pinpoint.common.server.util.time.Range; import com.navercorp.pinpoint.web.applicationmap.dao.MapStatisticsCalleeDao; import com.navercorp.pinpoint.web.applicationmap.dao.mapper.MapStatisticsTimeWindowReducer; +import com.navercorp.pinpoint.web.applicationmap.dao.mapper.RowMapperFactory; import com.navercorp.pinpoint.web.applicationmap.link.LinkDirection; import com.navercorp.pinpoint.web.applicationmap.rawdata.LinkDataMap; import com.navercorp.pinpoint.web.applicationmap.rawdata.LinkDataMapUtils; import com.navercorp.pinpoint.web.mapper.RowMapReduceResultExtractor; import com.navercorp.pinpoint.web.util.TimeWindow; import com.navercorp.pinpoint.web.util.TimeWindowDownSampler; +import com.navercorp.pinpoint.web.util.TimeWindowFunction; import com.navercorp.pinpoint.web.vo.Application; -import com.navercorp.pinpoint.web.vo.RangeFactory; import com.sematext.hbase.wd.RowKeyDistributorByHashPrefix; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Scan; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Repository; import java.util.Objects; @@ -51,7 +51,6 @@ public class HbaseMapStatisticsCalleeDao implements MapStatisticsCalleeDao { private static final int MAP_STATISTICS_CALLER_VER2_NUM_PARTITIONS = 32; - private static final int SCAN_CACHE_SIZE = 40; private final Logger logger = LogManager.getLogger(this.getClass()); @@ -60,27 +59,24 @@ public class HbaseMapStatisticsCalleeDao implements MapStatisticsCalleeDao { private final HbaseOperations hbaseTemplate; private final TableNameProvider tableNameProvider; - private final RowMapper mapStatisticsCalleeMapper; - private final RowMapper mapStatisticsCalleeTimeAggregatedMapper; + private final RowMapperFactory calleeMapperFactory; - private final RangeFactory rangeFactory; + private final MapScanFactory scanFactory; - private final RowKeyDistributorByHashPrefix rowKeyDistributorByHashPrefix; + private final RowKeyDistributorByHashPrefix rowKeyDistributor; public HbaseMapStatisticsCalleeDao( - @Qualifier("mapHbaseTemplate") HbaseOperations hbaseTemplate, + HbaseOperations hbaseTemplate, TableNameProvider tableNameProvider, - @Qualifier("mapStatisticsCalleeMapper") RowMapper mapStatisticsCalleeMapper, - @Qualifier("mapStatisticsCalleeTimeAggregatedMapper") RowMapper mapStatisticsCalleeTimeAggregatedMapper, - RangeFactory rangeFactory, - @Qualifier("statisticsCalleeRowKeyDistributor") RowKeyDistributorByHashPrefix rowKeyDistributorByHashPrefix) { + RowMapperFactory calleeMapperFactory, + MapScanFactory scanFactory, + RowKeyDistributorByHashPrefix rowKeyDistributor) { this.hbaseTemplate = Objects.requireNonNull(hbaseTemplate, "hbaseTemplate"); this.tableNameProvider = Objects.requireNonNull(tableNameProvider, "tableNameProvider"); - this.mapStatisticsCalleeMapper = Objects.requireNonNull(mapStatisticsCalleeMapper, "mapStatisticsCalleeMapper"); - this.mapStatisticsCalleeTimeAggregatedMapper = Objects.requireNonNull(mapStatisticsCalleeTimeAggregatedMapper, "mapStatisticsCalleeTimeAggregatedMapper"); - this.rangeFactory = Objects.requireNonNull(rangeFactory, "rangeFactory"); - this.rowKeyDistributorByHashPrefix = Objects.requireNonNull(rowKeyDistributorByHashPrefix, "rowKeyDistributorByHashPrefix"); + this.calleeMapperFactory = Objects.requireNonNull(calleeMapperFactory, "calleeMapperFactory"); + this.scanFactory = Objects.requireNonNull(scanFactory, "scanFactory"); + this.rowKeyDistributor = Objects.requireNonNull(rowKeyDistributor, "rowKeyDistributor"); } @Override @@ -89,46 +85,32 @@ public LinkDataMap selectCallee(Application calleeApplication, Range range, bool Objects.requireNonNull(range, "range"); final TimeWindow timeWindow = new TimeWindow(range, TimeWindowDownSampler.SAMPLER); - // find distributed key - ver2. - final Scan scan = createScan(calleeApplication, range, DESCRIPTOR.getName()); - ResultsExtractor resultExtractor; + TimeWindowFunction mapperWindow = newTimeWindow(timeAggregated); + RowMapper rowMapper = this.calleeMapperFactory.newMapper(mapperWindow); + ResultsExtractor resultExtractor = new RowMapReduceResultExtractor<>(rowMapper, new MapStatisticsTimeWindowReducer(timeWindow)); + + final Scan scan = scanFactory.createScan("MapCalleeScan", calleeApplication, range, DESCRIPTOR.getName()); + + return selectInLink(scan, DESCRIPTOR.getTable(), resultExtractor, MAP_STATISTICS_CALLER_VER2_NUM_PARTITIONS); + } + + private TimeWindowFunction newTimeWindow(boolean timeAggregated) { if (timeAggregated) { - resultExtractor = new RowMapReduceResultExtractor<>(mapStatisticsCalleeTimeAggregatedMapper, new MapStatisticsTimeWindowReducer(timeWindow)); - } else { - resultExtractor = new RowMapReduceResultExtractor<>(mapStatisticsCalleeMapper, new MapStatisticsTimeWindowReducer(timeWindow)); + return TimeWindowFunction.ALL_IN_ONE; } + return TimeWindowFunction.identity(); + } + - TableName mapStatisticsCallerTableName = tableNameProvider.getTableName(DESCRIPTOR.getTable()); - LinkDataMap linkDataMap = hbaseTemplate.findParallel(mapStatisticsCallerTableName, scan, rowKeyDistributorByHashPrefix, resultExtractor, MAP_STATISTICS_CALLER_VER2_NUM_PARTITIONS); - logger.debug("{} data. {}, {}", LinkDirection.IN_LINK, linkDataMap, range); + private LinkDataMap selectInLink(Scan scan, HbaseTable table, ResultsExtractor resultExtractor, int parallel) { + TableName callerTableName = tableNameProvider.getTableName(table); + LinkDataMap linkDataMap = hbaseTemplate.findParallel(callerTableName, scan, rowKeyDistributor, resultExtractor, parallel); + logger.debug("{} {} data: {}", LinkDirection.IN_LINK, callerTableName.getNameAsString(), linkDataMap); if (LinkDataMapUtils.hasLength(linkDataMap)) { return linkDataMap; } return new LinkDataMap(); } - - private Scan createScan(Application application, Range range, byte[] family) { - range = rangeFactory.createStatisticsRange(range); - - if (logger.isDebugEnabled()) { - logger.debug("scan time:{} ", range.prettyToString()); - } - - // start key is replaced by end key because timestamp has been reversed - byte[] startKey = ApplicationMapStatisticsUtils.makeRowKey(application.getName(), application.getServiceTypeCode(), range.getTo()); - byte[] endKey = ApplicationMapStatisticsUtils.makeRowKey(application.getName(), application.getServiceTypeCode(), range.getFrom()); - - Scan scan = new Scan(); - scan.setCaching(SCAN_CACHE_SIZE); - scan.withStartRow(startKey); - scan.withStopRow(endKey); - scan.addFamily(family); - scan.setId("ApplicationStatisticsScan"); - - return scan; - } - - } \ No newline at end of file diff --git a/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/hbase/HbaseMapStatisticsCallerDao.java b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/hbase/HbaseMapStatisticsCallerDao.java index 731220022057..c6a813c1381a 100644 --- a/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/hbase/HbaseMapStatisticsCallerDao.java +++ b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/hbase/HbaseMapStatisticsCallerDao.java @@ -18,27 +18,27 @@ import com.navercorp.pinpoint.common.hbase.HbaseColumnFamily; import com.navercorp.pinpoint.common.hbase.HbaseOperations; +import com.navercorp.pinpoint.common.hbase.HbaseTable; import com.navercorp.pinpoint.common.hbase.ResultsExtractor; import com.navercorp.pinpoint.common.hbase.RowMapper; import com.navercorp.pinpoint.common.hbase.TableNameProvider; -import com.navercorp.pinpoint.common.server.util.ApplicationMapStatisticsUtils; import com.navercorp.pinpoint.common.server.util.time.Range; import com.navercorp.pinpoint.web.applicationmap.dao.MapStatisticsCallerDao; import com.navercorp.pinpoint.web.applicationmap.dao.mapper.MapStatisticsTimeWindowReducer; +import com.navercorp.pinpoint.web.applicationmap.dao.mapper.RowMapperFactory; import com.navercorp.pinpoint.web.applicationmap.link.LinkDirection; import com.navercorp.pinpoint.web.applicationmap.rawdata.LinkDataMap; import com.navercorp.pinpoint.web.applicationmap.rawdata.LinkDataMapUtils; import com.navercorp.pinpoint.web.mapper.RowMapReduceResultExtractor; import com.navercorp.pinpoint.web.util.TimeWindow; import com.navercorp.pinpoint.web.util.TimeWindowDownSampler; +import com.navercorp.pinpoint.web.util.TimeWindowFunction; import com.navercorp.pinpoint.web.vo.Application; -import com.navercorp.pinpoint.web.vo.RangeFactory; import com.sematext.hbase.wd.RowKeyDistributorByHashPrefix; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Scan; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Repository; import java.util.Objects; @@ -52,7 +52,6 @@ public class HbaseMapStatisticsCallerDao implements MapStatisticsCallerDao { private static final int MAP_STATISTICS_CALLEE_VER2_NUM_PARTITIONS = 32; - private static final int SCAN_CACHE_SIZE = 40; private final Logger logger = LogManager.getLogger(this.getClass()); @@ -61,76 +60,55 @@ public class HbaseMapStatisticsCallerDao implements MapStatisticsCallerDao { private final HbaseOperations hbaseTemplate; private final TableNameProvider tableNameProvider; - private final RowMapper mapStatisticsCallerMapper; - private final RowMapper mapStatisticsCallerTimeAggregatedMapper; + private final RowMapperFactory callerMapperFactory; - private final RangeFactory rangeFactory; + private final MapScanFactory scanFactory; - private final RowKeyDistributorByHashPrefix rowKeyDistributorByHashPrefix; + private final RowKeyDistributorByHashPrefix rowKeyDistributor; public HbaseMapStatisticsCallerDao( - @Qualifier("mapHbaseTemplate") HbaseOperations hbaseTemplate, + HbaseOperations hbaseTemplate, TableNameProvider tableNameProvider, - @Qualifier("mapStatisticsCallerMapper") RowMapper mapStatisticsCallerMapper, - @Qualifier("mapStatisticsCallerTimeAggregatedMapper") RowMapper mapStatisticsCallerTimeAggregatedMapper, - RangeFactory rangeFactory, - @Qualifier("statisticsCallerRowKeyDistributor") RowKeyDistributorByHashPrefix rowKeyDistributorByHashPrefix) { + RowMapperFactory callerMapperFactory, + MapScanFactory scanFactory, + RowKeyDistributorByHashPrefix rowKeyDistributor) { this.hbaseTemplate = Objects.requireNonNull(hbaseTemplate, "hbaseTemplate"); this.tableNameProvider = Objects.requireNonNull(tableNameProvider, "tableNameProvider"); - this.mapStatisticsCallerMapper = Objects.requireNonNull(mapStatisticsCallerMapper, "mapStatisticsCallerMapper"); - this.mapStatisticsCallerTimeAggregatedMapper = Objects.requireNonNull(mapStatisticsCallerTimeAggregatedMapper, "mapStatisticsTimeAggregatedCallerMapper"); - this.rangeFactory = Objects.requireNonNull(rangeFactory, "rangeFactory"); - this.rowKeyDistributorByHashPrefix = Objects.requireNonNull(rowKeyDistributorByHashPrefix, "rowKeyDistributorByHashPrefix"); + this.callerMapperFactory = Objects.requireNonNull(callerMapperFactory, "callerMapperFactory"); + this.scanFactory = Objects.requireNonNull(scanFactory, "scanFactory"); + this.rowKeyDistributor = Objects.requireNonNull(rowKeyDistributor, "rowKeyDistributor"); } @Override public LinkDataMap selectCaller(Application callerApplication, Range range, boolean timeAggregated) { - Objects.requireNonNull(callerApplication, "callerApplication"); - Objects.requireNonNull(range, "range"); final TimeWindow timeWindow = new TimeWindow(range, TimeWindowDownSampler.SAMPLER); - // find distributed key. - final Scan scan = createScan(callerApplication, range, DESCRIPTOR.getName()); - ResultsExtractor resultExtractor; - if (timeAggregated) { - resultExtractor = new RowMapReduceResultExtractor<>(mapStatisticsCallerTimeAggregatedMapper, new MapStatisticsTimeWindowReducer(timeWindow)); - } else { - resultExtractor = new RowMapReduceResultExtractor<>(mapStatisticsCallerMapper, new MapStatisticsTimeWindowReducer(timeWindow)); - } + TimeWindowFunction mapperWindow = newTimeWindow(timeAggregated); + RowMapper rowMapper = this.callerMapperFactory.newMapper(mapperWindow); - TableName mapStatisticsCalleeTableName = tableNameProvider.getTableName(DESCRIPTOR.getTable()); - LinkDataMap linkDataMap = this.hbaseTemplate.findParallel(mapStatisticsCalleeTableName, scan, rowKeyDistributorByHashPrefix, resultExtractor, MAP_STATISTICS_CALLEE_VER2_NUM_PARTITIONS); - logger.debug("tableInfo({}). {} data. {}, {} : ", mapStatisticsCalleeTableName.getNameAsString(), LinkDirection.OUT_LINK, linkDataMap, range ); - if (LinkDataMapUtils.hasLength(linkDataMap)) { - return linkDataMap; - } + ResultsExtractor resultExtractor = new RowMapReduceResultExtractor<>(rowMapper, new MapStatisticsTimeWindowReducer(timeWindow)); - return new LinkDataMap(); + final Scan scan = scanFactory.createScan("MapCallerScan", callerApplication, range, DESCRIPTOR.getName()); + return selectOutLink(scan, DESCRIPTOR.getTable(), resultExtractor, MAP_STATISTICS_CALLEE_VER2_NUM_PARTITIONS); } - - private Scan createScan(Application application, Range range, byte[]... familyArgs) { - range = rangeFactory.createStatisticsRange(range); - - if (logger.isDebugEnabled()) { - logger.debug("scan Time:{}", range.prettyToString()); + private TimeWindowFunction newTimeWindow(boolean timeAggregated) { + if (timeAggregated) { + return TimeWindowFunction.ALL_IN_ONE; } + return TimeWindowFunction.identity(); + } - // start key is replaced by end key because timestamp has been reversed - byte[] startKey = ApplicationMapStatisticsUtils.makeRowKey(application.getName(), application.getServiceTypeCode(), range.getTo()); - byte[] endKey = ApplicationMapStatisticsUtils.makeRowKey(application.getName(), application.getServiceTypeCode(), range.getFrom()); - Scan scan = new Scan(); - scan.setCaching(SCAN_CACHE_SIZE); - scan.withStartRow(startKey); - scan.withStopRow(endKey); - for (byte[] family : familyArgs) { - scan.addFamily(family); + private LinkDataMap selectOutLink(Scan scan, HbaseTable table, ResultsExtractor resultExtractor, int parallel) { + TableName calleeTableName = tableNameProvider.getTableName(table); + LinkDataMap linkDataMap = this.hbaseTemplate.findParallel(calleeTableName, scan, rowKeyDistributor, resultExtractor, parallel); + logger.debug("{} {} data: {}", LinkDirection.OUT_LINK, calleeTableName.getNameAsString(), linkDataMap); + if (LinkDataMapUtils.hasLength(linkDataMap)) { + return linkDataMap; } - scan.setId("ApplicationStatisticsScan"); - - return scan; + return new LinkDataMap(); } } \ No newline at end of file diff --git a/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/hbase/MapScanFactory.java b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/hbase/MapScanFactory.java new file mode 100644 index 000000000000..bc58e67b5b4d --- /dev/null +++ b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/hbase/MapScanFactory.java @@ -0,0 +1,50 @@ +package com.navercorp.pinpoint.web.applicationmap.dao.hbase; + +import com.navercorp.pinpoint.common.server.util.ApplicationMapStatisticsUtils; +import com.navercorp.pinpoint.common.server.util.time.Range; +import com.navercorp.pinpoint.web.vo.Application; +import com.navercorp.pinpoint.web.vo.RangeFactory; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Objects; + +public class MapScanFactory { + private final Logger logger = LogManager.getLogger(this.getClass()); + + public static final int SCAN_CACHE_SIZE = 40; + + private final RangeFactory rangeFactory; + + private int scanCacheSize; + + public MapScanFactory(RangeFactory rangeFactory) { + this.rangeFactory = Objects.requireNonNull(rangeFactory, "rangeFactory"); + this.scanCacheSize = SCAN_CACHE_SIZE; + } + + public void setScanCacheSize(int scanCacheSize) { + this.scanCacheSize = scanCacheSize; + } + + public Scan createScan(String id, Application application, Range range, byte[] family) { + range = rangeFactory.createStatisticsRange(range); + if (logger.isDebugEnabled()) { + logger.debug("scan time:{} ", range.prettyToString()); + } + + // start key is replaced by end key because timestamp has been reversed + byte[] startKey = ApplicationMapStatisticsUtils.makeRowKey(application.getName(), application.getServiceTypeCode(), range.getTo()); + byte[] endKey = ApplicationMapStatisticsUtils.makeRowKey(application.getName(), application.getServiceTypeCode(), range.getFrom()); + + final Scan scan = new Scan(); + scan.setCaching(this.scanCacheSize); + scan.withStartRow(startKey); + scan.withStopRow(endKey); + scan.addFamily(family); + scan.setId(id); + + return scan; + } +} diff --git a/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/mapper/MapStatisticsTimeWindowReducer.java b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/mapper/MapStatisticsTimeWindowReducer.java index fd924dcb734d..e6f0c5414827 100644 --- a/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/mapper/MapStatisticsTimeWindowReducer.java +++ b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/mapper/MapStatisticsTimeWindowReducer.java @@ -2,13 +2,13 @@ import com.navercorp.pinpoint.common.hbase.RowReducer; import com.navercorp.pinpoint.web.applicationmap.rawdata.LinkDataMap; -import com.navercorp.pinpoint.web.util.TimeWindow; +import com.navercorp.pinpoint.web.util.TimeWindowFunction; public class MapStatisticsTimeWindowReducer implements RowReducer { private final LinkDataMap result; - public MapStatisticsTimeWindowReducer(TimeWindow timeWindow) { + public MapStatisticsTimeWindowReducer(TimeWindowFunction timeWindow) { result = new LinkDataMap(timeWindow); } diff --git a/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/mapper/RowMapperFactory.java b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/mapper/RowMapperFactory.java new file mode 100644 index 000000000000..ea709855d4c4 --- /dev/null +++ b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/mapper/RowMapperFactory.java @@ -0,0 +1,10 @@ +package com.navercorp.pinpoint.web.applicationmap.dao.mapper; + +import com.navercorp.pinpoint.common.hbase.RowMapper; +import com.navercorp.pinpoint.web.util.TimeWindowFunction; + +@FunctionalInterface +public interface RowMapperFactory { + + RowMapper newMapper(TimeWindowFunction timeWindow); +}