From c2915c97891ec6208485f488414459d43aeaca42 Mon Sep 17 00:00:00 2001
From: emeroad
Date: Fri, 23 Aug 2024 11:58:35 +0900
Subject: [PATCH] [#11411] Improve performance of ServerMap Link

---
 .../hbase/statistics/DefaultBulkWriter.java   | 24 ++++++++++++-------
 .../dao/hbase/statistics/RowKeyMerge.java     |  8 +++++--
 .../dao/hbase/statistics/SyncWriter.java      |  3 +++
 3 files changed, 24 insertions(+), 11 deletions(-)

diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/statistics/DefaultBulkWriter.java b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/statistics/DefaultBulkWriter.java
index 401b3a5c59bd7..42730d68990d6 100644
--- a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/statistics/DefaultBulkWriter.java
+++ b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/statistics/DefaultBulkWriter.java
@@ -1,6 +1,5 @@
 package com.navercorp.pinpoint.collector.dao.hbase.statistics;
 
-import com.navercorp.pinpoint.common.hbase.CasResult;
 import com.navercorp.pinpoint.common.hbase.CheckAndMax;
 import com.navercorp.pinpoint.common.hbase.HbaseColumnFamily;
 import com.navercorp.pinpoint.common.hbase.TableNameProvider;
@@ -12,10 +11,11 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
 
 /**
  * @author emeroad
@@ -93,12 +93,6 @@ public void flushLink() {
 
     }
 
-    private CompletableFuture<CasResult> checkAndMax(TableName tableName, CheckAndMax checkAndMax) {
-        Objects.requireNonNull(tableName, "tableName");
-        Objects.requireNonNull(checkAndMax, "checkAndMax");
-        return this.asyncTemplate.maxColumnValue(tableName, checkAndMax);
-    }
-
     @Override
     public void flushAvgMax() {
 
@@ -110,13 +104,25 @@ public void flushAvgMax() {
             }
         }
 
+        Map<TableName, List<CheckAndMax>> maxUpdates = new HashMap<>();
         for (Map.Entry<RowInfo, Long> entry : maxUpdateMap.entrySet()) {
             final RowInfo rowInfo = entry.getKey();
             final Long val = entry.getValue();
             final byte[] rowKey = getDistributedKey(rowInfo.getRowKey().getRowKey());
             byte[] columnName = rowInfo.getColumnName().getColumnName();
             CheckAndMax checkAndMax = new CheckAndMax(rowKey, getColumnFamilyName(), columnName, val);
-            checkAndMax(rowInfo.getTableName(), checkAndMax);
+
+            List<CheckAndMax> checkAndMaxes = maxUpdates.computeIfAbsent(rowInfo.getTableName(), k -> new ArrayList<>());
+            checkAndMaxes.add(checkAndMax);
+        }
+
+        for (Map.Entry<TableName, List<CheckAndMax>> entry : maxUpdates.entrySet()) {
+            TableName tableName = entry.getKey();
+            List<CheckAndMax> maxs = entry.getValue();
+            List<List<CheckAndMax>> partition = ListUtils.partition(maxs, batchSize);
+            for (List<CheckAndMax> checkAndMaxes : partition) {
+                this.asyncTemplate.maxColumnValue(tableName, checkAndMaxes);
+            }
         }
     }
 
diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/statistics/RowKeyMerge.java b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/statistics/RowKeyMerge.java
index d212997969ae6..e9ee48f859322 100644
--- a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/statistics/RowKeyMerge.java
+++ b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/statistics/RowKeyMerge.java
@@ -17,12 +17,11 @@
 package com.navercorp.pinpoint.collector.dao.hbase.statistics;
 
 import com.navercorp.pinpoint.common.hbase.HbaseColumnFamily;
-
 import com.sematext.hbase.wd.RowKeyDistributorByHashPrefix;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Increment;
-import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -64,6 +63,8 @@ public Map<TableName, List<Increment>> createBulkIncrement(Map<RowInfo, Long> da
             for (Map.Entry<RowKey, List<ColumnName>> rowKeyEntry : tableRowKeys.getValue().entrySet()) {
                 Increment increment = createIncrement(rowKeyEntry, rowKeyDistributorByHashPrefix);
                 incrementList.add(increment);
+
+
             }
             tableIncrementMap.put(tableName, incrementList);
         }
@@ -74,9 +75,12 @@ private Increment createIncrement(Map.Entry<RowKey, List<ColumnName>> rowKeyEntr
         RowKey rowKey = rowKeyEntry.getKey();
         byte[] key = getRowKey(rowKey, rowKeyDistributorByHashPrefix);
         final Increment increment = new Increment(key);
+        increment.setReturnResults(false);
         for (ColumnName columnName : rowKeyEntry.getValue()) {
             increment.addColumn(family, columnName.getColumnName(), columnName.getCallCount());
         }
+
+        logger.trace("create increment row:{}, column:{}", rowKey, rowKeyEntry.getValue());
         return increment;
     }
 
diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/statistics/SyncWriter.java b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/statistics/SyncWriter.java
index de6d2eb972d3f..f55e26016d752 100644
--- a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/statistics/SyncWriter.java
+++ b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/statistics/SyncWriter.java
@@ -50,7 +50,10 @@ public void increment(RowKey rowKey, ColumnName columnName, long addition) {
 
         TableName tableName = tableNameProvider.getTableName(this.tableDescriptor.getTable());
         final byte[] rowKeyBytes = getDistributedKey(rowKey.getRowKey());
+
         Increment increment = Increments.increment(rowKeyBytes, getColumnFamilyName(), columnName.getColumnName(), 1);
+        increment.setReturnResults(false);
+
         this.hbaseTemplate.increment(tableName, increment);
     }